Fixes #6646 - Propagate callback invocation type to avoid deadlock in SmallThreadPoolLoadTest
Fixed occurrences of Callbacks that did not override getInvocationType() to properly declare whether they block or not.
Added test case for blocking writes for both client and server.
Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
(cherry picked from commit 9897c1b06e
)
This commit is contained in:
parent
3badb8629f
commit
2689cf75d1
|
@ -174,6 +174,12 @@ public class FastFileServer
|
|||
x.printStackTrace();
|
||||
async.complete();
|
||||
}
|
||||
|
||||
@Override
|
||||
public InvocationType getInvocationType()
|
||||
{
|
||||
return InvocationType.NON_BLOCKING;
|
||||
}
|
||||
};
|
||||
|
||||
// send "medium" files from an input stream
|
||||
|
|
|
@ -135,6 +135,12 @@ public class HTTP2ClientConnectionFactory implements ClientConnectionFactory
|
|||
close();
|
||||
promise.failed(x);
|
||||
}
|
||||
|
||||
@Override
|
||||
public InvocationType getInvocationType()
|
||||
{
|
||||
return InvocationType.NON_BLOCKING;
|
||||
}
|
||||
}
|
||||
|
||||
private static class ConnectionListener implements Connection.Listener
|
||||
|
|
|
@ -35,6 +35,7 @@ import org.eclipse.jetty.util.Callback;
|
|||
import org.eclipse.jetty.util.IteratingCallback;
|
||||
import org.eclipse.jetty.util.component.Dumpable;
|
||||
import org.eclipse.jetty.util.thread.AutoLock;
|
||||
import org.eclipse.jetty.util.thread.Invocable;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -50,6 +51,7 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable
|
|||
private final Collection<Entry> processedEntries = new ArrayList<>();
|
||||
private final HTTP2Session session;
|
||||
private final ByteBufferPool.Lease lease;
|
||||
private InvocationType invocationType = InvocationType.NON_BLOCKING;
|
||||
private Throwable terminated;
|
||||
private Entry stalledEntry;
|
||||
|
||||
|
@ -59,6 +61,12 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable
|
|||
this.lease = new ByteBufferPool.Lease(session.getGenerator().getByteBufferPool());
|
||||
}
|
||||
|
||||
@Override
|
||||
public InvocationType getInvocationType()
|
||||
{
|
||||
return invocationType;
|
||||
}
|
||||
|
||||
public void window(IStream stream, WindowUpdateFrame frame)
|
||||
{
|
||||
Throwable closed;
|
||||
|
@ -214,7 +222,10 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable
|
|||
// We use ArrayList contains() + add() instead of HashSet add()
|
||||
// because that is faster for collections of size up to 250 entries.
|
||||
if (!processedEntries.contains(entry))
|
||||
{
|
||||
processedEntries.add(entry);
|
||||
invocationType = Invocable.combine(invocationType, Invocable.getInvocationType(entry.getCallback()));
|
||||
}
|
||||
|
||||
if (entry.getDataBytesRemaining() == 0)
|
||||
pending.remove();
|
||||
|
@ -311,6 +322,7 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable
|
|||
|
||||
processedEntries.forEach(Entry::succeeded);
|
||||
processedEntries.clear();
|
||||
invocationType = InvocationType.NON_BLOCKING;
|
||||
|
||||
if (stalledEntry != null)
|
||||
{
|
||||
|
|
|
@ -72,6 +72,7 @@ import org.eclipse.jetty.util.annotation.ManagedObject;
|
|||
import org.eclipse.jetty.util.component.ContainerLifeCycle;
|
||||
import org.eclipse.jetty.util.component.DumpableCollection;
|
||||
import org.eclipse.jetty.util.thread.AutoLock;
|
||||
import org.eclipse.jetty.util.thread.Invocable;
|
||||
import org.eclipse.jetty.util.thread.Scheduler;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -1991,7 +1992,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
|
|||
|
||||
private void sendGoAwayAndTerminate(GoAwayFrame frame, GoAwayFrame eventFrame)
|
||||
{
|
||||
sendGoAway(frame, Callback.from(() -> terminate(eventFrame)));
|
||||
sendGoAway(frame, Callback.from(Callback.NOOP, () -> terminate(eventFrame)));
|
||||
}
|
||||
|
||||
private void sendGoAway(GoAwayFrame frame, Callback callback)
|
||||
|
@ -2197,7 +2198,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
|
|||
stream.setListener(listener);
|
||||
stream.process(new PrefaceFrame(), Callback.NOOP);
|
||||
|
||||
Callback streamCallback = Callback.from(() -> promise.succeeded(stream), x ->
|
||||
Callback streamCallback = Callback.from(Invocable.InvocationType.NON_BLOCKING, () -> promise.succeeded(stream), x ->
|
||||
{
|
||||
HTTP2Session.this.onStreamDestroyed(streamId);
|
||||
promise.failed(x);
|
||||
|
|
|
@ -742,6 +742,15 @@ public class HTTP2Stream implements IStream, Callback, Dumpable, CyclicTimeouts.
|
|||
callback.failed(x);
|
||||
}
|
||||
|
||||
@Override
|
||||
public InvocationType getInvocationType()
|
||||
{
|
||||
synchronized (this)
|
||||
{
|
||||
return sendCallback != null ? sendCallback.getInvocationType() : Callback.super.getInvocationType();
|
||||
}
|
||||
}
|
||||
|
||||
private Callback endWrite()
|
||||
{
|
||||
try (AutoLock l = lock.lock())
|
||||
|
|
|
@ -63,6 +63,11 @@
|
|||
<version>${project.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.awaitility</groupId>
|
||||
<artifactId>awaitility</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
</project>
|
||||
|
|
|
@ -0,0 +1,281 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
|
||||
//
|
||||
// This program and the accompanying materials are made available under the
|
||||
// terms of the Eclipse Public License v. 2.0 which is available at
|
||||
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
|
||||
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
|
||||
//
|
||||
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.http2.client.http;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.stream.IntStream;
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
|
||||
import org.eclipse.jetty.http.HttpFields;
|
||||
import org.eclipse.jetty.http.HttpStatus;
|
||||
import org.eclipse.jetty.http.HttpURI;
|
||||
import org.eclipse.jetty.http.HttpVersion;
|
||||
import org.eclipse.jetty.http.MetaData;
|
||||
import org.eclipse.jetty.http2.HTTP2Session;
|
||||
import org.eclipse.jetty.http2.api.Session;
|
||||
import org.eclipse.jetty.http2.api.Stream;
|
||||
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
|
||||
import org.eclipse.jetty.http2.client.HTTP2Client;
|
||||
import org.eclipse.jetty.http2.frames.DataFrame;
|
||||
import org.eclipse.jetty.http2.frames.HeadersFrame;
|
||||
import org.eclipse.jetty.http2.server.HTTP2CServerConnectionFactory;
|
||||
import org.eclipse.jetty.http2.server.RawHTTP2ServerConnectionFactory;
|
||||
import org.eclipse.jetty.io.AbstractEndPoint;
|
||||
import org.eclipse.jetty.server.Handler;
|
||||
import org.eclipse.jetty.server.HttpConfiguration;
|
||||
import org.eclipse.jetty.server.Request;
|
||||
import org.eclipse.jetty.server.Server;
|
||||
import org.eclipse.jetty.server.ServerConnector;
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
import org.eclipse.jetty.util.FuturePromise;
|
||||
import org.eclipse.jetty.util.Promise;
|
||||
import org.eclipse.jetty.util.component.LifeCycle;
|
||||
import org.eclipse.jetty.util.thread.QueuedThreadPool;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import static java.util.concurrent.TimeUnit.SECONDS;
|
||||
import static org.awaitility.Awaitility.await;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
public class BlockedWritesWithSmallThreadPoolTest
|
||||
{
|
||||
private Server server;
|
||||
private ServerConnector connector;
|
||||
private QueuedThreadPool serverThreads;
|
||||
private HTTP2Client client;
|
||||
|
||||
private void start(Handler handler) throws Exception
|
||||
{
|
||||
// Threads: 1 acceptor, 1 selector, 1 reserved, 1 application.
|
||||
serverThreads = newSmallThreadPool("server", 4);
|
||||
server = new Server(serverThreads);
|
||||
HTTP2CServerConnectionFactory http2 = new HTTP2CServerConnectionFactory(new HttpConfiguration());
|
||||
connector = new ServerConnector(server, 1, 1, http2);
|
||||
server.addConnector(connector);
|
||||
server.setHandler(handler);
|
||||
server.start();
|
||||
}
|
||||
|
||||
private void start(RawHTTP2ServerConnectionFactory factory) throws Exception
|
||||
{
|
||||
// Threads: 1 acceptor, 1 selector, 1 reserved, 1 application.
|
||||
serverThreads = newSmallThreadPool("server", 4);
|
||||
server = new Server(serverThreads);
|
||||
connector = new ServerConnector(server, 1, 1, factory);
|
||||
server.addConnector(connector);
|
||||
server.start();
|
||||
}
|
||||
|
||||
private QueuedThreadPool newSmallThreadPool(String name, int maxThreads)
|
||||
{
|
||||
QueuedThreadPool pool = new QueuedThreadPool(maxThreads, maxThreads);
|
||||
pool.setName(name);
|
||||
pool.setReservedThreads(1);
|
||||
pool.setDetailedDump(true);
|
||||
return pool;
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
public void dispose()
|
||||
{
|
||||
LifeCycle.stop(client);
|
||||
LifeCycle.stop(server);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testServerThreadsBlockedInWrites() throws Exception
|
||||
{
|
||||
int contentLength = 16 * 1024 * 1024;
|
||||
AtomicReference<AbstractEndPoint> serverEndPointRef = new AtomicReference<>();
|
||||
start(new EmptyServerHandler()
|
||||
{
|
||||
@Override
|
||||
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
|
||||
{
|
||||
serverEndPointRef.compareAndSet(null, (AbstractEndPoint)jettyRequest.getHttpChannel().getEndPoint());
|
||||
// Write a large content to cause TCP congestion.
|
||||
response.getOutputStream().write(new byte[contentLength]);
|
||||
}
|
||||
});
|
||||
|
||||
client = new HTTP2Client();
|
||||
// Set large flow control windows so the server hits TCP congestion.
|
||||
int window = 2 * contentLength;
|
||||
client.setInitialSessionRecvWindow(window);
|
||||
client.setInitialStreamRecvWindow(window);
|
||||
client.start();
|
||||
|
||||
FuturePromise<Session> promise = new FuturePromise<>();
|
||||
client.connect(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.Listener.Adapter(), promise);
|
||||
Session session = promise.get(5, SECONDS);
|
||||
|
||||
CountDownLatch clientBlockLatch = new CountDownLatch(1);
|
||||
CountDownLatch clientDataLatch = new CountDownLatch(1);
|
||||
// Send a request to TCP congest the server.
|
||||
HttpURI uri = HttpURI.build("http://localhost:" + connector.getLocalPort() + "/congest");
|
||||
MetaData.Request request = new MetaData.Request("GET", uri, HttpVersion.HTTP_2, HttpFields.EMPTY);
|
||||
session.newStream(new HeadersFrame(request, null, true), new Promise.Adapter<>(), new Stream.Listener.Adapter()
|
||||
{
|
||||
@Override
|
||||
public void onData(Stream stream, DataFrame frame, Callback callback)
|
||||
{
|
||||
try
|
||||
{
|
||||
// Block here to stop reading from the network
|
||||
// to cause the server to TCP congest.
|
||||
clientBlockLatch.await(5, SECONDS);
|
||||
callback.succeeded();
|
||||
if (frame.isEndStream())
|
||||
clientDataLatch.countDown();
|
||||
}
|
||||
catch (InterruptedException x)
|
||||
{
|
||||
callback.failed(x);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
await().atMost(5, SECONDS).until(() ->
|
||||
{
|
||||
AbstractEndPoint serverEndPoint = serverEndPointRef.get();
|
||||
return serverEndPoint != null && serverEndPoint.getWriteFlusher().isPending();
|
||||
});
|
||||
// Wait for NIO on the server to be OP_WRITE interested.
|
||||
Thread.sleep(1000);
|
||||
|
||||
// Make sure there is a reserved thread.
|
||||
if (serverThreads.getAvailableReservedThreads() != 1)
|
||||
{
|
||||
assertFalse(serverThreads.tryExecute(() -> {}));
|
||||
await().atMost(5, SECONDS).until(() -> serverThreads.getAvailableReservedThreads() == 1);
|
||||
}
|
||||
// Use the reserved thread for a blocking operation, simulating another blocking write.
|
||||
CountDownLatch serverBlockLatch = new CountDownLatch(1);
|
||||
assertTrue(serverThreads.tryExecute(() -> await().atMost(20, SECONDS).until(() -> serverBlockLatch.await(15, SECONDS), b -> true)));
|
||||
|
||||
assertEquals(0, serverThreads.getReadyThreads());
|
||||
|
||||
// Unblock the client to read from the network, which should unblock the server write().
|
||||
clientBlockLatch.countDown();
|
||||
|
||||
assertTrue(clientDataLatch.await(10, SECONDS), server.dump());
|
||||
serverBlockLatch.countDown();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testClientThreadsBlockedInWrite() throws Exception
|
||||
{
|
||||
int contentLength = 16 * 1024 * 1024;
|
||||
CountDownLatch serverBlockLatch = new CountDownLatch(1);
|
||||
RawHTTP2ServerConnectionFactory http2 = new RawHTTP2ServerConnectionFactory(new HttpConfiguration(), new ServerSessionListener.Adapter()
|
||||
{
|
||||
@Override
|
||||
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
|
||||
{
|
||||
return new Stream.Listener.Adapter()
|
||||
{
|
||||
@Override
|
||||
public void onData(Stream stream, DataFrame frame, Callback callback)
|
||||
{
|
||||
try
|
||||
{
|
||||
// Block here to stop reading from the network
|
||||
// to cause the client to TCP congest.
|
||||
serverBlockLatch.await(5, SECONDS);
|
||||
callback.succeeded();
|
||||
if (frame.isEndStream())
|
||||
{
|
||||
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, HttpFields.EMPTY);
|
||||
stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP);
|
||||
}
|
||||
}
|
||||
catch (InterruptedException x)
|
||||
{
|
||||
callback.failed(x);
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
});
|
||||
int window = 2 * contentLength;
|
||||
http2.setInitialSessionRecvWindow(window);
|
||||
http2.setInitialStreamRecvWindow(window);
|
||||
start(http2);
|
||||
|
||||
client = new HTTP2Client();
|
||||
// Threads: 1 selector, 1 reserved, 1 application.
|
||||
QueuedThreadPool clientThreads = newSmallThreadPool("client", 3);
|
||||
client.setExecutor(clientThreads);
|
||||
client.setSelectors(1);
|
||||
client.start();
|
||||
|
||||
FuturePromise<Session> promise = new FuturePromise<>();
|
||||
client.connect(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.Listener.Adapter(), promise);
|
||||
Session session = promise.get(5, SECONDS);
|
||||
|
||||
// Send a request to TCP congest the client.
|
||||
HttpURI uri = HttpURI.build("http://localhost:" + connector.getLocalPort() + "/congest");
|
||||
MetaData.Request request = new MetaData.Request("GET", uri, HttpVersion.HTTP_2, HttpFields.EMPTY);
|
||||
FuturePromise<Stream> streamPromise = new FuturePromise<>();
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
session.newStream(new HeadersFrame(request, null, false), streamPromise, new Stream.Listener.Adapter()
|
||||
{
|
||||
@Override
|
||||
public void onHeaders(Stream stream, HeadersFrame frame)
|
||||
{
|
||||
MetaData.Response response = (MetaData.Response)frame.getMetaData();
|
||||
assertEquals(HttpStatus.OK_200, response.getStatus());
|
||||
latch.countDown();
|
||||
}
|
||||
});
|
||||
Stream stream = streamPromise.get(5, SECONDS);
|
||||
stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(contentLength), true), Callback.NOOP);
|
||||
|
||||
await().atMost(5, SECONDS).until(() ->
|
||||
{
|
||||
AbstractEndPoint clientEndPoint = (AbstractEndPoint)((HTTP2Session)session).getEndPoint();
|
||||
return clientEndPoint.getWriteFlusher().isPending();
|
||||
});
|
||||
// Wait for NIO on the client to be OP_WRITE interested.
|
||||
Thread.sleep(1000);
|
||||
|
||||
CountDownLatch clientBlockLatch = new CountDownLatch(1);
|
||||
// Make sure the application thread is blocked.
|
||||
clientThreads.execute(() -> await().until(() -> clientBlockLatch.await(15, SECONDS), b -> true));
|
||||
// Make sure the reserved thread is blocked.
|
||||
if (clientThreads.getAvailableReservedThreads() != 1)
|
||||
{
|
||||
assertFalse(clientThreads.tryExecute(() -> {}));
|
||||
await().atMost(5, SECONDS).until(() -> clientThreads.getAvailableReservedThreads() == 1);
|
||||
}
|
||||
// Use the reserved thread for a blocking operation, simulating another blocking write.
|
||||
assertTrue(clientThreads.tryExecute(() -> await().until(() -> clientBlockLatch.await(15, SECONDS), b -> true)));
|
||||
|
||||
await().atMost(5, SECONDS).until(() -> clientThreads.getReadyThreads() == 0);
|
||||
|
||||
// Unblock the server to read from the network, which should unblock the client.
|
||||
serverBlockLatch.countDown();
|
||||
|
||||
assertTrue(latch.await(10, SECONDS), client.dump());
|
||||
clientBlockLatch.countDown();
|
||||
}
|
||||
}
|
|
@ -56,6 +56,8 @@ import org.eclipse.jetty.util.thread.Scheduler;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import static org.eclipse.jetty.util.thread.Invocable.InvocationType.NON_BLOCKING;
|
||||
|
||||
/**
|
||||
* HttpChannel represents a single endpoint for HTTP semantic processing.
|
||||
* The HttpChannel is both an HttpParser.RequestHandler, where it passively receives events from
|
||||
|
@ -540,7 +542,7 @@ public abstract class HttpChannel implements Runnable, HttpOutput.Interceptor
|
|||
break;
|
||||
|
||||
// Set a close callback on the HttpOutput to make it an async callback
|
||||
_response.completeOutput(Callback.from(() -> _state.completed(null), _state::completed));
|
||||
_response.completeOutput(Callback.from(NON_BLOCKING, () -> _state.completed(null), _state::completed));
|
||||
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -725,7 +725,9 @@ public class HttpOutput extends ServletOutputStream implements Runnable
|
|||
}
|
||||
|
||||
if (content == null)
|
||||
{
|
||||
new AsyncFlush(false).iterate();
|
||||
}
|
||||
else
|
||||
{
|
||||
try
|
||||
|
@ -1534,7 +1536,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
|
|||
{
|
||||
final boolean _last;
|
||||
|
||||
ChannelWriteCB(boolean last)
|
||||
private ChannelWriteCB(boolean last)
|
||||
{
|
||||
_last = last;
|
||||
}
|
||||
|
@ -1560,14 +1562,20 @@ public class HttpOutput extends ServletOutputStream implements Runnable
|
|||
|
||||
private abstract class NestedChannelWriteCB extends ChannelWriteCB
|
||||
{
|
||||
final Callback _callback;
|
||||
private final Callback _callback;
|
||||
|
||||
NestedChannelWriteCB(Callback callback, boolean last)
|
||||
private NestedChannelWriteCB(Callback callback, boolean last)
|
||||
{
|
||||
super(last);
|
||||
_callback = callback;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InvocationType getInvocationType()
|
||||
{
|
||||
return _callback.getInvocationType();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void onCompleteSuccess()
|
||||
{
|
||||
|
@ -1602,9 +1610,9 @@ public class HttpOutput extends ServletOutputStream implements Runnable
|
|||
|
||||
private class AsyncFlush extends ChannelWriteCB
|
||||
{
|
||||
volatile boolean _flushed;
|
||||
private volatile boolean _flushed;
|
||||
|
||||
AsyncFlush(boolean last)
|
||||
private AsyncFlush(boolean last)
|
||||
{
|
||||
super(last);
|
||||
}
|
||||
|
@ -1637,7 +1645,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
|
|||
private final int _len;
|
||||
private boolean _completed;
|
||||
|
||||
AsyncWrite(byte[] b, int off, int len, boolean last)
|
||||
private AsyncWrite(byte[] b, int off, int len, boolean last)
|
||||
{
|
||||
super(last);
|
||||
_buffer = ByteBuffer.wrap(b, off, len);
|
||||
|
@ -1646,7 +1654,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
|
|||
_slice = _len < getBufferSize() ? null : _buffer.duplicate();
|
||||
}
|
||||
|
||||
AsyncWrite(ByteBuffer buffer, boolean last)
|
||||
private AsyncWrite(ByteBuffer buffer, boolean last)
|
||||
{
|
||||
super(last);
|
||||
_buffer = buffer;
|
||||
|
@ -1734,7 +1742,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
|
|||
private boolean _eof;
|
||||
private boolean _closed;
|
||||
|
||||
InputStreamWritingCB(InputStream in, Callback callback)
|
||||
private InputStreamWritingCB(InputStream in, Callback callback)
|
||||
{
|
||||
super(callback, true);
|
||||
_in = in;
|
||||
|
@ -1810,7 +1818,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
|
|||
private boolean _eof;
|
||||
private boolean _closed;
|
||||
|
||||
ReadableByteChannelWritingCB(ReadableByteChannel in, Callback callback)
|
||||
private ReadableByteChannelWritingCB(ReadableByteChannel in, Callback callback)
|
||||
{
|
||||
super(callback, true);
|
||||
_in = in;
|
||||
|
@ -1882,5 +1890,11 @@ public class HttpOutput extends ServletOutputStream implements Runnable
|
|||
{
|
||||
onWriteComplete(true, x);
|
||||
}
|
||||
|
||||
@Override
|
||||
public InvocationType getInvocationType()
|
||||
{
|
||||
return InvocationType.NON_BLOCKING;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -717,6 +717,12 @@ public class ResourceService
|
|||
content.release();
|
||||
}
|
||||
|
||||
@Override
|
||||
public InvocationType getInvocationType()
|
||||
{
|
||||
return InvocationType.NON_BLOCKING;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
|
|
|
@ -108,13 +108,26 @@ public interface Callback extends Invocable
|
|||
}
|
||||
|
||||
/**
|
||||
* Create a callback from the passed success and failure
|
||||
* Creates a callback from the given success and failure lambdas.
|
||||
*
|
||||
* @param success Called when the callback succeeds
|
||||
* @param failure Called when the callback fails
|
||||
* @return a new Callback
|
||||
*/
|
||||
static Callback from(Runnable success, Consumer<Throwable> failure)
|
||||
{
|
||||
return from(InvocationType.BLOCKING, success, failure);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a callback with the given InvocationType from the given success and failure lambdas.
|
||||
*
|
||||
* @param invocationType the Callback invocation type
|
||||
* @param success Called when the callback succeeds
|
||||
* @param failure Called when the callback fails
|
||||
* @return a new Callback
|
||||
*/
|
||||
static Callback from(InvocationType invocationType, Runnable success, Consumer<Throwable> failure)
|
||||
{
|
||||
return new Callback()
|
||||
{
|
||||
|
@ -129,6 +142,12 @@ public interface Callback extends Invocable
|
|||
{
|
||||
failure.accept(x);
|
||||
}
|
||||
|
||||
@Override
|
||||
public InvocationType getInvocationType()
|
||||
{
|
||||
return invocationType;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
|
@ -339,10 +358,6 @@ public interface Callback extends Invocable
|
|||
}
|
||||
}
|
||||
|
||||
interface InvocableCallback extends Invocable, Callback
|
||||
{
|
||||
}
|
||||
|
||||
static Callback combine(Callback cb1, Callback cb2)
|
||||
{
|
||||
if (cb1 == null || cb1 == cb2)
|
||||
|
@ -350,7 +365,7 @@ public interface Callback extends Invocable
|
|||
if (cb2 == null)
|
||||
return cb1;
|
||||
|
||||
return new InvocableCallback()
|
||||
return new Callback()
|
||||
{
|
||||
@Override
|
||||
public void succeeded()
|
||||
|
|
Loading…
Reference in New Issue