diff --git a/demos/embedded/src/main/java/org/eclipse/jetty/demos/FastFileServer.java b/demos/embedded/src/main/java/org/eclipse/jetty/demos/FastFileServer.java index 2bd217d118a..8cd69fa09ba 100644 --- a/demos/embedded/src/main/java/org/eclipse/jetty/demos/FastFileServer.java +++ b/demos/embedded/src/main/java/org/eclipse/jetty/demos/FastFileServer.java @@ -174,6 +174,12 @@ public class FastFileServer x.printStackTrace(); async.complete(); } + + @Override + public InvocationType getInvocationType() + { + return InvocationType.NON_BLOCKING; + } }; // send "medium" files from an input stream diff --git a/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientConnectionFactory.java b/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientConnectionFactory.java index 2a6be2b23be..7d198e7f2d1 100644 --- a/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientConnectionFactory.java +++ b/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientConnectionFactory.java @@ -135,6 +135,12 @@ public class HTTP2ClientConnectionFactory implements ClientConnectionFactory close(); promise.failed(x); } + + @Override + public InvocationType getInvocationType() + { + return InvocationType.NON_BLOCKING; + } } private static class ConnectionListener implements Connection.Listener diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Flusher.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Flusher.java index aedaa2e0bef..62dd5caca42 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Flusher.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Flusher.java @@ -35,6 +35,7 @@ import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.IteratingCallback; import org.eclipse.jetty.util.component.Dumpable; import org.eclipse.jetty.util.thread.AutoLock; +import org.eclipse.jetty.util.thread.Invocable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,6 +51,7 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable private final Collection processedEntries = new ArrayList<>(); private final HTTP2Session session; private final ByteBufferPool.Lease lease; + private InvocationType invocationType = InvocationType.NON_BLOCKING; private Throwable terminated; private Entry stalledEntry; @@ -59,6 +61,12 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable this.lease = new ByteBufferPool.Lease(session.getGenerator().getByteBufferPool()); } + @Override + public InvocationType getInvocationType() + { + return invocationType; + } + public void window(IStream stream, WindowUpdateFrame frame) { Throwable closed; @@ -214,7 +222,10 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable // We use ArrayList contains() + add() instead of HashSet add() // because that is faster for collections of size up to 250 entries. if (!processedEntries.contains(entry)) + { processedEntries.add(entry); + invocationType = Invocable.combine(invocationType, Invocable.getInvocationType(entry.getCallback())); + } if (entry.getDataBytesRemaining() == 0) pending.remove(); @@ -311,6 +322,7 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable processedEntries.forEach(Entry::succeeded); processedEntries.clear(); + invocationType = InvocationType.NON_BLOCKING; if (stalledEntry != null) { diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java index 802718af54b..3f05639f6a4 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java @@ -72,6 +72,7 @@ import org.eclipse.jetty.util.annotation.ManagedObject; import org.eclipse.jetty.util.component.ContainerLifeCycle; import org.eclipse.jetty.util.component.DumpableCollection; import org.eclipse.jetty.util.thread.AutoLock; +import org.eclipse.jetty.util.thread.Invocable; import org.eclipse.jetty.util.thread.Scheduler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -1991,7 +1992,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio private void sendGoAwayAndTerminate(GoAwayFrame frame, GoAwayFrame eventFrame) { - sendGoAway(frame, Callback.from(() -> terminate(eventFrame))); + sendGoAway(frame, Callback.from(Callback.NOOP, () -> terminate(eventFrame))); } private void sendGoAway(GoAwayFrame frame, Callback callback) @@ -2197,7 +2198,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio stream.setListener(listener); stream.process(new PrefaceFrame(), Callback.NOOP); - Callback streamCallback = Callback.from(() -> promise.succeeded(stream), x -> + Callback streamCallback = Callback.from(Invocable.InvocationType.NON_BLOCKING, () -> promise.succeeded(stream), x -> { HTTP2Session.this.onStreamDestroyed(streamId); promise.failed(x); diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java index 11f1c1056e9..a7783f57f7b 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java @@ -742,6 +742,15 @@ public class HTTP2Stream implements IStream, Callback, Dumpable, CyclicTimeouts. callback.failed(x); } + @Override + public InvocationType getInvocationType() + { + synchronized (this) + { + return sendCallback != null ? sendCallback.getInvocationType() : Callback.super.getInvocationType(); + } + } + private Callback endWrite() { try (AutoLock l = lock.lock()) diff --git a/jetty-http2/http2-http-client-transport/pom.xml b/jetty-http2/http2-http-client-transport/pom.xml index 0048ecb9d03..2aafe38d95b 100644 --- a/jetty-http2/http2-http-client-transport/pom.xml +++ b/jetty-http2/http2-http-client-transport/pom.xml @@ -63,6 +63,11 @@ ${project.version} test + + org.awaitility + awaitility + test + diff --git a/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/BlockedWritesWithSmallThreadPoolTest.java b/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/BlockedWritesWithSmallThreadPoolTest.java new file mode 100644 index 00000000000..fc9e20d8e6c --- /dev/null +++ b/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/BlockedWritesWithSmallThreadPoolTest.java @@ -0,0 +1,281 @@ +// +// ======================================================================== +// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// ======================================================================== +// + +package org.eclipse.jetty.http2.client.http; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.IntStream; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.eclipse.jetty.http.HttpFields; +import org.eclipse.jetty.http.HttpStatus; +import org.eclipse.jetty.http.HttpURI; +import org.eclipse.jetty.http.HttpVersion; +import org.eclipse.jetty.http.MetaData; +import org.eclipse.jetty.http2.HTTP2Session; +import org.eclipse.jetty.http2.api.Session; +import org.eclipse.jetty.http2.api.Stream; +import org.eclipse.jetty.http2.api.server.ServerSessionListener; +import org.eclipse.jetty.http2.client.HTTP2Client; +import org.eclipse.jetty.http2.frames.DataFrame; +import org.eclipse.jetty.http2.frames.HeadersFrame; +import org.eclipse.jetty.http2.server.HTTP2CServerConnectionFactory; +import org.eclipse.jetty.http2.server.RawHTTP2ServerConnectionFactory; +import org.eclipse.jetty.io.AbstractEndPoint; +import org.eclipse.jetty.server.Handler; +import org.eclipse.jetty.server.HttpConfiguration; +import org.eclipse.jetty.server.Request; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; +import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.FuturePromise; +import org.eclipse.jetty.util.Promise; +import org.eclipse.jetty.util.component.LifeCycle; +import org.eclipse.jetty.util.thread.QueuedThreadPool; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.awaitility.Awaitility.await; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class BlockedWritesWithSmallThreadPoolTest +{ + private Server server; + private ServerConnector connector; + private QueuedThreadPool serverThreads; + private HTTP2Client client; + + private void start(Handler handler) throws Exception + { + // Threads: 1 acceptor, 1 selector, 1 reserved, 1 application. + serverThreads = newSmallThreadPool("server", 4); + server = new Server(serverThreads); + HTTP2CServerConnectionFactory http2 = new HTTP2CServerConnectionFactory(new HttpConfiguration()); + connector = new ServerConnector(server, 1, 1, http2); + server.addConnector(connector); + server.setHandler(handler); + server.start(); + } + + private void start(RawHTTP2ServerConnectionFactory factory) throws Exception + { + // Threads: 1 acceptor, 1 selector, 1 reserved, 1 application. + serverThreads = newSmallThreadPool("server", 4); + server = new Server(serverThreads); + connector = new ServerConnector(server, 1, 1, factory); + server.addConnector(connector); + server.start(); + } + + private QueuedThreadPool newSmallThreadPool(String name, int maxThreads) + { + QueuedThreadPool pool = new QueuedThreadPool(maxThreads, maxThreads); + pool.setName(name); + pool.setReservedThreads(1); + pool.setDetailedDump(true); + return pool; + } + + @AfterEach + public void dispose() + { + LifeCycle.stop(client); + LifeCycle.stop(server); + } + + @Test + public void testServerThreadsBlockedInWrites() throws Exception + { + int contentLength = 16 * 1024 * 1024; + AtomicReference serverEndPointRef = new AtomicReference<>(); + start(new EmptyServerHandler() + { + @Override + protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException + { + serverEndPointRef.compareAndSet(null, (AbstractEndPoint)jettyRequest.getHttpChannel().getEndPoint()); + // Write a large content to cause TCP congestion. + response.getOutputStream().write(new byte[contentLength]); + } + }); + + client = new HTTP2Client(); + // Set large flow control windows so the server hits TCP congestion. + int window = 2 * contentLength; + client.setInitialSessionRecvWindow(window); + client.setInitialStreamRecvWindow(window); + client.start(); + + FuturePromise promise = new FuturePromise<>(); + client.connect(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.Listener.Adapter(), promise); + Session session = promise.get(5, SECONDS); + + CountDownLatch clientBlockLatch = new CountDownLatch(1); + CountDownLatch clientDataLatch = new CountDownLatch(1); + // Send a request to TCP congest the server. + HttpURI uri = HttpURI.build("http://localhost:" + connector.getLocalPort() + "/congest"); + MetaData.Request request = new MetaData.Request("GET", uri, HttpVersion.HTTP_2, HttpFields.EMPTY); + session.newStream(new HeadersFrame(request, null, true), new Promise.Adapter<>(), new Stream.Listener.Adapter() + { + @Override + public void onData(Stream stream, DataFrame frame, Callback callback) + { + try + { + // Block here to stop reading from the network + // to cause the server to TCP congest. + clientBlockLatch.await(5, SECONDS); + callback.succeeded(); + if (frame.isEndStream()) + clientDataLatch.countDown(); + } + catch (InterruptedException x) + { + callback.failed(x); + } + } + }); + + await().atMost(5, SECONDS).until(() -> + { + AbstractEndPoint serverEndPoint = serverEndPointRef.get(); + return serverEndPoint != null && serverEndPoint.getWriteFlusher().isPending(); + }); + // Wait for NIO on the server to be OP_WRITE interested. + Thread.sleep(1000); + + // Make sure there is a reserved thread. + if (serverThreads.getAvailableReservedThreads() != 1) + { + assertFalse(serverThreads.tryExecute(() -> {})); + await().atMost(5, SECONDS).until(() -> serverThreads.getAvailableReservedThreads() == 1); + } + // Use the reserved thread for a blocking operation, simulating another blocking write. + CountDownLatch serverBlockLatch = new CountDownLatch(1); + assertTrue(serverThreads.tryExecute(() -> await().atMost(20, SECONDS).until(() -> serverBlockLatch.await(15, SECONDS), b -> true))); + + assertEquals(0, serverThreads.getReadyThreads()); + + // Unblock the client to read from the network, which should unblock the server write(). + clientBlockLatch.countDown(); + + assertTrue(clientDataLatch.await(10, SECONDS), server.dump()); + serverBlockLatch.countDown(); + } + + @Test + public void testClientThreadsBlockedInWrite() throws Exception + { + int contentLength = 16 * 1024 * 1024; + CountDownLatch serverBlockLatch = new CountDownLatch(1); + RawHTTP2ServerConnectionFactory http2 = new RawHTTP2ServerConnectionFactory(new HttpConfiguration(), new ServerSessionListener.Adapter() + { + @Override + public Stream.Listener onNewStream(Stream stream, HeadersFrame frame) + { + return new Stream.Listener.Adapter() + { + @Override + public void onData(Stream stream, DataFrame frame, Callback callback) + { + try + { + // Block here to stop reading from the network + // to cause the client to TCP congest. + serverBlockLatch.await(5, SECONDS); + callback.succeeded(); + if (frame.isEndStream()) + { + MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, HttpFields.EMPTY); + stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP); + } + } + catch (InterruptedException x) + { + callback.failed(x); + } + } + }; + } + }); + int window = 2 * contentLength; + http2.setInitialSessionRecvWindow(window); + http2.setInitialStreamRecvWindow(window); + start(http2); + + client = new HTTP2Client(); + // Threads: 1 selector, 1 reserved, 1 application. + QueuedThreadPool clientThreads = newSmallThreadPool("client", 3); + client.setExecutor(clientThreads); + client.setSelectors(1); + client.start(); + + FuturePromise promise = new FuturePromise<>(); + client.connect(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.Listener.Adapter(), promise); + Session session = promise.get(5, SECONDS); + + // Send a request to TCP congest the client. + HttpURI uri = HttpURI.build("http://localhost:" + connector.getLocalPort() + "/congest"); + MetaData.Request request = new MetaData.Request("GET", uri, HttpVersion.HTTP_2, HttpFields.EMPTY); + FuturePromise streamPromise = new FuturePromise<>(); + CountDownLatch latch = new CountDownLatch(1); + session.newStream(new HeadersFrame(request, null, false), streamPromise, new Stream.Listener.Adapter() + { + @Override + public void onHeaders(Stream stream, HeadersFrame frame) + { + MetaData.Response response = (MetaData.Response)frame.getMetaData(); + assertEquals(HttpStatus.OK_200, response.getStatus()); + latch.countDown(); + } + }); + Stream stream = streamPromise.get(5, SECONDS); + stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(contentLength), true), Callback.NOOP); + + await().atMost(5, SECONDS).until(() -> + { + AbstractEndPoint clientEndPoint = (AbstractEndPoint)((HTTP2Session)session).getEndPoint(); + return clientEndPoint.getWriteFlusher().isPending(); + }); + // Wait for NIO on the client to be OP_WRITE interested. + Thread.sleep(1000); + + CountDownLatch clientBlockLatch = new CountDownLatch(1); + // Make sure the application thread is blocked. + clientThreads.execute(() -> await().until(() -> clientBlockLatch.await(15, SECONDS), b -> true)); + // Make sure the reserved thread is blocked. + if (clientThreads.getAvailableReservedThreads() != 1) + { + assertFalse(clientThreads.tryExecute(() -> {})); + await().atMost(5, SECONDS).until(() -> clientThreads.getAvailableReservedThreads() == 1); + } + // Use the reserved thread for a blocking operation, simulating another blocking write. + assertTrue(clientThreads.tryExecute(() -> await().until(() -> clientBlockLatch.await(15, SECONDS), b -> true))); + + await().atMost(5, SECONDS).until(() -> clientThreads.getReadyThreads() == 0); + + // Unblock the server to read from the network, which should unblock the client. + serverBlockLatch.countDown(); + + assertTrue(latch.await(10, SECONDS), client.dump()); + clientBlockLatch.countDown(); + } +} diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java index 632929d8f46..f7d3ab5671c 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java @@ -56,6 +56,8 @@ import org.eclipse.jetty.util.thread.Scheduler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.eclipse.jetty.util.thread.Invocable.InvocationType.NON_BLOCKING; + /** * HttpChannel represents a single endpoint for HTTP semantic processing. * The HttpChannel is both an HttpParser.RequestHandler, where it passively receives events from @@ -540,7 +542,7 @@ public abstract class HttpChannel implements Runnable, HttpOutput.Interceptor break; // Set a close callback on the HttpOutput to make it an async callback - _response.completeOutput(Callback.from(() -> _state.completed(null), _state::completed)); + _response.completeOutput(Callback.from(NON_BLOCKING, () -> _state.completed(null), _state::completed)); break; } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java index f8df5ea27a2..42ed01b7b68 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java @@ -725,7 +725,9 @@ public class HttpOutput extends ServletOutputStream implements Runnable } if (content == null) + { new AsyncFlush(false).iterate(); + } else { try @@ -1534,7 +1536,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable { final boolean _last; - ChannelWriteCB(boolean last) + private ChannelWriteCB(boolean last) { _last = last; } @@ -1560,14 +1562,20 @@ public class HttpOutput extends ServletOutputStream implements Runnable private abstract class NestedChannelWriteCB extends ChannelWriteCB { - final Callback _callback; + private final Callback _callback; - NestedChannelWriteCB(Callback callback, boolean last) + private NestedChannelWriteCB(Callback callback, boolean last) { super(last); _callback = callback; } + @Override + public InvocationType getInvocationType() + { + return _callback.getInvocationType(); + } + @Override protected void onCompleteSuccess() { @@ -1602,9 +1610,9 @@ public class HttpOutput extends ServletOutputStream implements Runnable private class AsyncFlush extends ChannelWriteCB { - volatile boolean _flushed; + private volatile boolean _flushed; - AsyncFlush(boolean last) + private AsyncFlush(boolean last) { super(last); } @@ -1637,7 +1645,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable private final int _len; private boolean _completed; - AsyncWrite(byte[] b, int off, int len, boolean last) + private AsyncWrite(byte[] b, int off, int len, boolean last) { super(last); _buffer = ByteBuffer.wrap(b, off, len); @@ -1646,7 +1654,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable _slice = _len < getBufferSize() ? null : _buffer.duplicate(); } - AsyncWrite(ByteBuffer buffer, boolean last) + private AsyncWrite(ByteBuffer buffer, boolean last) { super(last); _buffer = buffer; @@ -1734,7 +1742,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable private boolean _eof; private boolean _closed; - InputStreamWritingCB(InputStream in, Callback callback) + private InputStreamWritingCB(InputStream in, Callback callback) { super(callback, true); _in = in; @@ -1810,7 +1818,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable private boolean _eof; private boolean _closed; - ReadableByteChannelWritingCB(ReadableByteChannel in, Callback callback) + private ReadableByteChannelWritingCB(ReadableByteChannel in, Callback callback) { super(callback, true); _in = in; @@ -1882,5 +1890,11 @@ public class HttpOutput extends ServletOutputStream implements Runnable { onWriteComplete(true, x); } + + @Override + public InvocationType getInvocationType() + { + return InvocationType.NON_BLOCKING; + } } } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/ResourceService.java b/jetty-server/src/main/java/org/eclipse/jetty/server/ResourceService.java index 5172c42386b..afab62eed49 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/ResourceService.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/ResourceService.java @@ -717,6 +717,12 @@ public class ResourceService content.release(); } + @Override + public InvocationType getInvocationType() + { + return InvocationType.NON_BLOCKING; + } + @Override public String toString() { diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/Callback.java b/jetty-util/src/main/java/org/eclipse/jetty/util/Callback.java index 634b1aa3ea9..d5c2fd60e3a 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/Callback.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/Callback.java @@ -108,13 +108,26 @@ public interface Callback extends Invocable } /** - * Create a callback from the passed success and failure + * Creates a callback from the given success and failure lambdas. * * @param success Called when the callback succeeds * @param failure Called when the callback fails * @return a new Callback */ static Callback from(Runnable success, Consumer failure) + { + return from(InvocationType.BLOCKING, success, failure); + } + + /** + * Creates a callback with the given InvocationType from the given success and failure lambdas. + * + * @param invocationType the Callback invocation type + * @param success Called when the callback succeeds + * @param failure Called when the callback fails + * @return a new Callback + */ + static Callback from(InvocationType invocationType, Runnable success, Consumer failure) { return new Callback() { @@ -129,6 +142,12 @@ public interface Callback extends Invocable { failure.accept(x); } + + @Override + public InvocationType getInvocationType() + { + return invocationType; + } }; } @@ -339,10 +358,6 @@ public interface Callback extends Invocable } } - interface InvocableCallback extends Invocable, Callback - { - } - static Callback combine(Callback cb1, Callback cb2) { if (cb1 == null || cb1 == cb2) @@ -350,7 +365,7 @@ public interface Callback extends Invocable if (cb2 == null) return cb1; - return new InvocableCallback() + return new Callback() { @Override public void succeeded()