parent
63cd52d1a6
commit
1a28b4f849
|
@ -91,10 +91,10 @@ class PayloadInterceptorRSocket extends RSocketProxy {
|
||||||
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
|
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
|
||||||
return Flux.from(payloads).switchOnFirst((signal, innerFlux) -> {
|
return Flux.from(payloads).switchOnFirst((signal, innerFlux) -> {
|
||||||
Payload firstPayload = signal.get();
|
Payload firstPayload = signal.get();
|
||||||
return intercept(PayloadExchangeType.REQUEST_CHANNEL, firstPayload).flatMapMany((context) -> innerFlux
|
return intercept(PayloadExchangeType.REQUEST_CHANNEL, firstPayload).flatMapMany(
|
||||||
.index().concatMap((tuple) -> justOrIntercept(tuple.getT1(), tuple.getT2()))
|
(context) -> innerFlux.index().concatMap((tuple) -> justOrIntercept(tuple.getT1(), tuple.getT2()))
|
||||||
.transform((securedPayloads) -> this.source.requestChannel(securedPayloads))
|
.transform((securedPayloads) -> this.source.requestChannel(securedPayloads))
|
||||||
.subscriberContext(context));
|
.subscriberContext(context));
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue