Fixes #6646 - SmallThreadPoolLoadTest on windows flaky.

Fixed occurrences of Callbacks that did not override getInvocationType() to properly declare whether they block or not.

Added test case for blocking writes for both client and server.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2021-08-24 01:10:56 +02:00
parent f9bb80aa96
commit 9897c1b06e
10 changed files with 395 additions and 19 deletions

View File

@ -183,6 +183,12 @@ public class FastFileServer
x.printStackTrace();
async.complete();
}
@Override
public InvocationType getInvocationType()
{
return InvocationType.NON_BLOCKING;
}
};
// send "medium" files from an input stream

View File

@ -135,6 +135,12 @@ public class HTTP2ClientConnectionFactory implements ClientConnectionFactory
close();
promise.failed(x);
}
@Override
public InvocationType getInvocationType()
{
return InvocationType.NON_BLOCKING;
}
}
private static class ConnectionListener implements Connection.Listener

View File

@ -41,6 +41,7 @@ import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Invocable;
public class HTTP2Flusher extends IteratingCallback implements Dumpable
{
@ -53,6 +54,7 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable
private final Collection<Entry> processedEntries = new ArrayList<>();
private final HTTP2Session session;
private final ByteBufferPool.Lease lease;
private InvocationType invocationType = InvocationType.NON_BLOCKING;
private Throwable terminated;
private Entry stalledEntry;
@ -62,6 +64,12 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable
this.lease = new ByteBufferPool.Lease(session.getGenerator().getByteBufferPool());
}
@Override
public InvocationType getInvocationType()
{
return invocationType;
}
public void window(IStream stream, WindowUpdateFrame frame)
{
Throwable closed;
@ -217,7 +225,10 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable
// We use ArrayList contains() + add() instead of HashSet add()
// because that is faster for collections of size up to 250 entries.
if (!processedEntries.contains(entry))
{
processedEntries.add(entry);
invocationType = Invocable.combine(invocationType, Invocable.getInvocationType(entry.getCallback()));
}
if (entry.getDataBytesRemaining() == 0)
pending.remove();
@ -314,6 +325,7 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable
processedEntries.forEach(Entry::succeeded);
processedEntries.clear();
invocationType = InvocationType.NON_BLOCKING;
if (stalledEntry != null)
{

View File

@ -72,6 +72,7 @@ import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.component.DumpableCollection;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Invocable;
import org.eclipse.jetty.util.thread.Locker;
import org.eclipse.jetty.util.thread.Scheduler;
@ -1900,7 +1901,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
private void sendGoAwayAndTerminate(GoAwayFrame frame, GoAwayFrame eventFrame)
{
sendGoAway(frame, Callback.from(() -> terminate(eventFrame)));
sendGoAway(frame, Callback.from(Callback.NOOP, () -> terminate(eventFrame)));
}
private void sendGoAway(GoAwayFrame frame, Callback callback)
@ -2077,7 +2078,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
stream.setListener(listener);
stream.process(new PrefaceFrame(), Callback.NOOP);
Callback streamCallback = Callback.from(() -> promise.succeeded(stream), x ->
Callback streamCallback = Callback.from(Invocable.InvocationType.NON_BLOCKING, () -> promise.succeeded(stream), x ->
{
HTTP2Session.this.onStreamDestroyed(streamId);
promise.failed(x);

View File

@ -600,6 +600,15 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
callback.failed(x);
}
@Override
public InvocationType getInvocationType()
{
synchronized (this)
{
return sendCallback != null ? sendCallback.getInvocationType() : Callback.super.getInvocationType();
}
}
private Callback endWrite()
{
synchronized (this)

View File

@ -0,0 +1,305 @@
//
// ========================================================================
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.http2.client.http;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.IntStream;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.HttpURI;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.HTTP2Session;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
import org.eclipse.jetty.http2.client.HTTP2Client;
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.server.HTTP2CServerConnectionFactory;
import org.eclipse.jetty.http2.server.RawHTTP2ServerConnectionFactory;
import org.eclipse.jetty.io.AbstractEndPoint;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FuturePromise;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
public class BlockedWritesWithSmallThreadPoolTest
{
private Server server;
private ServerConnector connector;
private QueuedThreadPool serverThreads;
private HTTP2Client client;
private void start(Handler handler) throws Exception
{
// Threads: 1 acceptor, 1 selector, 1 reserved, 1 application.
serverThreads = newSmallThreadPool("server", 4);
server = new Server(serverThreads);
HTTP2CServerConnectionFactory http2 = new HTTP2CServerConnectionFactory(new HttpConfiguration());
connector = new ServerConnector(server, 1, 1, http2);
server.addConnector(connector);
server.setHandler(handler);
server.start();
}
private void start(RawHTTP2ServerConnectionFactory factory) throws Exception
{
// Threads: 1 acceptor, 1 selector, 1 reserved, 1 application.
serverThreads = newSmallThreadPool("server", 4);
server = new Server(serverThreads);
connector = new ServerConnector(server, 1, 1, factory);
server.addConnector(connector);
server.start();
}
private QueuedThreadPool newSmallThreadPool(String name, int maxThreads)
{
QueuedThreadPool pool = new QueuedThreadPool(maxThreads, maxThreads);
pool.setName(name);
pool.setReservedThreads(1);
pool.setDetailedDump(true);
return pool;
}
@AfterEach
public void dispose()
{
LifeCycle.stop(client);
LifeCycle.stop(server);
}
@Test
public void testServerThreadsBlockedInWrites() throws Exception
{
int contentLength = 16 * 1024 * 1024;
AtomicReference<AbstractEndPoint> serverEndPointRef = new AtomicReference<>();
start(new EmptyServerHandler()
{
@Override
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
{
serverEndPointRef.compareAndSet(null, (AbstractEndPoint)jettyRequest.getHttpChannel().getEndPoint());
// Write a large content to cause TCP congestion.
response.getOutputStream().write(new byte[contentLength]);
}
});
client = new HTTP2Client();
// Set large flow control windows so the server hits TCP congestion.
int window = 2 * contentLength;
client.setInitialSessionRecvWindow(window);
client.setInitialStreamRecvWindow(window);
client.start();
FuturePromise<Session> promise = new FuturePromise<>();
client.connect(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.Listener.Adapter(), promise);
Session session = promise.get(5, TimeUnit.SECONDS);
CountDownLatch clientBlockLatch = new CountDownLatch(1);
CountDownLatch clientDataLatch = new CountDownLatch(1);
// Send a request to TCP congest the server.
HttpURI uri = new HttpURI("http://localhost:" + connector.getLocalPort() + "/congest");
MetaData.Request request = new MetaData.Request("GET", uri, HttpVersion.HTTP_2, new HttpFields());
session.newStream(new HeadersFrame(request, null, true), new Promise.Adapter<>(), new Stream.Listener.Adapter()
{
@Override
public void onData(Stream stream, DataFrame frame, Callback callback)
{
// Block here to stop reading from the network
// to cause the server to TCP congest.
awaitUntil(0, () -> clientBlockLatch.await(5, TimeUnit.SECONDS));
callback.succeeded();
if (frame.isEndStream())
clientDataLatch.countDown();
}
});
awaitUntil(5000, () ->
{
AbstractEndPoint serverEndPoint = serverEndPointRef.get();
return serverEndPoint != null && serverEndPoint.getWriteFlusher().isPending();
});
// Wait for NIO on the server to be OP_WRITE interested.
Thread.sleep(1000);
// Make sure there is a reserved thread.
if (serverThreads.getAvailableReservedThreads() != 1)
{
assertFalse(serverThreads.tryExecute(() -> {}));
awaitUntil(5000, () -> serverThreads.getAvailableReservedThreads() == 1);
}
// Use the reserved thread for a blocking operation, simulating another blocking write.
CountDownLatch serverBlockLatch = new CountDownLatch(1);
assertTrue(serverThreads.tryExecute(() -> awaitUntil(0, () -> serverBlockLatch.await(15, TimeUnit.SECONDS))));
assertEquals(0, serverThreads.getReadyThreads());
// Unblock the client to read from the network, which should unblock the server write().
clientBlockLatch.countDown();
assertTrue(clientDataLatch.await(10, TimeUnit.SECONDS), server.dump());
serverBlockLatch.countDown();
}
@Test
public void testClientThreadsBlockedInWrite() throws Exception
{
int contentLength = 16 * 1024 * 1024;
CountDownLatch serverBlockLatch = new CountDownLatch(1);
RawHTTP2ServerConnectionFactory http2 = new RawHTTP2ServerConnectionFactory(new HttpConfiguration(), new ServerSessionListener.Adapter()
{
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{
return new Stream.Listener.Adapter()
{
@Override
public void onData(Stream stream, DataFrame frame, Callback callback)
{
// Block here to stop reading from the network
// to cause the client to TCP congest.
awaitUntil(0, () -> serverBlockLatch.await(5, TimeUnit.SECONDS));
callback.succeeded();
if (frame.isEndStream())
{
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields());
stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP);
}
}
};
}
});
int window = 2 * contentLength;
http2.setInitialSessionRecvWindow(window);
http2.setInitialStreamRecvWindow(window);
start(http2);
client = new HTTP2Client();
// Threads: 1 selector, 1 reserved, 1 application.
QueuedThreadPool clientThreads = newSmallThreadPool("client", 3);
client.setExecutor(clientThreads);
client.setSelectors(1);
client.start();
FuturePromise<Session> promise = new FuturePromise<>();
client.connect(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.Listener.Adapter(), promise);
Session session = promise.get(5, TimeUnit.SECONDS);
// Send a request to TCP congest the client.
HttpURI uri = new HttpURI("http://localhost:" + connector.getLocalPort() + "/congest");
MetaData.Request request = new MetaData.Request("GET", uri, HttpVersion.HTTP_2, new HttpFields());
FuturePromise<Stream> streamPromise = new FuturePromise<>();
CountDownLatch latch = new CountDownLatch(1);
session.newStream(new HeadersFrame(request, null, false), streamPromise, new Stream.Listener.Adapter()
{
@Override
public void onHeaders(Stream stream, HeadersFrame frame)
{
MetaData.Response response = (MetaData.Response)frame.getMetaData();
assertEquals(HttpStatus.OK_200, response.getStatus());
latch.countDown();
}
});
Stream stream = streamPromise.get(5, TimeUnit.SECONDS);
stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(contentLength), true), Callback.NOOP);
awaitUntil(5000, () ->
{
AbstractEndPoint clientEndPoint = (AbstractEndPoint)((HTTP2Session)session).getEndPoint();
return clientEndPoint.getWriteFlusher().isPending();
});
// Wait for NIO on the client to be OP_WRITE interested.
Thread.sleep(1000);
CountDownLatch clientBlockLatch = new CountDownLatch(1);
// Make sure the application threads are blocked.
IntStream.range(0, clientThreads.getIdleThreads())
.forEach(i -> clientThreads.execute(() -> awaitUntil(0, () -> clientBlockLatch.await(15, TimeUnit.SECONDS))));
// Make sure the reserved threads are blocked.
if (clientThreads.getAvailableReservedThreads() != 1)
{
assertFalse(clientThreads.tryExecute(() -> {}));
awaitUntil(5000, () -> clientThreads.getAvailableReservedThreads() == 1);
}
// Use the reserved thread for a blocking operation, simulating another blocking write.
assertTrue(clientThreads.tryExecute(() -> awaitUntil(0, () -> clientBlockLatch.await(15, TimeUnit.SECONDS))));
awaitUntil(5000, () -> clientThreads.getReadyThreads() == 0);
// Unblock the server to read from the network, which should unblock the client.
serverBlockLatch.countDown();
assertTrue(latch.await(10, TimeUnit.SECONDS), client.dump());
clientBlockLatch.countDown();
}
private void awaitUntil(long millis, Callable<Boolean> test)
{
try
{
if (millis == 0)
{
if (test.call())
return;
}
else
{
long begin = System.nanoTime();
while (System.nanoTime() - begin < TimeUnit.MILLISECONDS.toNanos(millis))
{
if (test.call())
return;
Thread.sleep(10);
}
}
fail("Await elapsed: " + millis + "ms");
}
catch (RuntimeException | Error x)
{
throw x;
}
catch (Exception x)
{
throw new RuntimeException(x);
}
}
}

View File

@ -63,6 +63,8 @@ import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Scheduler;
import static org.eclipse.jetty.util.thread.Invocable.InvocationType.NON_BLOCKING;
/**
* HttpChannel represents a single endpoint for HTTP semantic processing.
* The HttpChannel is both an HttpParser.RequestHandler, where it passively receives events from
@ -152,7 +154,7 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
* {@link TransientListeners} as an {@link AbstractConnector}
* provided listener</p>
* <p>Transient listeners are removed after every request cycle</p>
* @param listener
* @param listener the listener to add
* @return true if the listener was added.
*/
@Deprecated
@ -535,7 +537,7 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
}
// Set a close callback on the HttpOutput to make it an async callback
_response.completeOutput(Callback.from(() -> _state.completed(null), _state::completed));
_response.completeOutput(Callback.from(NON_BLOCKING, () -> _state.completed(null), _state::completed));
break;
}

View File

@ -741,7 +741,9 @@ public class HttpOutput extends ServletOutputStream implements Runnable
}
if (content == null)
{
new AsyncFlush(false).iterate();
}
else
{
try
@ -1579,7 +1581,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
{
final boolean _last;
ChannelWriteCB(boolean last)
private ChannelWriteCB(boolean last)
{
_last = last;
}
@ -1605,14 +1607,20 @@ public class HttpOutput extends ServletOutputStream implements Runnable
private abstract class NestedChannelWriteCB extends ChannelWriteCB
{
final Callback _callback;
private final Callback _callback;
NestedChannelWriteCB(Callback callback, boolean last)
private NestedChannelWriteCB(Callback callback, boolean last)
{
super(last);
_callback = callback;
}
@Override
public InvocationType getInvocationType()
{
return _callback.getInvocationType();
}
@Override
protected void onCompleteSuccess()
{
@ -1647,9 +1655,9 @@ public class HttpOutput extends ServletOutputStream implements Runnable
private class AsyncFlush extends ChannelWriteCB
{
volatile boolean _flushed;
private volatile boolean _flushed;
AsyncFlush(boolean last)
private AsyncFlush(boolean last)
{
super(last);
}
@ -1682,7 +1690,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
private final int _len;
private boolean _completed;
AsyncWrite(byte[] b, int off, int len, boolean last)
private AsyncWrite(byte[] b, int off, int len, boolean last)
{
super(last);
_buffer = ByteBuffer.wrap(b, off, len);
@ -1691,7 +1699,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
_slice = _len < getBufferSize() ? null : _buffer.duplicate();
}
AsyncWrite(ByteBuffer buffer, boolean last)
private AsyncWrite(ByteBuffer buffer, boolean last)
{
super(last);
_buffer = buffer;
@ -1779,7 +1787,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
private boolean _eof;
private boolean _closed;
InputStreamWritingCB(InputStream in, Callback callback)
private InputStreamWritingCB(InputStream in, Callback callback)
{
super(callback, true);
_in = in;
@ -1854,7 +1862,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
private boolean _eof;
private boolean _closed;
ReadableByteChannelWritingCB(ReadableByteChannel in, Callback callback)
private ReadableByteChannelWritingCB(ReadableByteChannel in, Callback callback)
{
super(callback, true);
_in = in;
@ -1935,5 +1943,11 @@ public class HttpOutput extends ServletOutputStream implements Runnable
{
onWriteComplete(true, x);
}
@Override
public InvocationType getInvocationType()
{
return InvocationType.NON_BLOCKING;
}
}
}

View File

@ -722,6 +722,12 @@ public class ResourceService
content.release();
}
@Override
public InvocationType getInvocationType()
{
return InvocationType.NON_BLOCKING;
}
@Override
public String toString()
{

View File

@ -113,13 +113,26 @@ public interface Callback extends Invocable
}
/**
* Create a callback from the passed success and failure
* Creates a callback from the given success and failure lambdas.
*
* @param success Called when the callback succeeds
* @param failure Called when the callback fails
* @return a new Callback
*/
static Callback from(Runnable success, Consumer<Throwable> failure)
{
return from(InvocationType.BLOCKING, success, failure);
}
/**
* Creates a callback with the given InvocationType from the given success and failure lambdas.
*
* @param invocationType the Callback invocation type
* @param success Called when the callback succeeds
* @param failure Called when the callback fails
* @return a new Callback
*/
static Callback from(InvocationType invocationType, Runnable success, Consumer<Throwable> failure)
{
return new Callback()
{
@ -134,6 +147,12 @@ public interface Callback extends Invocable
{
failure.accept(x);
}
@Override
public InvocationType getInvocationType()
{
return invocationType;
}
};
}
@ -291,10 +310,6 @@ public interface Callback extends Invocable
}
}
interface InvocableCallback extends Invocable, Callback
{
}
static Callback combine(Callback cb1, Callback cb2)
{
if (cb1 == null || cb1 == cb2)
@ -302,7 +317,7 @@ public interface Callback extends Invocable
if (cb2 == null)
return cb1;
return new InvocableCallback()
return new Callback()
{
@Override
public void succeeded()