From 2689cf75d1e30f1b1e3ae4a1d6c491d0feeac6c4 Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Thu, 26 Aug 2021 01:28:33 +0200 Subject: [PATCH] Fixes #6646 - Propagate callback invocation type to avoid deadlock in SmallThreadPoolLoadTest Fixed occurrences of Callbacks that did not override getInvocationType() to properly declare whether they block or not. Added test case for blocking writes for both client and server. Signed-off-by: Simone Bordet (cherry picked from commit 9897c1b06e83bdbe37ba21d7f469f0ea53ff8f04) --- .../eclipse/jetty/demos/FastFileServer.java | 6 + .../client/HTTP2ClientConnectionFactory.java | 6 + .../org/eclipse/jetty/http2/HTTP2Flusher.java | 12 + .../org/eclipse/jetty/http2/HTTP2Session.java | 5 +- .../org/eclipse/jetty/http2/HTTP2Stream.java | 9 + .../http2-http-client-transport/pom.xml | 5 + .../BlockedWritesWithSmallThreadPoolTest.java | 281 ++++++++++++++++++ .../org/eclipse/jetty/server/HttpChannel.java | 4 +- .../org/eclipse/jetty/server/HttpOutput.java | 32 +- .../eclipse/jetty/server/ResourceService.java | 6 + .../java/org/eclipse/jetty/util/Callback.java | 27 +- 11 files changed, 375 insertions(+), 18 deletions(-) create mode 100644 jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/BlockedWritesWithSmallThreadPoolTest.java diff --git a/demos/embedded/src/main/java/org/eclipse/jetty/demos/FastFileServer.java b/demos/embedded/src/main/java/org/eclipse/jetty/demos/FastFileServer.java index 2bd217d118a..8cd69fa09ba 100644 --- a/demos/embedded/src/main/java/org/eclipse/jetty/demos/FastFileServer.java +++ b/demos/embedded/src/main/java/org/eclipse/jetty/demos/FastFileServer.java @@ -174,6 +174,12 @@ public class FastFileServer x.printStackTrace(); async.complete(); } + + @Override + public InvocationType getInvocationType() + { + return InvocationType.NON_BLOCKING; + } }; // send "medium" files from an input stream diff --git a/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientConnectionFactory.java b/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientConnectionFactory.java index 2a6be2b23be..7d198e7f2d1 100644 --- a/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientConnectionFactory.java +++ b/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientConnectionFactory.java @@ -135,6 +135,12 @@ public class HTTP2ClientConnectionFactory implements ClientConnectionFactory close(); promise.failed(x); } + + @Override + public InvocationType getInvocationType() + { + return InvocationType.NON_BLOCKING; + } } private static class ConnectionListener implements Connection.Listener diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Flusher.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Flusher.java index aedaa2e0bef..62dd5caca42 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Flusher.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Flusher.java @@ -35,6 +35,7 @@ import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.IteratingCallback; import org.eclipse.jetty.util.component.Dumpable; import org.eclipse.jetty.util.thread.AutoLock; +import org.eclipse.jetty.util.thread.Invocable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,6 +51,7 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable private final Collection processedEntries = new ArrayList<>(); private final HTTP2Session session; private final ByteBufferPool.Lease lease; + private InvocationType invocationType = InvocationType.NON_BLOCKING; private Throwable terminated; private Entry stalledEntry; @@ -59,6 +61,12 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable this.lease = new ByteBufferPool.Lease(session.getGenerator().getByteBufferPool()); } + @Override + public InvocationType getInvocationType() + { + return invocationType; + } + public void window(IStream stream, WindowUpdateFrame frame) { Throwable closed; @@ -214,7 +222,10 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable // We use ArrayList contains() + add() instead of HashSet add() // because that is faster for collections of size up to 250 entries. if (!processedEntries.contains(entry)) + { processedEntries.add(entry); + invocationType = Invocable.combine(invocationType, Invocable.getInvocationType(entry.getCallback())); + } if (entry.getDataBytesRemaining() == 0) pending.remove(); @@ -311,6 +322,7 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable processedEntries.forEach(Entry::succeeded); processedEntries.clear(); + invocationType = InvocationType.NON_BLOCKING; if (stalledEntry != null) { diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java index 802718af54b..3f05639f6a4 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java @@ -72,6 +72,7 @@ import org.eclipse.jetty.util.annotation.ManagedObject; import org.eclipse.jetty.util.component.ContainerLifeCycle; import org.eclipse.jetty.util.component.DumpableCollection; import org.eclipse.jetty.util.thread.AutoLock; +import org.eclipse.jetty.util.thread.Invocable; import org.eclipse.jetty.util.thread.Scheduler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -1991,7 +1992,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio private void sendGoAwayAndTerminate(GoAwayFrame frame, GoAwayFrame eventFrame) { - sendGoAway(frame, Callback.from(() -> terminate(eventFrame))); + sendGoAway(frame, Callback.from(Callback.NOOP, () -> terminate(eventFrame))); } private void sendGoAway(GoAwayFrame frame, Callback callback) @@ -2197,7 +2198,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio stream.setListener(listener); stream.process(new PrefaceFrame(), Callback.NOOP); - Callback streamCallback = Callback.from(() -> promise.succeeded(stream), x -> + Callback streamCallback = Callback.from(Invocable.InvocationType.NON_BLOCKING, () -> promise.succeeded(stream), x -> { HTTP2Session.this.onStreamDestroyed(streamId); promise.failed(x); diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java index 11f1c1056e9..a7783f57f7b 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java @@ -742,6 +742,15 @@ public class HTTP2Stream implements IStream, Callback, Dumpable, CyclicTimeouts. callback.failed(x); } + @Override + public InvocationType getInvocationType() + { + synchronized (this) + { + return sendCallback != null ? sendCallback.getInvocationType() : Callback.super.getInvocationType(); + } + } + private Callback endWrite() { try (AutoLock l = lock.lock()) diff --git a/jetty-http2/http2-http-client-transport/pom.xml b/jetty-http2/http2-http-client-transport/pom.xml index 0048ecb9d03..2aafe38d95b 100644 --- a/jetty-http2/http2-http-client-transport/pom.xml +++ b/jetty-http2/http2-http-client-transport/pom.xml @@ -63,6 +63,11 @@ ${project.version} test + + org.awaitility + awaitility + test + diff --git a/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/BlockedWritesWithSmallThreadPoolTest.java b/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/BlockedWritesWithSmallThreadPoolTest.java new file mode 100644 index 00000000000..fc9e20d8e6c --- /dev/null +++ b/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/BlockedWritesWithSmallThreadPoolTest.java @@ -0,0 +1,281 @@ +// +// ======================================================================== +// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// ======================================================================== +// + +package org.eclipse.jetty.http2.client.http; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.IntStream; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.eclipse.jetty.http.HttpFields; +import org.eclipse.jetty.http.HttpStatus; +import org.eclipse.jetty.http.HttpURI; +import org.eclipse.jetty.http.HttpVersion; +import org.eclipse.jetty.http.MetaData; +import org.eclipse.jetty.http2.HTTP2Session; +import org.eclipse.jetty.http2.api.Session; +import org.eclipse.jetty.http2.api.Stream; +import org.eclipse.jetty.http2.api.server.ServerSessionListener; +import org.eclipse.jetty.http2.client.HTTP2Client; +import org.eclipse.jetty.http2.frames.DataFrame; +import org.eclipse.jetty.http2.frames.HeadersFrame; +import org.eclipse.jetty.http2.server.HTTP2CServerConnectionFactory; +import org.eclipse.jetty.http2.server.RawHTTP2ServerConnectionFactory; +import org.eclipse.jetty.io.AbstractEndPoint; +import org.eclipse.jetty.server.Handler; +import org.eclipse.jetty.server.HttpConfiguration; +import org.eclipse.jetty.server.Request; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; +import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.FuturePromise; +import org.eclipse.jetty.util.Promise; +import org.eclipse.jetty.util.component.LifeCycle; +import org.eclipse.jetty.util.thread.QueuedThreadPool; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.awaitility.Awaitility.await; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class BlockedWritesWithSmallThreadPoolTest +{ + private Server server; + private ServerConnector connector; + private QueuedThreadPool serverThreads; + private HTTP2Client client; + + private void start(Handler handler) throws Exception + { + // Threads: 1 acceptor, 1 selector, 1 reserved, 1 application. + serverThreads = newSmallThreadPool("server", 4); + server = new Server(serverThreads); + HTTP2CServerConnectionFactory http2 = new HTTP2CServerConnectionFactory(new HttpConfiguration()); + connector = new ServerConnector(server, 1, 1, http2); + server.addConnector(connector); + server.setHandler(handler); + server.start(); + } + + private void start(RawHTTP2ServerConnectionFactory factory) throws Exception + { + // Threads: 1 acceptor, 1 selector, 1 reserved, 1 application. + serverThreads = newSmallThreadPool("server", 4); + server = new Server(serverThreads); + connector = new ServerConnector(server, 1, 1, factory); + server.addConnector(connector); + server.start(); + } + + private QueuedThreadPool newSmallThreadPool(String name, int maxThreads) + { + QueuedThreadPool pool = new QueuedThreadPool(maxThreads, maxThreads); + pool.setName(name); + pool.setReservedThreads(1); + pool.setDetailedDump(true); + return pool; + } + + @AfterEach + public void dispose() + { + LifeCycle.stop(client); + LifeCycle.stop(server); + } + + @Test + public void testServerThreadsBlockedInWrites() throws Exception + { + int contentLength = 16 * 1024 * 1024; + AtomicReference serverEndPointRef = new AtomicReference<>(); + start(new EmptyServerHandler() + { + @Override + protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException + { + serverEndPointRef.compareAndSet(null, (AbstractEndPoint)jettyRequest.getHttpChannel().getEndPoint()); + // Write a large content to cause TCP congestion. + response.getOutputStream().write(new byte[contentLength]); + } + }); + + client = new HTTP2Client(); + // Set large flow control windows so the server hits TCP congestion. + int window = 2 * contentLength; + client.setInitialSessionRecvWindow(window); + client.setInitialStreamRecvWindow(window); + client.start(); + + FuturePromise promise = new FuturePromise<>(); + client.connect(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.Listener.Adapter(), promise); + Session session = promise.get(5, SECONDS); + + CountDownLatch clientBlockLatch = new CountDownLatch(1); + CountDownLatch clientDataLatch = new CountDownLatch(1); + // Send a request to TCP congest the server. + HttpURI uri = HttpURI.build("http://localhost:" + connector.getLocalPort() + "/congest"); + MetaData.Request request = new MetaData.Request("GET", uri, HttpVersion.HTTP_2, HttpFields.EMPTY); + session.newStream(new HeadersFrame(request, null, true), new Promise.Adapter<>(), new Stream.Listener.Adapter() + { + @Override + public void onData(Stream stream, DataFrame frame, Callback callback) + { + try + { + // Block here to stop reading from the network + // to cause the server to TCP congest. + clientBlockLatch.await(5, SECONDS); + callback.succeeded(); + if (frame.isEndStream()) + clientDataLatch.countDown(); + } + catch (InterruptedException x) + { + callback.failed(x); + } + } + }); + + await().atMost(5, SECONDS).until(() -> + { + AbstractEndPoint serverEndPoint = serverEndPointRef.get(); + return serverEndPoint != null && serverEndPoint.getWriteFlusher().isPending(); + }); + // Wait for NIO on the server to be OP_WRITE interested. + Thread.sleep(1000); + + // Make sure there is a reserved thread. + if (serverThreads.getAvailableReservedThreads() != 1) + { + assertFalse(serverThreads.tryExecute(() -> {})); + await().atMost(5, SECONDS).until(() -> serverThreads.getAvailableReservedThreads() == 1); + } + // Use the reserved thread for a blocking operation, simulating another blocking write. + CountDownLatch serverBlockLatch = new CountDownLatch(1); + assertTrue(serverThreads.tryExecute(() -> await().atMost(20, SECONDS).until(() -> serverBlockLatch.await(15, SECONDS), b -> true))); + + assertEquals(0, serverThreads.getReadyThreads()); + + // Unblock the client to read from the network, which should unblock the server write(). + clientBlockLatch.countDown(); + + assertTrue(clientDataLatch.await(10, SECONDS), server.dump()); + serverBlockLatch.countDown(); + } + + @Test + public void testClientThreadsBlockedInWrite() throws Exception + { + int contentLength = 16 * 1024 * 1024; + CountDownLatch serverBlockLatch = new CountDownLatch(1); + RawHTTP2ServerConnectionFactory http2 = new RawHTTP2ServerConnectionFactory(new HttpConfiguration(), new ServerSessionListener.Adapter() + { + @Override + public Stream.Listener onNewStream(Stream stream, HeadersFrame frame) + { + return new Stream.Listener.Adapter() + { + @Override + public void onData(Stream stream, DataFrame frame, Callback callback) + { + try + { + // Block here to stop reading from the network + // to cause the client to TCP congest. + serverBlockLatch.await(5, SECONDS); + callback.succeeded(); + if (frame.isEndStream()) + { + MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, HttpFields.EMPTY); + stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP); + } + } + catch (InterruptedException x) + { + callback.failed(x); + } + } + }; + } + }); + int window = 2 * contentLength; + http2.setInitialSessionRecvWindow(window); + http2.setInitialStreamRecvWindow(window); + start(http2); + + client = new HTTP2Client(); + // Threads: 1 selector, 1 reserved, 1 application. + QueuedThreadPool clientThreads = newSmallThreadPool("client", 3); + client.setExecutor(clientThreads); + client.setSelectors(1); + client.start(); + + FuturePromise promise = new FuturePromise<>(); + client.connect(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.Listener.Adapter(), promise); + Session session = promise.get(5, SECONDS); + + // Send a request to TCP congest the client. + HttpURI uri = HttpURI.build("http://localhost:" + connector.getLocalPort() + "/congest"); + MetaData.Request request = new MetaData.Request("GET", uri, HttpVersion.HTTP_2, HttpFields.EMPTY); + FuturePromise streamPromise = new FuturePromise<>(); + CountDownLatch latch = new CountDownLatch(1); + session.newStream(new HeadersFrame(request, null, false), streamPromise, new Stream.Listener.Adapter() + { + @Override + public void onHeaders(Stream stream, HeadersFrame frame) + { + MetaData.Response response = (MetaData.Response)frame.getMetaData(); + assertEquals(HttpStatus.OK_200, response.getStatus()); + latch.countDown(); + } + }); + Stream stream = streamPromise.get(5, SECONDS); + stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(contentLength), true), Callback.NOOP); + + await().atMost(5, SECONDS).until(() -> + { + AbstractEndPoint clientEndPoint = (AbstractEndPoint)((HTTP2Session)session).getEndPoint(); + return clientEndPoint.getWriteFlusher().isPending(); + }); + // Wait for NIO on the client to be OP_WRITE interested. + Thread.sleep(1000); + + CountDownLatch clientBlockLatch = new CountDownLatch(1); + // Make sure the application thread is blocked. + clientThreads.execute(() -> await().until(() -> clientBlockLatch.await(15, SECONDS), b -> true)); + // Make sure the reserved thread is blocked. + if (clientThreads.getAvailableReservedThreads() != 1) + { + assertFalse(clientThreads.tryExecute(() -> {})); + await().atMost(5, SECONDS).until(() -> clientThreads.getAvailableReservedThreads() == 1); + } + // Use the reserved thread for a blocking operation, simulating another blocking write. + assertTrue(clientThreads.tryExecute(() -> await().until(() -> clientBlockLatch.await(15, SECONDS), b -> true))); + + await().atMost(5, SECONDS).until(() -> clientThreads.getReadyThreads() == 0); + + // Unblock the server to read from the network, which should unblock the client. + serverBlockLatch.countDown(); + + assertTrue(latch.await(10, SECONDS), client.dump()); + clientBlockLatch.countDown(); + } +} diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java index 632929d8f46..f7d3ab5671c 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java @@ -56,6 +56,8 @@ import org.eclipse.jetty.util.thread.Scheduler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.eclipse.jetty.util.thread.Invocable.InvocationType.NON_BLOCKING; + /** * HttpChannel represents a single endpoint for HTTP semantic processing. * The HttpChannel is both an HttpParser.RequestHandler, where it passively receives events from @@ -540,7 +542,7 @@ public abstract class HttpChannel implements Runnable, HttpOutput.Interceptor break; // Set a close callback on the HttpOutput to make it an async callback - _response.completeOutput(Callback.from(() -> _state.completed(null), _state::completed)); + _response.completeOutput(Callback.from(NON_BLOCKING, () -> _state.completed(null), _state::completed)); break; } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java index f8df5ea27a2..42ed01b7b68 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java @@ -725,7 +725,9 @@ public class HttpOutput extends ServletOutputStream implements Runnable } if (content == null) + { new AsyncFlush(false).iterate(); + } else { try @@ -1534,7 +1536,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable { final boolean _last; - ChannelWriteCB(boolean last) + private ChannelWriteCB(boolean last) { _last = last; } @@ -1560,14 +1562,20 @@ public class HttpOutput extends ServletOutputStream implements Runnable private abstract class NestedChannelWriteCB extends ChannelWriteCB { - final Callback _callback; + private final Callback _callback; - NestedChannelWriteCB(Callback callback, boolean last) + private NestedChannelWriteCB(Callback callback, boolean last) { super(last); _callback = callback; } + @Override + public InvocationType getInvocationType() + { + return _callback.getInvocationType(); + } + @Override protected void onCompleteSuccess() { @@ -1602,9 +1610,9 @@ public class HttpOutput extends ServletOutputStream implements Runnable private class AsyncFlush extends ChannelWriteCB { - volatile boolean _flushed; + private volatile boolean _flushed; - AsyncFlush(boolean last) + private AsyncFlush(boolean last) { super(last); } @@ -1637,7 +1645,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable private final int _len; private boolean _completed; - AsyncWrite(byte[] b, int off, int len, boolean last) + private AsyncWrite(byte[] b, int off, int len, boolean last) { super(last); _buffer = ByteBuffer.wrap(b, off, len); @@ -1646,7 +1654,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable _slice = _len < getBufferSize() ? null : _buffer.duplicate(); } - AsyncWrite(ByteBuffer buffer, boolean last) + private AsyncWrite(ByteBuffer buffer, boolean last) { super(last); _buffer = buffer; @@ -1734,7 +1742,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable private boolean _eof; private boolean _closed; - InputStreamWritingCB(InputStream in, Callback callback) + private InputStreamWritingCB(InputStream in, Callback callback) { super(callback, true); _in = in; @@ -1810,7 +1818,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable private boolean _eof; private boolean _closed; - ReadableByteChannelWritingCB(ReadableByteChannel in, Callback callback) + private ReadableByteChannelWritingCB(ReadableByteChannel in, Callback callback) { super(callback, true); _in = in; @@ -1882,5 +1890,11 @@ public class HttpOutput extends ServletOutputStream implements Runnable { onWriteComplete(true, x); } + + @Override + public InvocationType getInvocationType() + { + return InvocationType.NON_BLOCKING; + } } } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/ResourceService.java b/jetty-server/src/main/java/org/eclipse/jetty/server/ResourceService.java index 5172c42386b..afab62eed49 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/ResourceService.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/ResourceService.java @@ -717,6 +717,12 @@ public class ResourceService content.release(); } + @Override + public InvocationType getInvocationType() + { + return InvocationType.NON_BLOCKING; + } + @Override public String toString() { diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/Callback.java b/jetty-util/src/main/java/org/eclipse/jetty/util/Callback.java index 634b1aa3ea9..d5c2fd60e3a 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/Callback.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/Callback.java @@ -108,13 +108,26 @@ public interface Callback extends Invocable } /** - * Create a callback from the passed success and failure + * Creates a callback from the given success and failure lambdas. * * @param success Called when the callback succeeds * @param failure Called when the callback fails * @return a new Callback */ static Callback from(Runnable success, Consumer failure) + { + return from(InvocationType.BLOCKING, success, failure); + } + + /** + * Creates a callback with the given InvocationType from the given success and failure lambdas. + * + * @param invocationType the Callback invocation type + * @param success Called when the callback succeeds + * @param failure Called when the callback fails + * @return a new Callback + */ + static Callback from(InvocationType invocationType, Runnable success, Consumer failure) { return new Callback() { @@ -129,6 +142,12 @@ public interface Callback extends Invocable { failure.accept(x); } + + @Override + public InvocationType getInvocationType() + { + return invocationType; + } }; } @@ -339,10 +358,6 @@ public interface Callback extends Invocable } } - interface InvocableCallback extends Invocable, Callback - { - } - static Callback combine(Callback cb1, Callback cb2) { if (cb1 == null || cb1 == cb2) @@ -350,7 +365,7 @@ public interface Callback extends Invocable if (cb2 == null) return cb1; - return new InvocableCallback() + return new Callback() { @Override public void succeeded()