diff --git a/jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/internal/ClientHTTP3Session.java b/jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/internal/ClientHTTP3Session.java index 3edaffee0ff..6f7a7546f4e 100644 --- a/jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/internal/ClientHTTP3Session.java +++ b/jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/internal/ClientHTTP3Session.java @@ -128,21 +128,21 @@ public class ClientHTTP3Session extends ClientProtocolSession } @Override - protected boolean onReadable(long readableStreamId) + protected void onReadable(long readableStreamId) { StreamType streamType = StreamType.from(readableStreamId); if (streamType == StreamType.CLIENT_BIDIRECTIONAL) { if (LOG.isDebugEnabled()) LOG.debug("bidirectional stream #{} selected for read", readableStreamId); - return super.onReadable(readableStreamId); + super.onReadable(readableStreamId); } else { QuicStreamEndPoint streamEndPoint = getOrCreateStreamEndPoint(readableStreamId, this::configureUnidirectionalStreamEndPoint); if (LOG.isDebugEnabled()) LOG.debug("unidirectional stream #{} selected for read: {}", readableStreamId, streamEndPoint); - return streamEndPoint.onReadable(); + streamEndPoint.onReadable(); } } diff --git a/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/ServerHTTP3Session.java b/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/ServerHTTP3Session.java index 185ed3231b5..db32925ef0d 100644 --- a/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/ServerHTTP3Session.java +++ b/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/ServerHTTP3Session.java @@ -13,9 +13,7 @@ package org.eclipse.jetty.http3.server.internal; -import java.util.ArrayDeque; import java.util.Map; -import java.util.Queue; import java.util.concurrent.CompletableFuture; import org.eclipse.jetty.http3.HTTP3Configuration; @@ -36,9 +34,7 @@ import org.eclipse.jetty.quic.common.StreamType; import org.eclipse.jetty.quic.server.ServerProtocolSession; import org.eclipse.jetty.quic.server.ServerQuicSession; import org.eclipse.jetty.util.Callback; -import org.eclipse.jetty.util.thread.ExecutionStrategy; import org.eclipse.jetty.util.thread.Invocable; -import org.eclipse.jetty.util.thread.strategy.AdaptiveExecutionStrategy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,8 +47,6 @@ public class ServerHTTP3Session extends ServerProtocolSession private final HTTP3SessionServer session; private final ControlFlusher controlFlusher; private final HTTP3Flusher messageFlusher; - private final AdaptiveExecutionStrategy strategy; - private final HTTP3Producer producer = new HTTP3Producer(); public ServerHTTP3Session(HTTP3Configuration configuration, ServerQuicSession quicSession, Session.Server.Listener listener) { @@ -89,9 +83,6 @@ public class ServerHTTP3Session extends ServerProtocolSession this.messageFlusher = new HTTP3Flusher(quicSession.getByteBufferPool(), encoder, configuration.getMaxResponseHeadersSize(), configuration.isUseOutputDirectByteBuffers()); addBean(messageFlusher); - - this.strategy = new AdaptiveExecutionStrategy(producer, getQuicSession().getExecutor()); - addBean(strategy); } public QpackDecoder getQpackDecoder() @@ -104,24 +95,6 @@ public class ServerHTTP3Session extends ServerProtocolSession return session; } - public void offer(Runnable task) - { - producer.offer(task); - } - - @Override - protected boolean processReadableStreams() - { - // Calling super.processReadableStreams() is going to fill and parse HEADERS frames on the current thread, - // so the QPACK decoder is not accessed concurrently. - // The processing of HEADERS frames will produce Runnable tasks and offer them to this instance (via calls - // to offer(Runnable)) so that the execution strategy can consume them. - - boolean result = super.processReadableStreams(); - strategy.produce(); - return result; - } - @Override protected void doStart() throws Exception { @@ -154,21 +127,21 @@ public class ServerHTTP3Session extends ServerProtocolSession } @Override - protected boolean onReadable(long readableStreamId) + protected void onReadable(long readableStreamId) { StreamType streamType = StreamType.from(readableStreamId); if (streamType == StreamType.CLIENT_BIDIRECTIONAL) { if (LOG.isDebugEnabled()) LOG.debug("bidirectional stream #{} selected for read", readableStreamId); - return super.onReadable(readableStreamId); + super.onReadable(readableStreamId); } else { QuicStreamEndPoint streamEndPoint = getOrCreateStreamEndPoint(readableStreamId, this::configureUnidirectionalStreamEndPoint); if (LOG.isDebugEnabled()) LOG.debug("unidirectional stream #{} selected for read: {}", readableStreamId, streamEndPoint); - return streamEndPoint.onReadable(); + streamEndPoint.onReadable(); } } @@ -227,25 +200,4 @@ public class ServerHTTP3Session extends ServerProtocolSession { session.onDataAvailable(streamId); } - - private class HTTP3Producer implements ExecutionStrategy.Producer - { - private final Queue tasks = new ArrayDeque<>(); - - public void offer(Runnable task) - { - if (LOG.isDebugEnabled()) - LOG.debug("enqueuing task {} on {}", task, ServerHTTP3Session.this); - tasks.offer(task); - } - - @Override - public Runnable produce() - { - Runnable task = tasks.poll(); - if (LOG.isDebugEnabled()) - LOG.debug("dequeued task {} on {}", task, ServerHTTP3Session.this); - return task; - } - } } diff --git a/jetty-quic/quic-client/src/main/java/org/eclipse/jetty/quic/client/ClientProtocolSession.java b/jetty-quic/quic-client/src/main/java/org/eclipse/jetty/quic/client/ClientProtocolSession.java index 68d7df34ec3..809d49dc42c 100644 --- a/jetty-quic/quic-client/src/main/java/org/eclipse/jetty/quic/client/ClientProtocolSession.java +++ b/jetty-quic/quic-client/src/main/java/org/eclipse/jetty/quic/client/ClientProtocolSession.java @@ -45,15 +45,14 @@ public class ClientProtocolSession extends ProtocolSession } @Override - protected boolean onReadable(long readableStreamId) + protected void onReadable(long readableStreamId) { // On the client, we need a get-only semantic in case of reads. QuicStreamEndPoint streamEndPoint = getStreamEndPoint(readableStreamId); if (LOG.isDebugEnabled()) LOG.debug("stream #{} selected for read: {}", readableStreamId, streamEndPoint); if (streamEndPoint != null) - return streamEndPoint.onReadable(); - return false; + streamEndPoint.onReadable(); } @Override diff --git a/jetty-quic/quic-client/src/main/java/org/eclipse/jetty/quic/client/ClientQuicConnection.java b/jetty-quic/quic-client/src/main/java/org/eclipse/jetty/quic/client/ClientQuicConnection.java index 89a697b791c..2840c3ab424 100644 --- a/jetty-quic/quic-client/src/main/java/org/eclipse/jetty/quic/client/ClientQuicConnection.java +++ b/jetty-quic/quic-client/src/main/java/org/eclipse/jetty/quic/client/ClientQuicConnection.java @@ -112,6 +112,14 @@ public class ClientQuicConnection extends QuicConnection } } + @Override + public void onFillable() + { + Runnable task = receiveAndProcess(); + if (task != null) + task.run(); + } + @Override protected QuicSession createSession(SocketAddress remoteAddress, ByteBuffer cipherBuffer) throws IOException { diff --git a/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/ProtocolSession.java b/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/ProtocolSession.java index c521f10b7c8..17bc66b46ae 100644 --- a/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/ProtocolSession.java +++ b/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/ProtocolSession.java @@ -13,15 +13,19 @@ package org.eclipse.jetty.quic.common; +import java.util.ArrayDeque; import java.util.List; import java.util.Map; +import java.util.Queue; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import org.eclipse.jetty.io.Connection; import org.eclipse.jetty.quic.common.internal.QuicErrorCode; import org.eclipse.jetty.util.component.ContainerLifeCycle; +import org.eclipse.jetty.util.thread.AutoLock; +import org.eclipse.jetty.util.thread.ExecutionStrategy; +import org.eclipse.jetty.util.thread.strategy.AdaptiveExecutionStrategy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,12 +33,15 @@ public abstract class ProtocolSession extends ContainerLifeCycle { private static final Logger LOG = LoggerFactory.getLogger(ProtocolSession.class); - private final AtomicInteger active = new AtomicInteger(); + private final StreamsProducer producer = new StreamsProducer(); + private final AdaptiveExecutionStrategy strategy; private final QuicSession session; public ProtocolSession(QuicSession session) { this.session = session; + this.strategy = new AdaptiveExecutionStrategy(producer, session.getExecutor()); + addBean(strategy); } public QuicSession getQuicSession() @@ -44,36 +51,12 @@ public abstract class ProtocolSession extends ContainerLifeCycle public void process() { - // This method is called by the network thread and - // dispatches to one, per-session, processing thread. - - // The active counter counts up to 2, with the meanings: - // 0=idle, 1=process, 2=re-process, where re-process is - // necessary to close race between the processing thread - // seeing active=1 and about to exit, and the network - // thread also seeing active=1 and not dispatching, - // leaving unprocessed data in the session. - if (active.getAndUpdate(count -> count <= 1 ? count + 1 : count) == 0) - session.getExecutor().execute(this::processSession); + strategy.produce(); } - private void processSession() + public void offer(Runnable task) { - while (true) - { - processWritableStreams(); - if (processReadableStreams()) - continue; - - // Exit if did not process any stream and we are idle. - if (active.decrementAndGet() == 0) - { - CloseInfo closeInfo = session.getRemoteCloseInfo(); - if (closeInfo != null) - onClose(closeInfo.error(), closeInfo.reason()); - break; - } - } + producer.offer(task); } public QuicStreamEndPoint getStreamEndPoint(long streamId) @@ -104,18 +87,15 @@ public abstract class ProtocolSession extends ContainerLifeCycle streamEndPoint.onWritable(); } - protected boolean processReadableStreams() + protected void processReadableStreams() { List readableStreamIds = session.getReadableStreamIds(); if (LOG.isDebugEnabled()) LOG.debug("readable stream ids: {}", readableStreamIds); - - return readableStreamIds.stream() - .map(this::onReadable) - .reduce(false, (result, interested) -> result || interested); + readableStreamIds.forEach(this::onReadable); } - protected abstract boolean onReadable(long readableStreamId); + protected abstract void onReadable(long readableStreamId); public void configureProtocolEndPoint(QuicStreamEndPoint endPoint) { @@ -158,4 +138,54 @@ public abstract class ProtocolSession extends ContainerLifeCycle { public ProtocolSession newProtocolSession(QuicSession quicSession, Map context); } + + private class StreamsProducer implements ExecutionStrategy.Producer + { + private final AutoLock lock = new AutoLock(); + private final Queue tasks = new ArrayDeque<>(); + + public void offer(Runnable task) + { + if (LOG.isDebugEnabled()) + LOG.debug("enqueuing task {} on {}", task, ProtocolSession.this); + try (AutoLock l = lock.lock()) + { + tasks.offer(task); + } + } + + private Runnable poll() + { + try (AutoLock l = lock.lock()) + { + return tasks.poll(); + } + } + + @Override + public Runnable produce() + { + Runnable task = poll(); + if (LOG.isDebugEnabled()) + LOG.debug("dequeued task {} on {}", task, ProtocolSession.this); + if (task != null) + return task; + + processWritableStreams(); + processReadableStreams(); + + task = poll(); + if (LOG.isDebugEnabled()) + LOG.debug("dequeued produced task {} on {}", task, ProtocolSession.this); + if (task != null) + return task; + + CloseInfo closeInfo = session.getRemoteCloseInfo(); + if (closeInfo != null) + onClose(closeInfo.error(), closeInfo.reason()); + + getQuicSession().processingComplete(); + return null; + } + } } diff --git a/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicConnection.java b/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicConnection.java index d546547b11a..8a8afa726f2 100644 --- a/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicConnection.java +++ b/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicConnection.java @@ -131,6 +131,8 @@ public abstract class QuicConnection extends AbstractConnection listeners.remove((QuicSession.Listener)listener); } + public abstract void onFillable(); + @Override public abstract boolean onIdleExpired(); @@ -159,8 +161,14 @@ public abstract class QuicConnection extends AbstractConnection } } - @Override - public void onFillable() + protected abstract QuicSession createSession(SocketAddress remoteAddress, ByteBuffer cipherBuffer) throws IOException; + + public void write(Callback callback, SocketAddress remoteAddress, ByteBuffer... buffers) + { + flusher.offer(callback, remoteAddress, buffers); + } + + protected Runnable receiveAndProcess() { try { @@ -172,18 +180,18 @@ public abstract class QuicConnection extends AbstractConnection int fill = remoteAddress == DatagramChannelEndPoint.EOF ? -1 : cipherBuffer.remaining(); if (LOG.isDebugEnabled()) LOG.debug("filled cipher buffer with {} byte(s)", fill); - // ServerDatagramEndPoint will only return -1 if input is shut down. + // DatagramChannelEndPoint will only return -1 if input is shut down. if (fill < 0) { byteBufferPool.release(cipherBuffer); getEndPoint().shutdownOutput(); - return; + return null; } if (fill == 0) { byteBufferPool.release(cipherBuffer); fillInterested(); - return; + return null; } if (LOG.isDebugEnabled()) @@ -224,24 +232,23 @@ public abstract class QuicConnection extends AbstractConnection } if (LOG.isDebugEnabled()) - LOG.debug("packet is for existing session with connection ID {}, processing it ({} byte(s))", quicheConnectionId, cipherBuffer.remaining()); - session.process(remoteAddress, cipherBuffer); + LOG.debug("packet is for existing session {}, processing {} bytes", session, cipherBuffer.remaining()); + Runnable task = session.process(remoteAddress, cipherBuffer); + if (LOG.isDebugEnabled()) + LOG.debug("processing session {} produced task {}", session, task); + if (task != null) + return task; } } catch (Throwable x) { if (LOG.isDebugEnabled()) - LOG.debug("caught exception in onFillable loop", x); + LOG.debug("exception in receiveAndProcess()", x); + // TODO: close? + return null; } } - protected abstract QuicSession createSession(SocketAddress remoteAddress, ByteBuffer cipherBuffer) throws IOException; - - public void write(Callback callback, SocketAddress remoteAddress, ByteBuffer... buffers) - { - flusher.offer(callback, remoteAddress, buffers); - } - private class Flusher extends IteratingCallback { private final AutoLock lock = new AutoLock(); diff --git a/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicSession.java b/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicSession.java index 092c1a92df4..0596790de11 100644 --- a/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicSession.java +++ b/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicSession.java @@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; @@ -58,6 +59,7 @@ public abstract class QuicSession extends ContainerLifeCycle private final AtomicLong[] ids = new AtomicLong[StreamType.values().length]; private final ConcurrentMap endPoints = new ConcurrentHashMap<>(); + private final AtomicBoolean processing = new AtomicBoolean(); private final Executor executor; private final Scheduler scheduler; private final ByteBufferPool byteBufferPool; @@ -282,7 +284,7 @@ public abstract class QuicSession extends ContainerLifeCycle this.quicheConnectionId = quicheConnectionId; } - public void process(SocketAddress remoteAddress, ByteBuffer cipherBufferIn) throws IOException + public Runnable process(SocketAddress remoteAddress, ByteBuffer cipherBufferIn) throws IOException { // While the connection ID remains the same, // the remote address may change so store it again. @@ -327,14 +329,23 @@ public abstract class QuicSession extends ContainerLifeCycle protocolSession = session = createProtocolSession(); addManaged(session); } - session.process(); + + if (processing.compareAndSet(false, true)) + return session::process; + return null; } else { flush(); + return null; } } + void processingComplete() + { + processing.set(false); + } + protected abstract ProtocolSession createProtocolSession(); List getWritableStreamIds() diff --git a/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicStreamEndPoint.java b/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicStreamEndPoint.java index 6124f9ed337..4d51633c266 100644 --- a/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicStreamEndPoint.java +++ b/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicStreamEndPoint.java @@ -26,6 +26,7 @@ import org.eclipse.jetty.io.FillInterest; import org.eclipse.jetty.io.WriteFlusher; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.thread.AutoLock; import org.eclipse.jetty.util.thread.Scheduler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,14 +41,17 @@ public class QuicStreamEndPoint extends AbstractEndPoint private static final Logger LOG = LoggerFactory.getLogger(QuicStreamEndPoint.class); private static final ByteBuffer LAST_FLAG = ByteBuffer.allocate(0); + private final AutoLock lock = new AutoLock(); private final QuicSession session; private final long streamId; + private boolean readable; public QuicStreamEndPoint(Scheduler scheduler, QuicSession session, long streamId) { super(scheduler); this.session = session; this.streamId = streamId; + this.readable = true; } public QuicSession getQuicSession() @@ -209,12 +213,38 @@ public class QuicStreamEndPoint extends AbstractEndPoint getWriteFlusher().completeWrite(); } - public boolean onReadable() + public void onReadable() { - if (LOG.isDebugEnabled()) - LOG.debug("stream {} is readable", streamId); + // TODO: use AtomicBoolean. + try (AutoLock l = lock.lock()) + { + if (LOG.isDebugEnabled()) + LOG.debug("stream {} is readable, processing: {}", streamId, readable); + if (!readable) + return; + readable = false; + } getFillInterest().fillable(); - return isFillInterested(); + } + + @Override + public void fillInterested(Callback callback) + { + try (AutoLock l = lock.lock()) + { + readable = true; + } + super.fillInterested(callback); + } + + @Override + public boolean tryFillInterested(Callback callback) + { + try (AutoLock l = lock.lock()) + { + readable = true; + } + return super.tryFillInterested(callback); } @Override diff --git a/jetty-quic/quic-server/src/main/java/org/eclipse/jetty/quic/server/ServerProtocolSession.java b/jetty-quic/quic-server/src/main/java/org/eclipse/jetty/quic/server/ServerProtocolSession.java index c8bf1b36b20..95a3995b910 100644 --- a/jetty-quic/quic-server/src/main/java/org/eclipse/jetty/quic/server/ServerProtocolSession.java +++ b/jetty-quic/quic-server/src/main/java/org/eclipse/jetty/quic/server/ServerProtocolSession.java @@ -34,13 +34,13 @@ public class ServerProtocolSession extends ProtocolSession } @Override - protected boolean onReadable(long readableStreamId) + protected void onReadable(long readableStreamId) { // On the server, we need a get-or-create semantic in case of reads. QuicStreamEndPoint streamEndPoint = getOrCreateStreamEndPoint(readableStreamId, this::configureProtocolEndPoint); if (LOG.isDebugEnabled()) LOG.debug("stream #{} selected for read: {}", readableStreamId, streamEndPoint); - return streamEndPoint.onReadable(); + streamEndPoint.onReadable(); } @Override diff --git a/jetty-quic/quic-server/src/main/java/org/eclipse/jetty/quic/server/ServerQuicConnection.java b/jetty-quic/quic-server/src/main/java/org/eclipse/jetty/quic/server/ServerQuicConnection.java index d68f161f0be..bace9c3fe19 100644 --- a/jetty-quic/quic-server/src/main/java/org/eclipse/jetty/quic/server/ServerQuicConnection.java +++ b/jetty-quic/quic-server/src/main/java/org/eclipse/jetty/quic/server/ServerQuicConnection.java @@ -32,7 +32,10 @@ import org.eclipse.jetty.quic.server.internal.SimpleTokenValidator; import org.eclipse.jetty.server.Connector; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.component.LifeCycle; +import org.eclipse.jetty.util.thread.ExecutionStrategy; import org.eclipse.jetty.util.thread.Scheduler; +import org.eclipse.jetty.util.thread.strategy.AdaptiveExecutionStrategy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,6 +48,7 @@ public class ServerQuicConnection extends QuicConnection private final QuicheConfig quicheConfig; private final Connector connector; + private final AdaptiveExecutionStrategy strategy; private final SessionTimeouts sessionTimeouts; protected ServerQuicConnection(Connector connector, EndPoint endPoint, QuicheConfig quicheConfig) @@ -52,6 +56,7 @@ public class ServerQuicConnection extends QuicConnection super(connector.getExecutor(), connector.getScheduler(), connector.getByteBufferPool(), endPoint); this.quicheConfig = quicheConfig; this.connector = connector; + this.strategy = new AdaptiveExecutionStrategy(new QuicProducer(), getExecutor()); this.sessionTimeouts = new SessionTimeouts(connector.getScheduler()); } @@ -59,9 +64,23 @@ public class ServerQuicConnection extends QuicConnection public void onOpen() { super.onOpen(); + LifeCycle.start(strategy); fillInterested(); } + @Override + public void onClose(Throwable cause) + { + LifeCycle.stop(strategy); + super.onClose(cause); + } + + @Override + public void onFillable() + { + strategy.produce(); + } + @Override protected QuicSession createSession(SocketAddress remoteAddress, ByteBuffer cipherBuffer) throws IOException { @@ -118,6 +137,15 @@ public class ServerQuicConnection extends QuicConnection // listening DatagramChannelEndPoint, so it must not be closed. } + private class QuicProducer implements ExecutionStrategy.Producer + { + @Override + public Runnable produce() + { + return receiveAndProcess(); + } + } + private class SessionTimeouts extends CyclicTimeouts { private SessionTimeouts(Scheduler scheduler) diff --git a/jetty-quic/quic-server/src/main/java/org/eclipse/jetty/quic/server/ServerQuicSession.java b/jetty-quic/quic-server/src/main/java/org/eclipse/jetty/quic/server/ServerQuicSession.java index 67906a10373..e052dc0e6de 100644 --- a/jetty-quic/quic-server/src/main/java/org/eclipse/jetty/quic/server/ServerQuicSession.java +++ b/jetty-quic/quic-server/src/main/java/org/eclipse/jetty/quic/server/ServerQuicSession.java @@ -103,10 +103,10 @@ public class ServerQuicSession extends QuicSession implements CyclicTimeouts.Exp } @Override - public void process(SocketAddress remoteAddress, ByteBuffer cipherBufferIn) throws IOException + public Runnable process(SocketAddress remoteAddress, ByteBuffer cipherBufferIn) throws IOException { notIdle(); - super.process(remoteAddress, cipherBufferIn); + return super.process(remoteAddress, cipherBufferIn); } @Override