Skip to content

[rsocket-adapter-rxjs] RxRequestersFactory.requestChannel - Second send data always ignored by the rsocket server #234

Closed
@Florian935

Description

@Florian935

Expected Behavior

When I call:

requester
                    .route('request.channel')
                    .request(
                        RxRequestersFactory.requestChannel(
                            of('1', '2', '3', '4', '5'),
                            this._stringCodec,
                            this._stringCodec,
                            5
                        )
                    ).subscribe((response) => console.log(response));

I want to see in the console:

1
2
3
4
5

Actual Behavior

When I call:

requester
                    .route('request.channel')
                    .request(
                        RxRequestersFactory.requestChannel(
                            of('1', '2', '3', '4', '5'),
                            this._stringCodec,
                            this._stringCodec,
                            5
                        )
                    ).subscribe((response) => console.log(response));

My console print:

1
3
4
5

It seems that no matter what data stream you provide to the method, the second value is always ignored.

Steps to Reproduce

My RSocket server is written with Spring boot and my frontend is written in Angular You can retrieve my example by following this link: https://github.com/Florian935/rsocket-js-v1-alpha-client-server

client package Angular contains the rsocket-js demo with rxjs adapter and server package Spring Boot contains the rsocket server.
You can clone this repo and launch in local the client and server.
When front is live, open your browser console and navigate to "Request Channel" button. You will see in the console that only "1, 3, 4, 5" is printed. (the second value is always ignored).

Possible Solution

When I look at log of the rsocket server, I can see that the first call make a request(1) and then a request(4) (which is corresponding to the prefetch RxRequestersFactory.requestChannel() arguments ?).
We can see that only 1, 3, 4 and 5 are passed in the onNext() method.

Capture

I think that this first request(1) is the possible problem and it's perhaps related to this code:

const [firstValueObservable, restValuestObservable] = partition(
datas.pipe(
share({
connector: () => new Subject(),
resetOnRefCountZero: true,
})
),
(_value, index) => index === 0
);
return (
rsocket: RSocket,
metadata: Map<string | number | WellKnownMimeType, Buffer>
) =>
firstValueObservable.pipe(
take(1),
concatMap(
(firstValue) =>
new Observer2BufferingSubscriberToPublisher2PrefetchingObservable(
(
s: OnTerminalSubscriber &
OnNextSubscriber &
OnExtensionSubscriber &
Requestable &
Cancellable
) =>
rsocket.requestChannel(

Your Environment

Rsocket version used (Frontend: Angular 14):

"rsocket-adapter-rxjs": "^1.0.0-alpha.1",
"rsocket-composite-metadata": "^1.0.0-alpha.1",
"rsocket-core": "^1.0.0-alpha.1",
"rsocket-messaging": "^1.0.0-alpha.1",
"rsocket-tcp-client": "^1.0.0-alpha.1",
"rsocket-websocket-client": "^1.0.0-alpha.1",
"rxjs": "~7.5.0"

Backend Spring Boot 2.7.1:

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-rsocket</artifactId>
</dependency>
```

Metadata

Metadata

Assignees

No one assigned

    Labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions