From 9897c1b06e83bdbe37ba21d7f469f0ea53ff8f04 Mon Sep 17 00:00:00 2001
From: Simone Bordet
Date: Tue, 24 Aug 2021 01:10:56 +0200
Subject: [PATCH] Fixes #6646 - SmallThreadPoolLoadTest on windows flaky.
Fixed occurrences of Callbacks that did not override getInvocationType() to properly declare whether they block or not.
Added test case for blocking writes for both client and server.
Signed-off-by: Simone Bordet
---
.../jetty/embedded/FastFileServer.java | 6 +
.../client/HTTP2ClientConnectionFactory.java | 6 +
.../org/eclipse/jetty/http2/HTTP2Flusher.java | 12 +
.../org/eclipse/jetty/http2/HTTP2Session.java | 5 +-
.../org/eclipse/jetty/http2/HTTP2Stream.java | 9 +
.../BlockedWritesWithSmallThreadPoolTest.java | 305 ++++++++++++++++++
.../org/eclipse/jetty/server/HttpChannel.java | 6 +-
.../org/eclipse/jetty/server/HttpOutput.java | 32 +-
.../eclipse/jetty/server/ResourceService.java | 6 +
.../java/org/eclipse/jetty/util/Callback.java | 27 +-
10 files changed, 395 insertions(+), 19 deletions(-)
create mode 100644 jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/BlockedWritesWithSmallThreadPoolTest.java
diff --git a/examples/embedded/src/main/java/org/eclipse/jetty/embedded/FastFileServer.java b/examples/embedded/src/main/java/org/eclipse/jetty/embedded/FastFileServer.java
index d97b30e5f32..b010572d198 100644
--- a/examples/embedded/src/main/java/org/eclipse/jetty/embedded/FastFileServer.java
+++ b/examples/embedded/src/main/java/org/eclipse/jetty/embedded/FastFileServer.java
@@ -183,6 +183,12 @@ public class FastFileServer
x.printStackTrace();
async.complete();
}
+
+ @Override
+ public InvocationType getInvocationType()
+ {
+ return InvocationType.NON_BLOCKING;
+ }
};
// send "medium" files from an input stream
diff --git a/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientConnectionFactory.java b/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientConnectionFactory.java
index 6b8e5f920a9..6a2316062cc 100644
--- a/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientConnectionFactory.java
+++ b/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientConnectionFactory.java
@@ -135,6 +135,12 @@ public class HTTP2ClientConnectionFactory implements ClientConnectionFactory
close();
promise.failed(x);
}
+
+ @Override
+ public InvocationType getInvocationType()
+ {
+ return InvocationType.NON_BLOCKING;
+ }
}
private static class ConnectionListener implements Connection.Listener
diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Flusher.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Flusher.java
index 87d29e0d391..fca7af2c532 100644
--- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Flusher.java
+++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Flusher.java
@@ -41,6 +41,7 @@ import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
+import org.eclipse.jetty.util.thread.Invocable;
public class HTTP2Flusher extends IteratingCallback implements Dumpable
{
@@ -53,6 +54,7 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable
private final Collection processedEntries = new ArrayList<>();
private final HTTP2Session session;
private final ByteBufferPool.Lease lease;
+ private InvocationType invocationType = InvocationType.NON_BLOCKING;
private Throwable terminated;
private Entry stalledEntry;
@@ -62,6 +64,12 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable
this.lease = new ByteBufferPool.Lease(session.getGenerator().getByteBufferPool());
}
+ @Override
+ public InvocationType getInvocationType()
+ {
+ return invocationType;
+ }
+
public void window(IStream stream, WindowUpdateFrame frame)
{
Throwable closed;
@@ -217,7 +225,10 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable
// We use ArrayList contains() + add() instead of HashSet add()
// because that is faster for collections of size up to 250 entries.
if (!processedEntries.contains(entry))
+ {
processedEntries.add(entry);
+ invocationType = Invocable.combine(invocationType, Invocable.getInvocationType(entry.getCallback()));
+ }
if (entry.getDataBytesRemaining() == 0)
pending.remove();
@@ -314,6 +325,7 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable
processedEntries.forEach(Entry::succeeded);
processedEntries.clear();
+ invocationType = InvocationType.NON_BLOCKING;
if (stalledEntry != null)
{
diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java
index 793982a3c38..3a0c7b4feea 100644
--- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java
+++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java
@@ -72,6 +72,7 @@ import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.component.DumpableCollection;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
+import org.eclipse.jetty.util.thread.Invocable;
import org.eclipse.jetty.util.thread.Locker;
import org.eclipse.jetty.util.thread.Scheduler;
@@ -1900,7 +1901,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
private void sendGoAwayAndTerminate(GoAwayFrame frame, GoAwayFrame eventFrame)
{
- sendGoAway(frame, Callback.from(() -> terminate(eventFrame)));
+ sendGoAway(frame, Callback.from(Callback.NOOP, () -> terminate(eventFrame)));
}
private void sendGoAway(GoAwayFrame frame, Callback callback)
@@ -2077,7 +2078,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
stream.setListener(listener);
stream.process(new PrefaceFrame(), Callback.NOOP);
- Callback streamCallback = Callback.from(() -> promise.succeeded(stream), x ->
+ Callback streamCallback = Callback.from(Invocable.InvocationType.NON_BLOCKING, () -> promise.succeeded(stream), x ->
{
HTTP2Session.this.onStreamDestroyed(streamId);
promise.failed(x);
diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java
index efa05f464d8..9e47e309ae6 100644
--- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java
+++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java
@@ -600,6 +600,15 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
callback.failed(x);
}
+ @Override
+ public InvocationType getInvocationType()
+ {
+ synchronized (this)
+ {
+ return sendCallback != null ? sendCallback.getInvocationType() : Callback.super.getInvocationType();
+ }
+ }
+
private Callback endWrite()
{
synchronized (this)
diff --git a/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/BlockedWritesWithSmallThreadPoolTest.java b/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/BlockedWritesWithSmallThreadPoolTest.java
new file mode 100644
index 00000000000..f0ac39cf6a4
--- /dev/null
+++ b/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/BlockedWritesWithSmallThreadPoolTest.java
@@ -0,0 +1,305 @@
+//
+// ========================================================================
+// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
+// ------------------------------------------------------------------------
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// and Apache License v2.0 which accompanies this distribution.
+//
+// The Eclipse Public License is available at
+// http://www.eclipse.org/legal/epl-v10.html
+//
+// The Apache License v2.0 is available at
+// http://www.opensource.org/licenses/apache2.0.php
+//
+// You may elect to redistribute this code under either of these licenses.
+// ========================================================================
+//
+
+package org.eclipse.jetty.http2.client.http;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.IntStream;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.eclipse.jetty.http.HttpFields;
+import org.eclipse.jetty.http.HttpStatus;
+import org.eclipse.jetty.http.HttpURI;
+import org.eclipse.jetty.http.HttpVersion;
+import org.eclipse.jetty.http.MetaData;
+import org.eclipse.jetty.http2.HTTP2Session;
+import org.eclipse.jetty.http2.api.Session;
+import org.eclipse.jetty.http2.api.Stream;
+import org.eclipse.jetty.http2.api.server.ServerSessionListener;
+import org.eclipse.jetty.http2.client.HTTP2Client;
+import org.eclipse.jetty.http2.frames.DataFrame;
+import org.eclipse.jetty.http2.frames.HeadersFrame;
+import org.eclipse.jetty.http2.server.HTTP2CServerConnectionFactory;
+import org.eclipse.jetty.http2.server.RawHTTP2ServerConnectionFactory;
+import org.eclipse.jetty.io.AbstractEndPoint;
+import org.eclipse.jetty.server.Handler;
+import org.eclipse.jetty.server.HttpConfiguration;
+import org.eclipse.jetty.server.Request;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.ServerConnector;
+import org.eclipse.jetty.util.Callback;
+import org.eclipse.jetty.util.FuturePromise;
+import org.eclipse.jetty.util.Promise;
+import org.eclipse.jetty.util.component.LifeCycle;
+import org.eclipse.jetty.util.thread.QueuedThreadPool;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+public class BlockedWritesWithSmallThreadPoolTest
+{
+ private Server server;
+ private ServerConnector connector;
+ private QueuedThreadPool serverThreads;
+ private HTTP2Client client;
+
+ private void start(Handler handler) throws Exception
+ {
+ // Threads: 1 acceptor, 1 selector, 1 reserved, 1 application.
+ serverThreads = newSmallThreadPool("server", 4);
+ server = new Server(serverThreads);
+ HTTP2CServerConnectionFactory http2 = new HTTP2CServerConnectionFactory(new HttpConfiguration());
+ connector = new ServerConnector(server, 1, 1, http2);
+ server.addConnector(connector);
+ server.setHandler(handler);
+ server.start();
+ }
+
+ private void start(RawHTTP2ServerConnectionFactory factory) throws Exception
+ {
+ // Threads: 1 acceptor, 1 selector, 1 reserved, 1 application.
+ serverThreads = newSmallThreadPool("server", 4);
+ server = new Server(serverThreads);
+ connector = new ServerConnector(server, 1, 1, factory);
+ server.addConnector(connector);
+ server.start();
+ }
+
+ private QueuedThreadPool newSmallThreadPool(String name, int maxThreads)
+ {
+ QueuedThreadPool pool = new QueuedThreadPool(maxThreads, maxThreads);
+ pool.setName(name);
+ pool.setReservedThreads(1);
+ pool.setDetailedDump(true);
+ return pool;
+ }
+
+ @AfterEach
+ public void dispose()
+ {
+ LifeCycle.stop(client);
+ LifeCycle.stop(server);
+ }
+
+ @Test
+ public void testServerThreadsBlockedInWrites() throws Exception
+ {
+ int contentLength = 16 * 1024 * 1024;
+ AtomicReference serverEndPointRef = new AtomicReference<>();
+ start(new EmptyServerHandler()
+ {
+ @Override
+ protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
+ {
+ serverEndPointRef.compareAndSet(null, (AbstractEndPoint)jettyRequest.getHttpChannel().getEndPoint());
+ // Write a large content to cause TCP congestion.
+ response.getOutputStream().write(new byte[contentLength]);
+ }
+ });
+
+ client = new HTTP2Client();
+ // Set large flow control windows so the server hits TCP congestion.
+ int window = 2 * contentLength;
+ client.setInitialSessionRecvWindow(window);
+ client.setInitialStreamRecvWindow(window);
+ client.start();
+
+ FuturePromise promise = new FuturePromise<>();
+ client.connect(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.Listener.Adapter(), promise);
+ Session session = promise.get(5, TimeUnit.SECONDS);
+
+ CountDownLatch clientBlockLatch = new CountDownLatch(1);
+ CountDownLatch clientDataLatch = new CountDownLatch(1);
+ // Send a request to TCP congest the server.
+ HttpURI uri = new HttpURI("http://localhost:" + connector.getLocalPort() + "/congest");
+ MetaData.Request request = new MetaData.Request("GET", uri, HttpVersion.HTTP_2, new HttpFields());
+ session.newStream(new HeadersFrame(request, null, true), new Promise.Adapter<>(), new Stream.Listener.Adapter()
+ {
+ @Override
+ public void onData(Stream stream, DataFrame frame, Callback callback)
+ {
+ // Block here to stop reading from the network
+ // to cause the server to TCP congest.
+ awaitUntil(0, () -> clientBlockLatch.await(5, TimeUnit.SECONDS));
+ callback.succeeded();
+ if (frame.isEndStream())
+ clientDataLatch.countDown();
+ }
+ });
+
+ awaitUntil(5000, () ->
+ {
+ AbstractEndPoint serverEndPoint = serverEndPointRef.get();
+ return serverEndPoint != null && serverEndPoint.getWriteFlusher().isPending();
+ });
+ // Wait for NIO on the server to be OP_WRITE interested.
+ Thread.sleep(1000);
+
+ // Make sure there is a reserved thread.
+ if (serverThreads.getAvailableReservedThreads() != 1)
+ {
+ assertFalse(serverThreads.tryExecute(() -> {}));
+ awaitUntil(5000, () -> serverThreads.getAvailableReservedThreads() == 1);
+ }
+ // Use the reserved thread for a blocking operation, simulating another blocking write.
+ CountDownLatch serverBlockLatch = new CountDownLatch(1);
+ assertTrue(serverThreads.tryExecute(() -> awaitUntil(0, () -> serverBlockLatch.await(15, TimeUnit.SECONDS))));
+
+ assertEquals(0, serverThreads.getReadyThreads());
+
+ // Unblock the client to read from the network, which should unblock the server write().
+ clientBlockLatch.countDown();
+
+ assertTrue(clientDataLatch.await(10, TimeUnit.SECONDS), server.dump());
+ serverBlockLatch.countDown();
+ }
+
+ @Test
+ public void testClientThreadsBlockedInWrite() throws Exception
+ {
+ int contentLength = 16 * 1024 * 1024;
+ CountDownLatch serverBlockLatch = new CountDownLatch(1);
+ RawHTTP2ServerConnectionFactory http2 = new RawHTTP2ServerConnectionFactory(new HttpConfiguration(), new ServerSessionListener.Adapter()
+ {
+ @Override
+ public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
+ {
+ return new Stream.Listener.Adapter()
+ {
+ @Override
+ public void onData(Stream stream, DataFrame frame, Callback callback)
+ {
+ // Block here to stop reading from the network
+ // to cause the client to TCP congest.
+ awaitUntil(0, () -> serverBlockLatch.await(5, TimeUnit.SECONDS));
+ callback.succeeded();
+ if (frame.isEndStream())
+ {
+ MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields());
+ stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP);
+ }
+ }
+ };
+ }
+ });
+ int window = 2 * contentLength;
+ http2.setInitialSessionRecvWindow(window);
+ http2.setInitialStreamRecvWindow(window);
+ start(http2);
+
+ client = new HTTP2Client();
+ // Threads: 1 selector, 1 reserved, 1 application.
+ QueuedThreadPool clientThreads = newSmallThreadPool("client", 3);
+ client.setExecutor(clientThreads);
+ client.setSelectors(1);
+ client.start();
+
+ FuturePromise promise = new FuturePromise<>();
+ client.connect(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.Listener.Adapter(), promise);
+ Session session = promise.get(5, TimeUnit.SECONDS);
+
+ // Send a request to TCP congest the client.
+ HttpURI uri = new HttpURI("http://localhost:" + connector.getLocalPort() + "/congest");
+ MetaData.Request request = new MetaData.Request("GET", uri, HttpVersion.HTTP_2, new HttpFields());
+ FuturePromise streamPromise = new FuturePromise<>();
+ CountDownLatch latch = new CountDownLatch(1);
+ session.newStream(new HeadersFrame(request, null, false), streamPromise, new Stream.Listener.Adapter()
+ {
+ @Override
+ public void onHeaders(Stream stream, HeadersFrame frame)
+ {
+ MetaData.Response response = (MetaData.Response)frame.getMetaData();
+ assertEquals(HttpStatus.OK_200, response.getStatus());
+ latch.countDown();
+ }
+ });
+ Stream stream = streamPromise.get(5, TimeUnit.SECONDS);
+ stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(contentLength), true), Callback.NOOP);
+
+ awaitUntil(5000, () ->
+ {
+ AbstractEndPoint clientEndPoint = (AbstractEndPoint)((HTTP2Session)session).getEndPoint();
+ return clientEndPoint.getWriteFlusher().isPending();
+ });
+ // Wait for NIO on the client to be OP_WRITE interested.
+ Thread.sleep(1000);
+
+ CountDownLatch clientBlockLatch = new CountDownLatch(1);
+ // Make sure the application threads are blocked.
+ IntStream.range(0, clientThreads.getIdleThreads())
+ .forEach(i -> clientThreads.execute(() -> awaitUntil(0, () -> clientBlockLatch.await(15, TimeUnit.SECONDS))));
+ // Make sure the reserved threads are blocked.
+ if (clientThreads.getAvailableReservedThreads() != 1)
+ {
+ assertFalse(clientThreads.tryExecute(() -> {}));
+ awaitUntil(5000, () -> clientThreads.getAvailableReservedThreads() == 1);
+ }
+ // Use the reserved thread for a blocking operation, simulating another blocking write.
+ assertTrue(clientThreads.tryExecute(() -> awaitUntil(0, () -> clientBlockLatch.await(15, TimeUnit.SECONDS))));
+
+ awaitUntil(5000, () -> clientThreads.getReadyThreads() == 0);
+
+ // Unblock the server to read from the network, which should unblock the client.
+ serverBlockLatch.countDown();
+
+ assertTrue(latch.await(10, TimeUnit.SECONDS), client.dump());
+ clientBlockLatch.countDown();
+ }
+
+ private void awaitUntil(long millis, Callable test)
+ {
+ try
+ {
+ if (millis == 0)
+ {
+ if (test.call())
+ return;
+ }
+ else
+ {
+ long begin = System.nanoTime();
+ while (System.nanoTime() - begin < TimeUnit.MILLISECONDS.toNanos(millis))
+ {
+ if (test.call())
+ return;
+ Thread.sleep(10);
+ }
+ }
+ fail("Await elapsed: " + millis + "ms");
+ }
+ catch (RuntimeException | Error x)
+ {
+ throw x;
+ }
+ catch (Exception x)
+ {
+ throw new RuntimeException(x);
+ }
+ }
+}
diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java
index 314ee5274af..c46236a7a24 100644
--- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java
+++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java
@@ -63,6 +63,8 @@ import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Scheduler;
+import static org.eclipse.jetty.util.thread.Invocable.InvocationType.NON_BLOCKING;
+
/**
* HttpChannel represents a single endpoint for HTTP semantic processing.
* The HttpChannel is both an HttpParser.RequestHandler, where it passively receives events from
@@ -152,7 +154,7 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
* {@link TransientListeners} as an {@link AbstractConnector}
* provided listener
* Transient listeners are removed after every request cycle
- * @param listener
+ * @param listener the listener to add
* @return true if the listener was added.
*/
@Deprecated
@@ -535,7 +537,7 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
}
// Set a close callback on the HttpOutput to make it an async callback
- _response.completeOutput(Callback.from(() -> _state.completed(null), _state::completed));
+ _response.completeOutput(Callback.from(NON_BLOCKING, () -> _state.completed(null), _state::completed));
break;
}
diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java
index 619c877ee39..2b67359ddb4 100644
--- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java
+++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java
@@ -741,7 +741,9 @@ public class HttpOutput extends ServletOutputStream implements Runnable
}
if (content == null)
+ {
new AsyncFlush(false).iterate();
+ }
else
{
try
@@ -1579,7 +1581,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
{
final boolean _last;
- ChannelWriteCB(boolean last)
+ private ChannelWriteCB(boolean last)
{
_last = last;
}
@@ -1605,14 +1607,20 @@ public class HttpOutput extends ServletOutputStream implements Runnable
private abstract class NestedChannelWriteCB extends ChannelWriteCB
{
- final Callback _callback;
+ private final Callback _callback;
- NestedChannelWriteCB(Callback callback, boolean last)
+ private NestedChannelWriteCB(Callback callback, boolean last)
{
super(last);
_callback = callback;
}
+ @Override
+ public InvocationType getInvocationType()
+ {
+ return _callback.getInvocationType();
+ }
+
@Override
protected void onCompleteSuccess()
{
@@ -1647,9 +1655,9 @@ public class HttpOutput extends ServletOutputStream implements Runnable
private class AsyncFlush extends ChannelWriteCB
{
- volatile boolean _flushed;
+ private volatile boolean _flushed;
- AsyncFlush(boolean last)
+ private AsyncFlush(boolean last)
{
super(last);
}
@@ -1682,7 +1690,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
private final int _len;
private boolean _completed;
- AsyncWrite(byte[] b, int off, int len, boolean last)
+ private AsyncWrite(byte[] b, int off, int len, boolean last)
{
super(last);
_buffer = ByteBuffer.wrap(b, off, len);
@@ -1691,7 +1699,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
_slice = _len < getBufferSize() ? null : _buffer.duplicate();
}
- AsyncWrite(ByteBuffer buffer, boolean last)
+ private AsyncWrite(ByteBuffer buffer, boolean last)
{
super(last);
_buffer = buffer;
@@ -1779,7 +1787,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
private boolean _eof;
private boolean _closed;
- InputStreamWritingCB(InputStream in, Callback callback)
+ private InputStreamWritingCB(InputStream in, Callback callback)
{
super(callback, true);
_in = in;
@@ -1854,7 +1862,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
private boolean _eof;
private boolean _closed;
- ReadableByteChannelWritingCB(ReadableByteChannel in, Callback callback)
+ private ReadableByteChannelWritingCB(ReadableByteChannel in, Callback callback)
{
super(callback, true);
_in = in;
@@ -1935,5 +1943,11 @@ public class HttpOutput extends ServletOutputStream implements Runnable
{
onWriteComplete(true, x);
}
+
+ @Override
+ public InvocationType getInvocationType()
+ {
+ return InvocationType.NON_BLOCKING;
+ }
}
}
diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/ResourceService.java b/jetty-server/src/main/java/org/eclipse/jetty/server/ResourceService.java
index c908e8d271c..9a0a2f7bfbe 100644
--- a/jetty-server/src/main/java/org/eclipse/jetty/server/ResourceService.java
+++ b/jetty-server/src/main/java/org/eclipse/jetty/server/ResourceService.java
@@ -722,6 +722,12 @@ public class ResourceService
content.release();
}
+ @Override
+ public InvocationType getInvocationType()
+ {
+ return InvocationType.NON_BLOCKING;
+ }
+
@Override
public String toString()
{
diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/Callback.java b/jetty-util/src/main/java/org/eclipse/jetty/util/Callback.java
index dc84982cc47..fd4f4478a98 100644
--- a/jetty-util/src/main/java/org/eclipse/jetty/util/Callback.java
+++ b/jetty-util/src/main/java/org/eclipse/jetty/util/Callback.java
@@ -113,13 +113,26 @@ public interface Callback extends Invocable
}
/**
- * Create a callback from the passed success and failure
+ * Creates a callback from the given success and failure lambdas.
*
* @param success Called when the callback succeeds
* @param failure Called when the callback fails
* @return a new Callback
*/
static Callback from(Runnable success, Consumer failure)
+ {
+ return from(InvocationType.BLOCKING, success, failure);
+ }
+
+ /**
+ * Creates a callback with the given InvocationType from the given success and failure lambdas.
+ *
+ * @param invocationType the Callback invocation type
+ * @param success Called when the callback succeeds
+ * @param failure Called when the callback fails
+ * @return a new Callback
+ */
+ static Callback from(InvocationType invocationType, Runnable success, Consumer failure)
{
return new Callback()
{
@@ -134,6 +147,12 @@ public interface Callback extends Invocable
{
failure.accept(x);
}
+
+ @Override
+ public InvocationType getInvocationType()
+ {
+ return invocationType;
+ }
};
}
@@ -291,10 +310,6 @@ public interface Callback extends Invocable
}
}
- interface InvocableCallback extends Invocable, Callback
- {
- }
-
static Callback combine(Callback cb1, Callback cb2)
{
if (cb1 == null || cb1 == cb2)
@@ -302,7 +317,7 @@ public interface Callback extends Invocable
if (cb2 == null)
return cb1;
- return new InvocableCallback()
+ return new Callback()
{
@Override
public void succeeded()