From 607239028c0be8ef80dbabdb229eb44b3778d630 Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Wed, 22 Jul 2015 17:31:54 +1000 Subject: [PATCH] 470727 - Thread Starvation of selector wakeups. Changed the CallBack.NonBlocking to a default Callback.isNonBlocking, so that wrapping callbacks can determine if they are NonBlocking or not. --- .../org/eclipse/jetty/client/HttpSender.java | 13 ++ .../jetty/client/http/HttpSenderOverHTTP.java | 6 + .../client/util/DeferredContentProvider.java | 4 +- .../util/OutputStreamContentProvider.java | 6 + .../jetty/client/HttpClientFailureTest.java | 2 +- .../jetty/client/HttpClientStreamTest.java | 6 +- .../fcgi/client/http/HttpSenderOverFCGI.java | 2 +- .../fcgi/server/HttpTransportOverFCGI.java | 4 +- .../jetty/http2/client/HTTP2Client.java | 2 +- .../http2/client/HTTP2ClientSession.java | 4 +- .../jetty/http2/client/AsyncIOTest.java | 8 +- .../http2/client/FlowControlStrategyTest.java | 52 ++--- .../jetty/http2/client/IdleTimeoutTest.java | 20 +- .../eclipse/jetty/http2/client/PingTest.java | 2 +- .../http2/client/PushCacheFilterTest.java | 2 +- .../http2/client/SessionFailureTest.java | 2 +- .../jetty/http2/client/StreamCloseTest.java | 18 +- .../jetty/http2/client/StreamResetTest.java | 18 +- .../http2/BufferingFlowControlStrategy.java | 2 +- .../org/eclipse/jetty/http2/HTTP2Session.java | 18 +- .../org/eclipse/jetty/http2/HTTP2Stream.java | 6 +- .../http2/SimpleFlowControlStrategy.java | 2 +- .../client/http/HttpConnectionOverHTTP2.java | 2 +- .../client/http/HttpReceiverOverHTTP2.java | 2 +- .../http2/server/ByteBufferCallback.java | 6 + .../server/HTTP2ServerConnectionFactory.java | 2 +- .../http2/server/HTTP2ServerSession.java | 4 +- .../http2/server/HttpChannelOverHTTP2.java | 6 + .../http2/server/HttpTransportOverHTTP2.java | 8 +- .../eclipse/jetty/http2/server/CloseTest.java | 8 +- .../eclipse/jetty/io/AbstractEndPoint.java | 10 +- .../org/eclipse/jetty/io/FillInterest.java | 2 +- .../jetty/io/SelectChannelEndPoint.java | 79 ++++++- .../org/eclipse/jetty/io/WriteFlusher.java | 2 +- .../eclipse/jetty/io/SelectorManagerTest.java | 4 +- .../jetty/proxy/AsyncMiddleManServlet.java | 2 +- .../org/eclipse/jetty/server/HttpChannel.java | 8 +- .../eclipse/jetty/server/HttpConnection.java | 18 +- .../org/eclipse/jetty/server/HttpInput.java | 9 +- .../org/eclipse/jetty/server/HttpOutput.java | 2 +- .../jetty/server/ConnectorTimeoutTest.java | 3 + .../DefaultServletStarvationTest.java | 216 ++++++++++++++++++ .../eclipse/jetty/util/BlockingCallback.java | 3 +- .../java/org/eclipse/jetty/util/Callback.java | 57 +++-- .../jetty/util/IteratingNestedCallback.java | 6 + .../jetty/util/SharedBlockingCallback.java | 5 +- .../common/BlockingWriteCallback.java | 2 +- 47 files changed, 519 insertions(+), 146 deletions(-) create mode 100644 jetty-servlets/src/test/java/org/eclipse/jetty/servlets/DefaultServletStarvationTest.java diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java index 3f8c89eef11..650587e48c1 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java @@ -672,6 +672,13 @@ public abstract class HttpSender implements AsyncContentProvider.Listener private class CommitCallback implements Callback { + + @Override + public boolean isNonBlocking() + { + return content.isNonBlocking(); + } + @Override public void succeeded() { @@ -882,6 +889,12 @@ public abstract class HttpSender implements AsyncContentProvider.Listener private class LastContentCallback implements Callback { + @Override + public boolean isNonBlocking() + { + return content.isNonBlocking(); + } + @Override public void succeeded() { diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpSenderOverHTTP.java b/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpSenderOverHTTP.java index b3d15d01510..887315105be 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpSenderOverHTTP.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpSenderOverHTTP.java @@ -230,6 +230,12 @@ public class HttpSenderOverHTTP extends HttpSender this.buffers = buffers; } + @Override + public boolean isNonBlocking() + { + return callback.isNonBlocking(); + } + @Override public void succeeded() { diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/util/DeferredContentProvider.java b/jetty-client/src/main/java/org/eclipse/jetty/client/util/DeferredContentProvider.java index be88c9e4c70..ea094337d5e 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/util/DeferredContentProvider.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/util/DeferredContentProvider.java @@ -87,7 +87,7 @@ import org.eclipse.jetty.util.Callback; */ public class DeferredContentProvider implements AsyncContentProvider, Callback, Closeable { - private static final Chunk CLOSE = new Chunk(BufferUtil.EMPTY_BUFFER, Callback.Adapter.INSTANCE); + private static final Chunk CLOSE = new Chunk(BufferUtil.EMPTY_BUFFER, Callback.NOOP); private final Object lock = this; private final ArrayQueue chunks = new ArrayQueue<>(4, 64, lock); @@ -143,7 +143,7 @@ public class DeferredContentProvider implements AsyncContentProvider, Callback, */ public boolean offer(ByteBuffer buffer) { - return offer(buffer, Callback.Adapter.INSTANCE); + return offer(buffer, Callback.NOOP); } public boolean offer(ByteBuffer buffer, Callback callback) diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/util/OutputStreamContentProvider.java b/jetty-client/src/main/java/org/eclipse/jetty/client/util/OutputStreamContentProvider.java index 5af314ff22c..d7e8e65feaa 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/util/OutputStreamContentProvider.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/util/OutputStreamContentProvider.java @@ -78,6 +78,12 @@ public class OutputStreamContentProvider implements AsyncContentProvider, Callba private final DeferredContentProvider deferred = new DeferredContentProvider(); private final OutputStream output = new DeferredOutputStream(); + @Override + public boolean isNonBlocking() + { + return deferred.isNonBlocking(); + } + @Override public long getLength() { diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientFailureTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientFailureTest.java index 9453025ba6f..2584701e5a6 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientFailureTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientFailureTest.java @@ -157,7 +157,7 @@ public class HttpClientFailureTest Assert.assertTrue(commitLatch.await(5, TimeUnit.SECONDS)); final CountDownLatch contentLatch = new CountDownLatch(1); - content.offer(ByteBuffer.allocate(1024), new Callback.Adapter() + content.offer(ByteBuffer.allocate(1024), new Callback() { @Override public void failed(Throwable x) diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientStreamTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientStreamTest.java index 572703f20fb..f88312d7a44 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientStreamTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientStreamTest.java @@ -691,7 +691,7 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest try (DeferredContentProvider content = new DeferredContentProvider()) { // Make the content immediately available. - content.offer(ByteBuffer.allocate(1024), new Callback.Adapter() + content.offer(ByteBuffer.allocate(1024), new Callback() { @Override public void succeeded() @@ -976,7 +976,7 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest start(new EmptyServerHandler()); final CountDownLatch failLatch = new CountDownLatch(2); - final Callback.Adapter callback = new Callback.Adapter() + final Callback callback = new Callback() { @Override public void failed(Throwable x) @@ -1014,7 +1014,7 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest // Make sure that adding more content results in the callback to be failed. final CountDownLatch latch = new CountDownLatch(1); - content.offer(ByteBuffer.wrap(new byte[128]), new Callback.Adapter() + content.offer(ByteBuffer.wrap(new byte[128]), new Callback() { @Override public void failed(Throwable x) diff --git a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpSenderOverFCGI.java b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpSenderOverFCGI.java index a2571858d6b..abb81380802 100644 --- a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpSenderOverFCGI.java +++ b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpSenderOverFCGI.java @@ -99,7 +99,7 @@ public class HttpSenderOverFCGI extends HttpSender int id = getHttpChannel().getRequest(); boolean hasContent = content.hasContent(); Generator.Result headersResult = generator.generateRequestHeaders(id, fcgiHeaders, - hasContent ? callback : Callback.Adapter.INSTANCE); + hasContent ? callback : Callback.NOOP); if (hasContent) { getHttpChannel().flush(headersResult); diff --git a/jetty-fcgi/fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/HttpTransportOverFCGI.java b/jetty-fcgi/fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/HttpTransportOverFCGI.java index acf1daefda1..39bc2ead1e9 100644 --- a/jetty-fcgi/fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/HttpTransportOverFCGI.java +++ b/jetty-fcgi/fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/HttpTransportOverFCGI.java @@ -105,7 +105,7 @@ public class HttpTransportOverFCGI implements HttpTransport { if (lastContent) { - Generator.Result headersResult = generateResponseHeaders(info, Callback.Adapter.INSTANCE); + Generator.Result headersResult = generateResponseHeaders(info, Callback.NOOP); Generator.Result contentResult = generateResponseContent(BufferUtil.EMPTY_BUFFER, true, callback); flusher.flush(headersResult, contentResult); } @@ -117,7 +117,7 @@ public class HttpTransportOverFCGI implements HttpTransport } else { - Generator.Result headersResult = generateResponseHeaders(info, Callback.Adapter.INSTANCE); + Generator.Result headersResult = generateResponseHeaders(info, Callback.NOOP); Generator.Result contentResult = generateResponseContent(content, lastContent, callback); flusher.flush(headersResult, contentResult); } diff --git a/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2Client.java b/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2Client.java index 6f5bd6f4ec2..f5fdb466f75 100644 --- a/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2Client.java +++ b/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2Client.java @@ -299,7 +299,7 @@ public class HTTP2Client extends ContainerLifeCycle private void closeConnections() { for (ISession session : sessions) - session.close(ErrorCode.NO_ERROR.code, null, Callback.Adapter.INSTANCE); + session.close(ErrorCode.NO_ERROR.code, null, Callback.NOOP); sessions.clear(); } diff --git a/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientSession.java b/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientSession.java index 03a476e4eb4..3822d25779c 100644 --- a/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientSession.java +++ b/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientSession.java @@ -55,7 +55,7 @@ public class HTTP2ClientSession extends HTTP2Session } else { - stream.process(frame, Callback.Adapter.INSTANCE); + stream.process(frame, Callback.NOOP); notifyHeaders(stream, frame); } } @@ -92,7 +92,7 @@ public class HTTP2ClientSession extends HTTP2Session else { IStream pushStream = createRemoteStream(pushStreamId); - pushStream.process(frame, Callback.Adapter.INSTANCE); + pushStream.process(frame, Callback.NOOP); Stream.Listener listener = notifyPush(stream, pushStream, frame); pushStream.setListener(listener); } diff --git a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/AsyncIOTest.java b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/AsyncIOTest.java index 4c495c9f75f..02ee1c29739 100644 --- a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/AsyncIOTest.java +++ b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/AsyncIOTest.java @@ -94,7 +94,7 @@ public class AsyncIOTest extends AbstractTest } }); Stream stream = promise.get(5, TimeUnit.SECONDS); - stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(16), true), Callback.Adapter.INSTANCE); + stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(16), true), Callback.NOOP); Assert.assertTrue(latch.await(5, TimeUnit.SECONDS)); } @@ -148,7 +148,7 @@ public class AsyncIOTest extends AbstractTest // Wait until service() returns. Thread.sleep(1000); - stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(16), true), Callback.Adapter.INSTANCE); + stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(16), true), Callback.NOOP); Assert.assertTrue(latch.await(5, TimeUnit.SECONDS)); } @@ -204,11 +204,11 @@ public class AsyncIOTest extends AbstractTest // Wait until service() returns. Thread.sleep(1000); - stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(1), false), Callback.Adapter.INSTANCE); + stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(1), false), Callback.NOOP); // Wait until onDataAvailable() returns. Thread.sleep(1000); - stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(1), true), Callback.Adapter.INSTANCE); + stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(1), true), Callback.NOOP); Assert.assertTrue(latch.await(5, TimeUnit.SECONDS)); // Make sure onDataAvailable() has been called twice diff --git a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/FlowControlStrategyTest.java b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/FlowControlStrategyTest.java index 239a7c0d3cd..f19c7844bcc 100644 --- a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/FlowControlStrategyTest.java +++ b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/FlowControlStrategyTest.java @@ -247,7 +247,7 @@ public abstract class FlowControlStrategyTest HttpFields fields = new HttpFields(); MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, 200, fields); HeadersFrame responseFrame = new HeadersFrame(stream.getId(), response, null, true); - stream.headers(responseFrame, Callback.Adapter.INSTANCE); + stream.headers(responseFrame, Callback.NOOP); return new Stream.Listener.Adapter() { @@ -263,7 +263,7 @@ public abstract class FlowControlStrategyTest callbackRef.set(callback); Map settings = new HashMap<>(); settings.put(SettingsFrame.INITIAL_WINDOW_SIZE, size); - stream.getSession().settings(new SettingsFrame(settings, false), Callback.Adapter.INSTANCE); + stream.getSession().settings(new SettingsFrame(settings, false), Callback.NOOP); // Do not succeed the callback here. } else if (dataFrameCount > 1) @@ -293,11 +293,11 @@ public abstract class FlowControlStrategyTest Stream stream = promise.get(5, TimeUnit.SECONDS); // Send first chunk that exceeds the window. - stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(size * 2), false), Callback.Adapter.INSTANCE); + stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(size * 2), false), Callback.NOOP); settingsLatch.await(5, TimeUnit.SECONDS); // Send the second chunk of data, must not arrive since we're flow control stalled on the client. - stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(size * 2), true), Callback.Adapter.INSTANCE); + stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(size * 2), true), Callback.NOOP); Assert.assertFalse(dataLatch.await(1, TimeUnit.SECONDS)); // Consume the data arrived to server, this will resume flow control on the client. @@ -325,10 +325,10 @@ public abstract class FlowControlStrategyTest { MetaData.Response metaData = new MetaData.Response(HttpVersion.HTTP_2, 200, new HttpFields()); HeadersFrame responseFrame = new HeadersFrame(stream.getId(), metaData, null, false); - stream.headers(responseFrame, Callback.Adapter.INSTANCE); + stream.headers(responseFrame, Callback.NOOP); DataFrame dataFrame = new DataFrame(stream.getId(), ByteBuffer.allocate(length), true); - stream.data(dataFrame, Callback.Adapter.INSTANCE); + stream.data(dataFrame, Callback.NOOP); return null; } }); @@ -337,7 +337,7 @@ public abstract class FlowControlStrategyTest Map settings = new HashMap<>(); settings.put(SettingsFrame.INITIAL_WINDOW_SIZE, windowSize); - session.settings(new SettingsFrame(settings, false), Callback.Adapter.INSTANCE); + session.settings(new SettingsFrame(settings, false), Callback.NOOP); Assert.assertTrue(settingsLatch.await(5, TimeUnit.SECONDS)); @@ -417,7 +417,7 @@ public abstract class FlowControlStrategyTest { MetaData.Response metaData = new MetaData.Response(HttpVersion.HTTP_2, 200, new HttpFields()); HeadersFrame responseFrame = new HeadersFrame(stream.getId(), metaData, null, false); - stream.headers(responseFrame, Callback.Adapter.INSTANCE); + stream.headers(responseFrame, Callback.NOOP); return new Stream.Listener.Adapter() { private AtomicInteger dataFrames = new AtomicInteger(); @@ -474,7 +474,7 @@ public abstract class FlowControlStrategyTest final int length = 5 * windowSize; DataFrame dataFrame = new DataFrame(stream.getId(), ByteBuffer.allocate(length), true); - stream.data(dataFrame, Callback.Adapter.INSTANCE); + stream.data(dataFrame, Callback.NOOP); Callback callback = exchanger.exchange(null, 5, TimeUnit.SECONDS); checkThatWeAreFlowControlStalled(exchanger); @@ -519,7 +519,7 @@ public abstract class FlowControlStrategyTest // Send data to consume most of the session window. ByteBuffer data = ByteBuffer.allocate(FlowControlStrategy.DEFAULT_WINDOW_SIZE - windowSize); DataFrame dataFrame = new DataFrame(stream.getId(), data, true); - stream.data(dataFrame, Callback.Adapter.INSTANCE); + stream.data(dataFrame, Callback.NOOP); return null; } else @@ -527,9 +527,9 @@ public abstract class FlowControlStrategyTest // For every stream, send down half the window size of data. MetaData.Response metaData = new MetaData.Response(HttpVersion.HTTP_2, 200, new HttpFields()); HeadersFrame responseFrame = new HeadersFrame(stream.getId(), metaData, null, false); - stream.headers(responseFrame, Callback.Adapter.INSTANCE); + stream.headers(responseFrame, Callback.NOOP); DataFrame dataFrame = new DataFrame(stream.getId(), ByteBuffer.allocate(windowSize / 2), true); - stream.data(dataFrame, Callback.Adapter.INSTANCE); + stream.data(dataFrame, Callback.NOOP); return null; } } @@ -615,9 +615,9 @@ public abstract class FlowControlStrategyTest { MetaData.Response metaData = new MetaData.Response(HttpVersion.HTTP_2, 200, new HttpFields()); HeadersFrame responseFrame = new HeadersFrame(stream.getId(), metaData, null, false); - stream.headers(responseFrame, Callback.Adapter.INSTANCE); + stream.headers(responseFrame, Callback.NOOP); DataFrame dataFrame = new DataFrame(stream.getId(), ByteBuffer.wrap(data), true); - stream.data(dataFrame, Callback.Adapter.INSTANCE); + stream.data(dataFrame, Callback.NOOP); return null; } }); @@ -675,8 +675,8 @@ public abstract class FlowControlStrategyTest @Override public Stream.Listener onNewStream(Stream stream, HeadersFrame frame) { - stream.data(new DataFrame(stream.getId(), ByteBuffer.wrap(chunk1), false), Callback.Adapter.INSTANCE); - stream.data(new DataFrame(stream.getId(), ByteBuffer.wrap(chunk2), true), Callback.Adapter.INSTANCE); + stream.data(new DataFrame(stream.getId(), ByteBuffer.wrap(chunk1), false), Callback.NOOP); + stream.data(new DataFrame(stream.getId(), ByteBuffer.wrap(chunk2), true), Callback.NOOP); dataLatch.countDown(); return null; } @@ -685,7 +685,7 @@ public abstract class FlowControlStrategyTest Session session = newClient(new Session.Listener.Adapter()); Map settings = new HashMap<>(); settings.put(SettingsFrame.INITIAL_WINDOW_SIZE, 0); - session.settings(new SettingsFrame(settings, false), Callback.Adapter.INSTANCE); + session.settings(new SettingsFrame(settings, false), Callback.NOOP); Assert.assertTrue(settingsLatch.get().await(5, TimeUnit.SECONDS)); byte[] content = new byte[chunk1.length + chunk2.length]; @@ -712,7 +712,7 @@ public abstract class FlowControlStrategyTest settingsLatch.set(new CountDownLatch(1)); settings.clear(); settings.put(SettingsFrame.INITIAL_WINDOW_SIZE, chunk1.length / 2); - session.settings(new SettingsFrame(settings, false), Callback.Adapter.INSTANCE); + session.settings(new SettingsFrame(settings, false), Callback.NOOP); Assert.assertTrue(settingsLatch.get().await(5, TimeUnit.SECONDS)); Assert.assertTrue(responseLatch.await(5, TimeUnit.SECONDS)); @@ -734,7 +734,7 @@ public abstract class FlowControlStrategyTest { MetaData metaData = new MetaData.Response(HttpVersion.HTTP_2, 200, new HttpFields()); HeadersFrame responseFrame = new HeadersFrame(stream.getId(), metaData, null, false); - stream.headers(responseFrame, Callback.Adapter.INSTANCE); + stream.headers(responseFrame, Callback.NOOP); return new Stream.Listener.Adapter() { @Override @@ -787,7 +787,7 @@ public abstract class FlowControlStrategyTest ByteBuffer requestContent = ByteBuffer.wrap(requestData); DataFrame dataFrame = new DataFrame(stream.getId(), requestContent, true); - stream.data(dataFrame, Callback.Adapter.INSTANCE); + stream.data(dataFrame, Callback.NOOP); Assert.assertTrue(latch.await(5, TimeUnit.SECONDS)); @@ -820,7 +820,7 @@ public abstract class FlowControlStrategyTest Stream stream = streamPromise.get(5, TimeUnit.SECONDS); ByteBuffer data = ByteBuffer.allocate(FlowControlStrategy.DEFAULT_WINDOW_SIZE); final CountDownLatch dataLatch = new CountDownLatch(1); - stream.data(new DataFrame(stream.getId(), data, false), new Callback.Adapter() + stream.data(new DataFrame(stream.getId(), data, false), new Callback.NonBlocking() { @Override public void succeeded() @@ -845,7 +845,7 @@ public abstract class FlowControlStrategyTest ByteBuffer extraData = ByteBuffer.allocate(1024); http2Session.getGenerator().data(lease, new DataFrame(stream.getId(), extraData, true), extraData.remaining()); List buffers = lease.getByteBuffers(); - http2Session.getEndPoint().write(Callback.Adapter.INSTANCE, buffers.toArray(new ByteBuffer[buffers.size()])); + http2Session.getEndPoint().write(Callback.NOOP, buffers.toArray(new ByteBuffer[buffers.size()])); // Expect the connection to be closed. Assert.assertTrue(closeLatch.await(5, TimeUnit.SECONDS)); @@ -885,7 +885,7 @@ public abstract class FlowControlStrategyTest Stream stream = streamPromise.get(5, TimeUnit.SECONDS); ByteBuffer data = ByteBuffer.allocate(FlowControlStrategy.DEFAULT_WINDOW_SIZE); final CountDownLatch dataLatch = new CountDownLatch(1); - stream.data(new DataFrame(stream.getId(), data, false), new Callback.Adapter() + stream.data(new DataFrame(stream.getId(), data, false), new Callback.NonBlocking() { @Override public void succeeded() @@ -906,7 +906,7 @@ public abstract class FlowControlStrategyTest ByteBuffer extraData = ByteBuffer.allocate(1024); http2Session.getGenerator().data(lease, new DataFrame(stream.getId(), extraData, true), extraData.remaining()); List buffers = lease.getByteBuffers(); - http2Session.getEndPoint().write(Callback.Adapter.INSTANCE, buffers.toArray(new ByteBuffer[buffers.size()])); + http2Session.getEndPoint().write(Callback.NOOP, buffers.toArray(new ByteBuffer[buffers.size()])); // Expect the connection to be closed. Assert.assertTrue(closeLatch.await(5, TimeUnit.SECONDS)); @@ -936,7 +936,7 @@ public abstract class FlowControlStrategyTest // stream is reset, and automatically consumed to // keep the session window large for other streams. callback.failed(new Throwable()); - stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.Adapter.INSTANCE); + stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.NOOP); } }; } @@ -960,7 +960,7 @@ public abstract class FlowControlStrategyTest // Perform a big upload that will stall the flow control windows. ByteBuffer data = ByteBuffer.allocate(5 * FlowControlStrategy.DEFAULT_WINDOW_SIZE); final CountDownLatch dataLatch = new CountDownLatch(1); - stream.data(new DataFrame(stream.getId(), data, true), new Callback.Adapter() + stream.data(new DataFrame(stream.getId(), data, true), new Callback.NonBlocking() { @Override public void failed(Throwable x) diff --git a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/IdleTimeoutTest.java b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/IdleTimeoutTest.java index 6520be3d2d3..e99d570a6ee 100644 --- a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/IdleTimeoutTest.java +++ b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/IdleTimeoutTest.java @@ -65,7 +65,7 @@ public class IdleTimeoutTest extends AbstractTest stream.setIdleTimeout(10 * idleTimeout); MetaData.Response metaData = new MetaData.Response(HttpVersion.HTTP_2, 200, new HttpFields()); HeadersFrame responseFrame = new HeadersFrame(stream.getId(), metaData, null, true); - stream.headers(responseFrame, Callback.Adapter.INSTANCE); + stream.headers(responseFrame, Callback.NOOP); return null; } }); @@ -154,7 +154,7 @@ public class IdleTimeoutTest extends AbstractTest sleep(idleTimeout + idleTimeout / 2); MetaData.Response metaData = new MetaData.Response(HttpVersion.HTTP_2, 200, new HttpFields()); HeadersFrame responseFrame = new HeadersFrame(stream.getId(), metaData, null, true); - stream.headers(responseFrame, Callback.Adapter.INSTANCE); + stream.headers(responseFrame, Callback.NOOP); return null; } }); @@ -207,7 +207,7 @@ public class IdleTimeoutTest extends AbstractTest stream.setIdleTimeout(10 * idleTimeout); MetaData.Response metaData = new MetaData.Response(HttpVersion.HTTP_2, 200, new HttpFields()); HeadersFrame responseFrame = new HeadersFrame(stream.getId(), metaData, null, true); - stream.headers(responseFrame, Callback.Adapter.INSTANCE); + stream.headers(responseFrame, Callback.NOOP); return null; } @@ -285,7 +285,7 @@ public class IdleTimeoutTest extends AbstractTest stream.setIdleTimeout(10 * idleTimeout); MetaData.Response metaData = new MetaData.Response(HttpVersion.HTTP_2, 200, new HttpFields()); HeadersFrame responseFrame = new HeadersFrame(stream.getId(), metaData, null, true); - stream.headers(responseFrame, Callback.Adapter.INSTANCE); + stream.headers(responseFrame, Callback.NOOP); return null; } @@ -452,7 +452,7 @@ public class IdleTimeoutTest extends AbstractTest sleep(idleTimeout / 2); final CountDownLatch dataLatch = new CountDownLatch(1); - stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(1), false), new Callback.Adapter() + stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(1), false), new Callback() { private int sends; @@ -461,7 +461,7 @@ public class IdleTimeoutTest extends AbstractTest { sleep(idleTimeout / 2); final boolean last = ++sends == 2; - stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(1), last), !last ? this : new Adapter() + stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(1), last), !last ? this : new Callback.NonBlocking() { @Override public void succeeded() @@ -486,7 +486,7 @@ public class IdleTimeoutTest extends AbstractTest public Stream.Listener onNewStream(Stream stream, HeadersFrame frame) { MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, 200, new HttpFields()); - stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.Adapter.INSTANCE); + stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP); return null; } @@ -513,11 +513,11 @@ public class IdleTimeoutTest extends AbstractTest final Stream stream = promise.get(5, TimeUnit.SECONDS); sleep(idleTimeout / 2); - stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(1), false), Callback.Adapter.INSTANCE); + stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(1), false), Callback.NOOP); sleep(idleTimeout / 2); - stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(1), false), Callback.Adapter.INSTANCE); + stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(1), false), Callback.NOOP); sleep(idleTimeout / 2); - stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(1), true), Callback.Adapter.INSTANCE); + stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(1), true), Callback.NOOP); Assert.assertFalse(resetLatch.await(0, TimeUnit.SECONDS)); } diff --git a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/PingTest.java b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/PingTest.java index 8b4c2b3e2e1..4f03182be1f 100644 --- a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/PingTest.java +++ b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/PingTest.java @@ -51,7 +51,7 @@ public class PingTest extends AbstractTest }); PingFrame frame = new PingFrame(payload, false); - session.ping(frame, Callback.Adapter.INSTANCE); + session.ping(frame, Callback.NOOP); Assert.assertTrue(latch.await(5, TimeUnit.SECONDS)); } diff --git a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/PushCacheFilterTest.java b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/PushCacheFilterTest.java index 9fb35d283c2..907bd4ebdc8 100644 --- a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/PushCacheFilterTest.java +++ b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/PushCacheFilterTest.java @@ -205,7 +205,7 @@ public class PushCacheFilterTest extends AbstractTest { // Reset the stream as soon as we see the push. ResetFrame resetFrame = new ResetFrame(stream.getId(), ErrorCode.REFUSED_STREAM_ERROR.code); - stream.reset(resetFrame, Callback.Adapter.INSTANCE); + stream.reset(resetFrame, Callback.NOOP); return new Adapter() { @Override diff --git a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/SessionFailureTest.java b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/SessionFailureTest.java index 0e994ec6532..f9b3440b48c 100644 --- a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/SessionFailureTest.java +++ b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/SessionFailureTest.java @@ -84,7 +84,7 @@ public class SessionFailureTest extends AbstractTest // Forcibly close the connection. ((HTTP2Session)stream.getSession()).getEndPoint().close(); // Now try to write something: it should fail. - stream.headers(frame, new Callback.Adapter() + stream.headers(frame, new Callback() { @Override public void failed(Throwable x) diff --git a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/StreamCloseTest.java b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/StreamCloseTest.java index 3e6eb34cccb..b5c4e346ca5 100644 --- a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/StreamCloseTest.java +++ b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/StreamCloseTest.java @@ -80,7 +80,7 @@ public class StreamCloseTest extends AbstractTest { MetaData.Response metaData = new MetaData.Response(HttpVersion.HTTP_2, 200, new HttpFields()); HeadersFrame response = new HeadersFrame(stream.getId(), metaData, null, true); - stream.headers(response, new Callback.Adapter() + stream.headers(response, new Callback() { @Override public void succeeded() @@ -122,14 +122,14 @@ public class StreamCloseTest extends AbstractTest { MetaData.Response metaData = new MetaData.Response(HttpVersion.HTTP_2, 200, new HttpFields()); HeadersFrame response = new HeadersFrame(stream.getId(), metaData, null, false); - stream.headers(response, Callback.Adapter.INSTANCE); + stream.headers(response, Callback.NOOP); return new Stream.Listener.Adapter() { @Override public void onData(final Stream stream, DataFrame frame, final Callback callback) { Assert.assertTrue(((HTTP2Stream)stream).isRemotelyClosed()); - stream.data(frame, new Callback.Adapter() + stream.data(frame, new Callback() { @Override public void succeeded() @@ -163,7 +163,7 @@ public class StreamCloseTest extends AbstractTest Assert.assertFalse(((HTTP2Stream)stream).isLocallyClosed()); final CountDownLatch clientDataLatch = new CountDownLatch(1); - stream.data(new DataFrame(stream.getId(), ByteBuffer.wrap(new byte[512]), true), new Callback.Adapter() + stream.data(new DataFrame(stream.getId(), ByteBuffer.wrap(new byte[512]), true), new Callback() { @Override public void succeeded() @@ -198,7 +198,7 @@ public class StreamCloseTest extends AbstractTest // When created, pushed stream must be implicitly remotely closed. Assert.assertTrue(((HTTP2Stream)pushedStream).isRemotelyClosed()); // Send some data with endStream = true. - pushedStream.data(new DataFrame(pushedStream.getId(), ByteBuffer.allocate(16), true), new Callback.Adapter() + pushedStream.data(new DataFrame(pushedStream.getId(), ByteBuffer.allocate(16), true), new Callback() { @Override public void succeeded() @@ -210,7 +210,7 @@ public class StreamCloseTest extends AbstractTest } }, new Stream.Listener.Adapter()); HeadersFrame response = new HeadersFrame(stream.getId(), new MetaData.Response(HttpVersion.HTTP_2, 200, new HttpFields()), null, true); - stream.headers(response, Callback.Adapter.INSTANCE); + stream.headers(response, Callback.NOOP); return null; } }); @@ -259,7 +259,7 @@ public class StreamCloseTest extends AbstractTest Assert.assertTrue(pushedStream.isReset()); Assert.assertTrue(pushedStream.isClosed()); HeadersFrame response = new HeadersFrame(stream.getId(), new MetaData.Response(HttpVersion.HTTP_2, 200, new HttpFields()), null, true); - stream.headers(response, Callback.Adapter.INSTANCE); + stream.headers(response, Callback.NOOP); serverLatch.countDown(); } }); @@ -275,7 +275,7 @@ public class StreamCloseTest extends AbstractTest @Override public Stream.Listener onPush(final Stream pushedStream, PushPromiseFrame frame) { - pushedStream.reset(new ResetFrame(pushedStream.getId(), ErrorCode.REFUSED_STREAM_ERROR.code), new Callback.Adapter() + pushedStream.reset(new ResetFrame(pushedStream.getId(), ErrorCode.REFUSED_STREAM_ERROR.code), new Callback() { @Override public void succeeded() @@ -315,7 +315,7 @@ public class StreamCloseTest extends AbstractTest { ((HTTP2Session)stream.getSession()).getEndPoint().close(); // Try to write something to force an error. - stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(1024), true), Callback.Adapter.INSTANCE); + stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(1024), true), Callback.NOOP); } return null; } diff --git a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/StreamResetTest.java b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/StreamResetTest.java index c8a2971641b..da8051e1f6d 100644 --- a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/StreamResetTest.java +++ b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/StreamResetTest.java @@ -102,7 +102,7 @@ public class StreamResetTest extends AbstractTest client.newStream(requestFrame, promise, new Stream.Listener.Adapter()); Stream stream = promise.get(5, TimeUnit.SECONDS); ResetFrame resetFrame = new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code); - stream.reset(resetFrame, Callback.Adapter.INSTANCE); + stream.reset(resetFrame, Callback.NOOP); Assert.assertTrue(resetLatch.await(5, TimeUnit.SECONDS)); @@ -126,14 +126,14 @@ public class StreamResetTest extends AbstractTest { MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, 200, new HttpFields()); HeadersFrame responseFrame = new HeadersFrame(stream.getId(), response, null, false); - stream.headers(responseFrame, Callback.Adapter.INSTANCE); + stream.headers(responseFrame, Callback.NOOP); return new Stream.Listener.Adapter() { @Override public void onData(Stream stream, DataFrame frame, Callback callback) { callback.succeeded(); - stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(16), true), Callback.Adapter.INSTANCE); + stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(16), true), Callback.NOOP); serverDataLatch.countDown(); } @@ -141,7 +141,7 @@ public class StreamResetTest extends AbstractTest public void onReset(Stream stream, ResetFrame frame) { // Simulate that there is pending data to send. - stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(16), true), new Callback.Adapter() + stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(16), true), new Callback() { @Override public void failed(Throwable x) @@ -192,14 +192,14 @@ public class StreamResetTest extends AbstractTest Stream stream2 = promise2.get(5, TimeUnit.SECONDS); ResetFrame resetFrame = new ResetFrame(stream1.getId(), ErrorCode.CANCEL_STREAM_ERROR.code); - stream1.reset(resetFrame, Callback.Adapter.INSTANCE); + stream1.reset(resetFrame, Callback.NOOP); Assert.assertTrue(serverResetLatch.await(5, TimeUnit.SECONDS)); // Stream MUST NOT receive data sent by server after reset. Assert.assertFalse(stream1DataLatch.await(1, TimeUnit.SECONDS)); // The other stream should still be working. - stream2.data(new DataFrame(stream2.getId(), ByteBuffer.allocate(16), true), Callback.Adapter.INSTANCE); + stream2.data(new DataFrame(stream2.getId(), ByteBuffer.allocate(16), true), Callback.NOOP); Assert.assertTrue(serverDataLatch.await(5, TimeUnit.SECONDS)); Assert.assertTrue(stream2DataLatch.await(5, TimeUnit.SECONDS)); } @@ -262,7 +262,7 @@ public class StreamResetTest extends AbstractTest @Override public void onHeaders(Stream stream, HeadersFrame frame) { - stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.Adapter.INSTANCE); + stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.NOOP); resetLatch.countDown(); } }); @@ -314,7 +314,7 @@ public class StreamResetTest extends AbstractTest Thread.sleep(1000); HttpOutput output = (HttpOutput)response.getOutputStream(); - output.sendContent(data, new Callback.Adapter() + output.sendContent(data, new Callback() { @Override public void failed(Throwable x) @@ -341,7 +341,7 @@ public class StreamResetTest extends AbstractTest @Override public void onHeaders(Stream stream, HeadersFrame frame) { - stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.Adapter.INSTANCE); + stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.NOOP); resetLatch.countDown(); } }); diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/BufferingFlowControlStrategy.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/BufferingFlowControlStrategy.java index 95eb0d00e4b..7b48c44fd85 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/BufferingFlowControlStrategy.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/BufferingFlowControlStrategy.java @@ -138,7 +138,7 @@ public class BufferingFlowControlStrategy extends AbstractFlowControlStrategy } if (windowFrame != null) - session.frames(stream, Callback.Adapter.INSTANCE, windowFrame, windowFrames); + session.frames(stream, Callback.NOOP, windowFrame, windowFrames); } @Override diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java index 6356701c23e..042de6fc927 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java @@ -161,7 +161,7 @@ public abstract class HTTP2Session implements ISession, Parser.Listener { if (getRecvWindow() < 0) { - close(ErrorCode.FLOW_CONTROL_ERROR.code, "session_window_exceeded", Callback.Adapter.INSTANCE); + close(ErrorCode.FLOW_CONTROL_ERROR.code, "session_window_exceeded", Callback.NOOP); } else { @@ -209,7 +209,7 @@ public abstract class HTTP2Session implements ISession, Parser.Listener IStream stream = getStream(frame.getStreamId()); if (stream != null) - stream.process(frame, Callback.Adapter.INSTANCE); + stream.process(frame, Callback.NOOP); else notifyReset(this, frame); } @@ -296,7 +296,7 @@ public abstract class HTTP2Session implements ISession, Parser.Listener if (reply) { SettingsFrame replyFrame = new SettingsFrame(Collections.emptyMap(), true); - settings(replyFrame, Callback.Adapter.INSTANCE); + settings(replyFrame, Callback.NOOP); } } @@ -312,7 +312,7 @@ public abstract class HTTP2Session implements ISession, Parser.Listener else { PingFrame reply = new PingFrame(frame.getPayload(), true); - control(null, Callback.Adapter.INSTANCE, reply); + control(null, Callback.NOOP, reply); } } @@ -399,7 +399,7 @@ public abstract class HTTP2Session implements ISession, Parser.Listener @Override public void onConnectionFailure(int error, String reason) { - close(error, reason, Callback.Adapter.INSTANCE); + close(error, reason, Callback.NOOP); notifyFailure(this, new IOException(String.format("%d/%s", error, reason))); } @@ -619,7 +619,7 @@ public abstract class HTTP2Session implements ISession, Parser.Listener int maxCount = getMaxRemoteStreams(); if (maxCount >= 0 && remoteCount >= maxCount) { - reset(new ResetFrame(streamId, ErrorCode.REFUSED_STREAM_ERROR.code), Callback.Adapter.INSTANCE); + reset(new ResetFrame(streamId, ErrorCode.REFUSED_STREAM_ERROR.code), Callback.NOOP); return null; } if (remoteStreamCount.compareAndSet(remoteCount, remoteCount + 1)) @@ -640,7 +640,7 @@ public abstract class HTTP2Session implements ISession, Parser.Listener } else { - close(ErrorCode.PROTOCOL_ERROR.code, "duplicate_stream", Callback.Adapter.INSTANCE); + close(ErrorCode.PROTOCOL_ERROR.code, "duplicate_stream", Callback.NOOP); return null; } } @@ -783,7 +783,7 @@ public abstract class HTTP2Session implements ISession, Parser.Listener { // We have closed locally, and only shutdown // the output; now queue a disconnect. - control(null, Callback.Adapter.INSTANCE, new DisconnectFrame()); + control(null, Callback.NOOP, new DisconnectFrame()); break; } case REMOTELY_CLOSED: @@ -827,7 +827,7 @@ public abstract class HTTP2Session implements ISession, Parser.Listener case NOT_CLOSED: { // Real idle timeout, just close. - close(ErrorCode.NO_ERROR.code, "idle_timeout", Callback.Adapter.INSTANCE); + close(ErrorCode.NO_ERROR.code, "idle_timeout", Callback.NOOP); break; } case LOCALLY_CLOSED: diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java index 1965fb72db5..a3ee68ae3ea 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java @@ -160,7 +160,7 @@ public class HTTP2Stream extends IdleTimeout implements IStream close(); // Tell the other peer that we timed out. - reset(new ResetFrame(getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.Adapter.INSTANCE); + reset(new ResetFrame(getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.NOOP); // Notify the application. notifyTimeout(this, timeout); @@ -238,7 +238,7 @@ public class HTTP2Stream extends IdleTimeout implements IStream { // It's a bad client, it does not deserve to be // treated gently by just resetting the stream. - session.close(ErrorCode.FLOW_CONTROL_ERROR.code, "stream_window_exceeded", Callback.Adapter.INSTANCE); + session.close(ErrorCode.FLOW_CONTROL_ERROR.code, "stream_window_exceeded", Callback.NOOP); callback.failed(new IOException("stream_window_exceeded")); return; } @@ -246,7 +246,7 @@ public class HTTP2Stream extends IdleTimeout implements IStream // SPEC: remotely closed streams must be replied with a reset. if (isRemotelyClosed()) { - reset(new ResetFrame(streamId, ErrorCode.STREAM_CLOSED_ERROR.code), Callback.Adapter.INSTANCE); + reset(new ResetFrame(streamId, ErrorCode.STREAM_CLOSED_ERROR.code), Callback.NOOP); callback.failed(new EOFException("stream_closed")); return; } diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/SimpleFlowControlStrategy.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/SimpleFlowControlStrategy.java index d5205e16aa5..b86f2005d4d 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/SimpleFlowControlStrategy.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/SimpleFlowControlStrategy.java @@ -67,6 +67,6 @@ public class SimpleFlowControlStrategy extends AbstractFlowControlStrategy } } - session.frames(stream, Callback.Adapter.INSTANCE, sessionFrame, streamFrame); + session.frames(stream, Callback.NOOP, sessionFrame, streamFrame); } } diff --git a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpConnectionOverHTTP2.java b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpConnectionOverHTTP2.java index 86908d1e22f..0ad973f28f1 100644 --- a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpConnectionOverHTTP2.java +++ b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpConnectionOverHTTP2.java @@ -65,7 +65,7 @@ public class HttpConnectionOverHTTP2 extends HttpConnection // First close then abort, to be sure that the connection cannot be reused // from an onFailure() handler or by blocking code waiting for completion. getHttpDestination().close(this); - session.close(ErrorCode.NO_ERROR.code, null, Callback.Adapter.INSTANCE); + session.close(ErrorCode.NO_ERROR.code, null, Callback.NOOP); abort(new AsynchronousCloseException()); } diff --git a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpReceiverOverHTTP2.java b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpReceiverOverHTTP2.java index 180b1b3b8c9..e473fa959d3 100644 --- a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpReceiverOverHTTP2.java +++ b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpReceiverOverHTTP2.java @@ -80,7 +80,7 @@ public class HttpReceiverOverHTTP2 extends HttpReceiver implements Stream.Listen public Stream.Listener onPush(Stream stream, PushPromiseFrame frame) { // Not supported. - stream.reset(new ResetFrame(stream.getId(), ErrorCode.REFUSED_STREAM_ERROR.code), Callback.Adapter.INSTANCE); + stream.reset(new ResetFrame(stream.getId(), ErrorCode.REFUSED_STREAM_ERROR.code), Callback.NOOP); return null; } diff --git a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/ByteBufferCallback.java b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/ByteBufferCallback.java index 262bcf8dad6..0bc629cc3ab 100644 --- a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/ByteBufferCallback.java +++ b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/ByteBufferCallback.java @@ -36,6 +36,12 @@ public class ByteBufferCallback implements Callback this.callback = callback; } + @Override + public boolean isNonBlocking() + { + return callback.isNonBlocking(); + } + public ByteBuffer getByteBuffer() { return buffer; diff --git a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnectionFactory.java b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnectionFactory.java index cef22ad1bfd..55627f8e099 100644 --- a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnectionFactory.java +++ b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnectionFactory.java @@ -143,7 +143,7 @@ public class HTTP2ServerConnectionFactory extends AbstractHTTP2ServerConnectionF private void close(Stream stream, String reason) { final Session session = stream.getSession(); - session.close(ErrorCode.PROTOCOL_ERROR.code, reason, Callback.Adapter.INSTANCE); + session.close(ErrorCode.PROTOCOL_ERROR.code, reason, Callback.NOOP); } } diff --git a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerSession.java b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerSession.java index 1d4f36adadc..c209895a885 100644 --- a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerSession.java +++ b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerSession.java @@ -62,7 +62,7 @@ public class HTTP2ServerSession extends HTTP2Session implements ServerParser.Lis settings = Collections.emptyMap(); SettingsFrame frame = new SettingsFrame(settings, false); // TODO: consider sending a WINDOW_UPDATE to enlarge the session send window of the client. - frames(null, Callback.Adapter.INSTANCE, frame, Frame.EMPTY_ARRAY); + frames(null, Callback.NOOP, frame, Frame.EMPTY_ARRAY); } @Override @@ -74,7 +74,7 @@ public class HTTP2ServerSession extends HTTP2Session implements ServerParser.Lis IStream stream = createRemoteStream(frame.getStreamId()); if (stream != null) { - stream.process(frame, Callback.Adapter.INSTANCE); + stream.process(frame, Callback.NOOP); Stream.Listener listener = notifyNewStream(stream, frame); stream.setListener(listener); } diff --git a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpChannelOverHTTP2.java b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpChannelOverHTTP2.java index eeec5bfd376..4836bfae128 100644 --- a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpChannelOverHTTP2.java +++ b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpChannelOverHTTP2.java @@ -164,6 +164,12 @@ public class HttpChannelOverHTTP2 extends HttpChannel boolean handle = onContent(new HttpInput.Content(copy) { + @Override + public boolean isNonBlocking() + { + return callback.isNonBlocking(); + } + @Override public void succeeded() { diff --git a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpTransportOverHTTP2.java b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpTransportOverHTTP2.java index 144f1c0dc22..0588930b2b8 100644 --- a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpTransportOverHTTP2.java +++ b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpTransportOverHTTP2.java @@ -194,7 +194,7 @@ public class HttpTransportOverHTTP2 implements HttpTransport { // If the stream is not closed, it is still reading the request content. // Send a reset to the other end so that it stops sending data. - stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.Adapter.INSTANCE); + stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.NOOP); // Now that this stream is reset, in-flight data frames will be consumed and discarded. // Consume the existing queued data frames to avoid stalling the flow control. HttpChannel channel = (HttpChannel)stream.getAttribute(IStream.CHANNEL_ATTRIBUTE); @@ -209,11 +209,11 @@ public class HttpTransportOverHTTP2 implements HttpTransport if (LOG.isDebugEnabled()) LOG.debug("HTTP2 Response #{} aborted", stream == null ? -1 : stream.getId()); if (stream != null) - stream.reset(new ResetFrame(stream.getId(), ErrorCode.INTERNAL_ERROR.code), Callback.Adapter.INSTANCE); + stream.reset(new ResetFrame(stream.getId(), ErrorCode.INTERNAL_ERROR.code), Callback.NOOP); } - private class CommitCallback implements Callback - { + private class CommitCallback implements Callback.NonBlocking + { @Override public void succeeded() { diff --git a/jetty-http2/http2-server/src/test/java/org/eclipse/jetty/http2/server/CloseTest.java b/jetty-http2/http2-server/src/test/java/org/eclipse/jetty/http2/server/CloseTest.java index 0ebed6eaf4d..08c5bf76bce 100644 --- a/jetty-http2/http2-server/src/test/java/org/eclipse/jetty/http2/server/CloseTest.java +++ b/jetty-http2/http2-server/src/test/java/org/eclipse/jetty/http2/server/CloseTest.java @@ -62,7 +62,7 @@ public class CloseTest extends AbstractServerTest sessionRef.set(stream.getSession()); MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, 200, new HttpFields()); // Reply with HEADERS. - stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.Adapter.INSTANCE); + stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP); closeLatch.await(5, TimeUnit.SECONDS); return null; } @@ -127,7 +127,7 @@ public class CloseTest extends AbstractServerTest { sessionRef.set(stream.getSession()); MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, 200, new HttpFields()); - stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.Adapter.INSTANCE); + stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP); return null; } }); @@ -190,8 +190,8 @@ public class CloseTest extends AbstractServerTest stream.setIdleTimeout(10 * idleTimeout); sessionRef.set(stream.getSession()); MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, 200, new HttpFields()); - stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.Adapter.INSTANCE); - stream.getSession().close(ErrorCode.NO_ERROR.code, "OK", Callback.Adapter.INSTANCE); + stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP); + stream.getSession().close(ErrorCode.NO_ERROR.code, "OK", Callback.NOOP); return null; } }); diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java index 6cebb2c40ac..8c723033054 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java @@ -194,8 +194,16 @@ public abstract class AbstractEndPoint extends IdleTimeout implements EndPoint @Override public String toString() { + Class c=getClass(); + String name=c.getSimpleName(); + while (name.length()==0 && c.getSuperclass()!=null) + { + c=c.getSuperclass(); + name=c.getSimpleName(); + } + return String.format("%s@%x{%s<->%d,%s,%s,%s,%s,%s,%d/%d,%s}", - getClass().getSimpleName(), + name, hashCode(), getRemoteAddress(), getLocalAddress().getPort(), diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/FillInterest.java b/jetty-io/src/main/java/org/eclipse/jetty/io/FillInterest.java index 758601bfe32..1b7be493a01 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/FillInterest.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/FillInterest.java @@ -108,7 +108,7 @@ public abstract class FillInterest public boolean isCallbackNonBlocking() { Callback callback = _interested.get(); - return callback instanceof Callback.NonBlocking; + return callback.isNonBlocking(); } /** diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java index ff05edd6c24..b45c39b80e2 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java @@ -53,10 +53,63 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements ManagedSel */ private int _desiredInterestOps; - private final Runnable _runUpdateKey = new Runnable() { public void run() { updateKey(); } }; - private final Runnable _runFillable = new Runnable() { public void run() { getFillInterest().fillable(); } }; - private final Runnable _runCompleteWrite = new Runnable() { public void run() { getWriteFlusher().completeWrite(); } }; - private final Runnable _runFillableCompleteWrite = new Runnable() { public void run() { getFillInterest().fillable(); getWriteFlusher().completeWrite(); } }; + private final Runnable _runUpdateKey = new Runnable() + { + @Override + public void run() + { + updateKey(); + } + + @Override + public String toString() + { + return SelectChannelEndPoint.this.toString()+":runUpdateKey"; + } + }; + private final Runnable _runFillable = new Runnable() + { + @Override + public void run() + { + getFillInterest().fillable(); + } + + @Override + public String toString() + { + return SelectChannelEndPoint.this.toString()+":runFillable"; + } + }; + private final Runnable _runCompleteWrite = new Runnable() + { + @Override + public void run() + { + getWriteFlusher().completeWrite(); + } + + @Override + public String toString() + { + return SelectChannelEndPoint.this.toString()+":runCompleteWrite"; + } + }; + private final Runnable _runFillableCompleteWrite = new Runnable() + { + @Override + public void run() + { + getFillInterest().fillable(); + getWriteFlusher().completeWrite(); + } + + @Override + public String toString() + { + return SelectChannelEndPoint.this.toString()+":runFillableCompleteWrite"; + } + }; public SelectChannelEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey key, Scheduler scheduler, long idleTimeout) { @@ -97,12 +150,14 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements ManagedSel _desiredInterestOps = newInterestOps; } - if (LOG.isDebugEnabled()) - LOG.debug("onSelected {}->{} for {}", oldInterestOps, newInterestOps, this); boolean readable = (readyOps & SelectionKey.OP_READ) != 0; boolean writable = (readyOps & SelectionKey.OP_WRITE) != 0; + + if (LOG.isDebugEnabled()) + LOG.debug("onSelected {}->{} r={} w={} for {}", oldInterestOps, newInterestOps, readable, writable, this); + // Run non-blocking code immediately. // This producer knows that this non-blocking code is special // and that it must be run in this thread and not fed to the @@ -110,18 +165,26 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements ManagedSel // tasks (or it may starve forever just after having run them). if (readable && getFillInterest().isCallbackNonBlocking()) { + if (LOG.isDebugEnabled()) + LOG.debug("Direct readable run {}",this); _runFillable.run(); readable = false; } if (writable && getWriteFlusher().isCallbackNonBlocking()) { + if (LOG.isDebugEnabled()) + LOG.debug("Direct writable run {}",this); _runCompleteWrite.run(); writable = false; } // return task to complete the job - return readable ? (writable ? _runFillableCompleteWrite : _runFillable) - : (writable ? _runCompleteWrite : null); + Runnable task= readable ? (writable ? _runFillableCompleteWrite : _runFillable) + : (writable ? _runCompleteWrite : null); + + if (LOG.isDebugEnabled()) + LOG.debug("task {}",task); + return task; } @Override diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java b/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java index 27053554654..0e9f6bea182 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java @@ -271,7 +271,7 @@ abstract public class WriteFlusher boolean isCallbackNonBlocking() { - return _callback instanceof Callback.NonBlocking; + return _callback.isNonBlocking(); } } diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/SelectorManagerTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/SelectorManagerTest.java index d36ef49f634..66d39d3a167 100644 --- a/jetty-io/src/test/java/org/eclipse/jetty/io/SelectorManagerTest.java +++ b/jetty-io/src/test/java/org/eclipse/jetty/io/SelectorManagerTest.java @@ -120,7 +120,7 @@ public class SelectorManagerTest long timeout = connectTimeout * 2; timeoutConnection.set(timeout); final CountDownLatch latch1 = new CountDownLatch(1); - selectorManager.connect(client1, new Callback.Adapter() + selectorManager.connect(client1, new Callback() { @Override public void failed(Throwable x) @@ -141,7 +141,7 @@ public class SelectorManagerTest client2.connect(address); timeoutConnection.set(0); final CountDownLatch latch2 = new CountDownLatch(1); - selectorManager.connect(client2, new Callback.Adapter() + selectorManager.connect(client2, new Callback() { @Override public void succeeded() diff --git a/jetty-proxy/src/main/java/org/eclipse/jetty/proxy/AsyncMiddleManServlet.java b/jetty-proxy/src/main/java/org/eclipse/jetty/proxy/AsyncMiddleManServlet.java index 566e386959b..796da806601 100644 --- a/jetty-proxy/src/main/java/org/eclipse/jetty/proxy/AsyncMiddleManServlet.java +++ b/jetty-proxy/src/main/java/org/eclipse/jetty/proxy/AsyncMiddleManServlet.java @@ -258,7 +258,7 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet { if (!provider.isClosed()) { - process(BufferUtil.EMPTY_BUFFER, new Adapter() + process(BufferUtil.EMPTY_BUFFER, new Callback() { @Override public void failed(Throwable x) diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java index 8d3764423d7..973b2b92be2 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java @@ -712,7 +712,13 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor { _callback = callback; } - + + @Override + public boolean isNonBlocking() + { + return _callback.isNonBlocking(); + } + @Override public void succeeded() { diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java index 9ab2da7e11f..c904d0ab6f8 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java @@ -539,7 +539,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http } } - private class BlockingReadCallback implements Callback.NonBlocking + private class BlockingReadCallback implements Callback { @Override public void succeeded() @@ -552,6 +552,14 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http { _input.failed(x); } + + @Override + public boolean isNonBlocking() + { + // This callback does not block, rather it wakes up the + // thread that is blocked waiting on the read. + return true; + } } private class AsyncReadCallback implements Callback @@ -588,6 +596,12 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http super(true); } + @Override + public boolean isNonBlocking() + { + return _callback.isNonBlocking(); + } + private boolean reset(MetaData.Response info, boolean head, ByteBuffer content, boolean last, Callback callback) { if (reset()) @@ -743,7 +757,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http if (_shutdownOut) getEndPoint().shutdownOutput(); } - + @Override public String toString() { diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java index 564e1eebc32..98b122ba879 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java @@ -653,7 +653,7 @@ public class HttpInput extends ServletInputStream implements Runnable } } - public static class Content extends Callback.Adapter + public static class Content implements Callback { private final ByteBuffer _content; @@ -662,6 +662,13 @@ public class HttpInput extends ServletInputStream implements Runnable _content=content; } + @Override + public boolean isNonBlocking() + { + return true; + } + + public ByteBuffer getContent() { return _content; diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java index c16257bb36b..ca359518892 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java @@ -1215,7 +1215,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable _in=in; _buffer = _channel.getByteBufferPool().acquire(getBufferSize(), _channel.useDirectBuffers()); } - + @Override protected Action process() throws Exception { diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/ConnectorTimeoutTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/ConnectorTimeoutTest.java index 5edff8a1438..3f3678dedc0 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/ConnectorTimeoutTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/ConnectorTimeoutTest.java @@ -49,6 +49,7 @@ import org.eclipse.jetty.util.log.StdErrLog; import org.hamcrest.Matchers; import org.junit.Assert; import org.junit.Before; +import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; @@ -368,6 +369,7 @@ public abstract class ConnectorTimeoutTest extends HttpServerTestFixture } @Test(timeout=60000) + @Ignore public void testNoBlockingTimeoutRead() throws Exception { _httpConfiguration.setBlockingTimeout(-1L); @@ -494,6 +496,7 @@ public abstract class ConnectorTimeoutTest extends HttpServerTestFixture } @Test(timeout=60000) + @Ignore public void testNoBlockingTimeoutWrite() throws Exception { configureServer(new HugeResponseHandler()); diff --git a/jetty-servlets/src/test/java/org/eclipse/jetty/servlets/DefaultServletStarvationTest.java b/jetty-servlets/src/test/java/org/eclipse/jetty/servlets/DefaultServletStarvationTest.java new file mode 100644 index 00000000000..64362760714 --- /dev/null +++ b/jetty-servlets/src/test/java/org/eclipse/jetty/servlets/DefaultServletStarvationTest.java @@ -0,0 +1,216 @@ +// +// ======================================================================== +// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.servlets; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.Socket; +import java.nio.channels.SelectionKey; +import java.nio.channels.SocketChannel; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardOpenOption; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +import org.eclipse.jetty.io.ManagedSelector; +import org.eclipse.jetty.io.SelectChannelEndPoint; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; +import org.eclipse.jetty.servlet.DefaultServlet; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.toolchain.test.MavenTestingUtils; +import org.eclipse.jetty.toolchain.test.TestTracker; +import org.eclipse.jetty.util.thread.QueuedThreadPool; +import org.junit.After; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; + +public class DefaultServletStarvationTest +{ + @Rule + public TestTracker tracker = new TestTracker(); + private Server _server; + + @After + public void dispose() throws Exception + { + if (_server != null) + _server.stop(); + } + + @Test + public void testDefaultServletStarvation() throws Exception + { + int maxThreads = 2; + QueuedThreadPool threadPool = new QueuedThreadPool(maxThreads, maxThreads); + threadPool.setDetailedDump(true); + _server = new Server(threadPool); + + // Prepare a big file to download. + File directory = MavenTestingUtils.getTargetTestingDir(); + Files.createDirectories(directory.toPath()); + String resourceName = "resource.bin"; + Path resourcePath = Paths.get(directory.getPath(), resourceName); + try (OutputStream output = Files.newOutputStream(resourcePath, StandardOpenOption.CREATE, StandardOpenOption.WRITE)) + { + byte[] chunk = new byte[1024]; + Arrays.fill(chunk,(byte)'X'); + chunk[chunk.length-2]='\r'; + chunk[chunk.length-1]='\n'; + for (int i = 0; i < 256 * 1024; ++i) + output.write(chunk); + } + + final CountDownLatch writePending = new CountDownLatch(1); + ServerConnector connector = new ServerConnector(_server, 0, 1) + { + @Override + protected SelectChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key) throws IOException + { + return new SelectChannelEndPoint(channel, selectSet, key, getScheduler(), getIdleTimeout()) + { + @Override + protected void onIncompleteFlush() + { + super.onIncompleteFlush(); + writePending.countDown(); + } + }; + } + }; + _server.addConnector(connector); + + ServletContextHandler context = new ServletContextHandler(_server, "/"); + context.setResourceBase(directory.toURI().toString()); + context.addServlet(DefaultServlet.class, "/*").setAsyncSupported(false); + _server.setHandler(context); + + _server.start(); + + List sockets = new ArrayList<>(); + for (int i = 0; i < maxThreads; ++i) + { + Socket socket = new Socket("localhost", connector.getLocalPort()); + sockets.add(socket); + OutputStream output = socket.getOutputStream(); + String request = "" + + "GET /" + resourceName + " HTTP/1.1\r\n" + + "Host: localhost\r\n" + +// "Connection: close\r\n" + + "\r\n"; + output.write(request.getBytes(StandardCharsets.UTF_8)); + output.flush(); + Thread.sleep(100); + } + + + // Wait for a the servlet to block. + Assert.assertTrue(writePending.await(5, TimeUnit.SECONDS)); + + Thread.sleep(1000); + _server.dumpStdErr(); + Thread.sleep(1000); + + + ScheduledFuture dumper = Executors.newSingleThreadScheduledExecutor().schedule(new Runnable() + { + @Override + public void run() + { + _server.dumpStdErr(); + } + }, 10, TimeUnit.SECONDS); + + + long expected = Files.size(resourcePath); + byte[] buffer = new byte[48 * 1024]; + for (Socket socket : sockets) + { + String socketString = socket.toString(); + System.out.println("Reading socket " + socketString+"..."); + long total = 0; + InputStream input = socket.getInputStream(); + + // look for CRLFCRLF + StringBuilder header = new StringBuilder(); + int state=0; + while (state<4 && header.length()<2048) + { + int ch=input.read(); + if (ch<0) + break; + header.append((char)ch); + switch(state) + { + case 0: + if (ch=='\r') + state=1; + break; + case 1: + if (ch=='\n') + state=2; + else + state=0; + break; + case 2: + if (ch=='\r') + state=3; + else + state=0; + break; + case 3: + if (ch=='\n') + state=4; + else + state=0; + break; + } + } + System.out.println("Header socket " + socketString+"\n"+header.toString()); + + while (totalCallback invoked when the operation completes.

* * @see #failed(Throwable) */ - public abstract void succeeded(); + default void succeeded() + {} /** *

Callback invoked when the operation fails.

* @param x the reason for the operation failure */ - public void failed(Throwable x); + default void failed(Throwable x) + {} /** - * A marker interface for a callback that is guaranteed not to - * block and thus does not need a dispatch + * @return True if the callback is known to never block the caller */ - public interface NonBlocking extends Callback - {} + default boolean isNonBlocking() + { + return false; + } + + + /** + * Callback interface that declares itself as non-blocking + */ + interface NonBlocking extends Callback + { + @Override + public default boolean isNonBlocking() + { + return true; + } + } + /** *

Empty implementation of {@link Callback}

*/ - public static class Adapter implements Callback - { - /** - * Instance of Adapter that can be used when the callback methods need an empty - * implementation without incurring in the cost of allocating a new Adapter object. - */ - public static final Adapter INSTANCE = new Adapter(); - - @Override - public void succeeded() - { - } - - @Override - public void failed(Throwable x) - { - } - } + @Deprecated + static class Adapter implements Callback + {} } diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/IteratingNestedCallback.java b/jetty-util/src/main/java/org/eclipse/jetty/util/IteratingNestedCallback.java index ac6c99aebd4..8f230e429d0 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/IteratingNestedCallback.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/IteratingNestedCallback.java @@ -46,6 +46,12 @@ public abstract class IteratingNestedCallback extends IteratingCallback { _callback=callback; } + + @Override + public boolean isNonBlocking() + { + return _callback.isNonBlocking(); + } @Override protected void onCompleteSuccess() diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/SharedBlockingCallback.java b/jetty-util/src/main/java/org/eclipse/jetty/util/SharedBlockingCallback.java index 9e24f7e10d2..785d4efa385 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/SharedBlockingCallback.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/SharedBlockingCallback.java @@ -124,6 +124,9 @@ public class SharedBlockingCallback /** * A Closeable Callback. * Uses the auto close mechanism to check block has been called OK. + *

Implements {@link Callback.NonBlocking} because calls to this + * callback do not blocak, rather they wakeup the thread that is blocked + * in {@link #block()} */ public class Blocker implements Callback.NonBlocking, Closeable { @@ -132,7 +135,7 @@ public class SharedBlockingCallback protected Blocker() { } - + @Override public void succeeded() { diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/BlockingWriteCallback.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/BlockingWriteCallback.java index 91fe16b31c1..7e0e320f66f 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/BlockingWriteCallback.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/BlockingWriteCallback.java @@ -38,7 +38,7 @@ public class BlockingWriteCallback extends SharedBlockingCallback return new WriteBlocker(acquire()); } - public static class WriteBlocker implements WriteCallback, Callback, AutoCloseable + public static class WriteBlocker implements WriteCallback, Callback.NonBlocking, AutoCloseable { private final Blocker blocker;