diff --git a/rsocket/src/main/java/org/springframework/security/rsocket/core/PayloadInterceptorRSocket.java b/rsocket/src/main/java/org/springframework/security/rsocket/core/PayloadInterceptorRSocket.java index 0fe8fd002d..b2ce678c09 100644 --- a/rsocket/src/main/java/org/springframework/security/rsocket/core/PayloadInterceptorRSocket.java +++ b/rsocket/src/main/java/org/springframework/security/rsocket/core/PayloadInterceptorRSocket.java @@ -91,10 +91,10 @@ class PayloadInterceptorRSocket extends RSocketProxy { public Flux requestChannel(Publisher payloads) { return Flux.from(payloads).switchOnFirst((signal, innerFlux) -> { Payload firstPayload = signal.get(); - return intercept(PayloadExchangeType.REQUEST_CHANNEL, firstPayload).flatMapMany((context) -> innerFlux - .index().concatMap((tuple) -> justOrIntercept(tuple.getT1(), tuple.getT2())) - .transform((securedPayloads) -> this.source.requestChannel(securedPayloads)) - .subscriberContext(context)); + return intercept(PayloadExchangeType.REQUEST_CHANNEL, firstPayload).flatMapMany( + (context) -> innerFlux.index().concatMap((tuple) -> justOrIntercept(tuple.getT1(), tuple.getT2())) + .transform((securedPayloads) -> this.source.requestChannel(securedPayloads)) + .subscriberContext(context)); }); }