Jetty 9.4.x 4331 async close complete3 (#4409)

* Issue #4376 Async Content Complete

Added test harness to reproduce unready completing write.
Fixed test by not closing output prior to becoming READY

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Issue #4331 Async Write Complete

Test harness to reproduce unready when closing/completing.

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Issue #4331 Async Write Complete

test both PENDING and UNREADY

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Issue #4331 Async Write Complete

test cleanups

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Issue #4331 Async Close Complete

Cleanups of write

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* WIP

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Issue #4331 Close Complete

Work in progress

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Issue #4331 Close Complete

Added async close to HttpWriter and ResponseWriter
Always use async close, with blocker if necessary.

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Issue #4331 Close Complete

Working async close complete!

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Issue #4331 Close Complete

invert test as we can now call complete when not ready!

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Issue #4331 Close Complete

fixed transition to ERROR state

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Issue #4331 Close Complete

async close after onError

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Issue #4331 Close Complete

minor cleanups

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Issue #4331 Close Complete

Fix for proxy tests

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Issue #4331 Close Complete

Fix write loop to handle clear of p=0,l=0 rather than p=l

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Issue #4331 Close Complete

Removed old close on all content mechanism
Cleanups and some more TODOs

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Issue #4331 Close Complete

a reworking of HttpOutput to separate out API state.

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Issue #4331 Close Complete

Soft close for Dispatcher
release buffer in onWriteComplete

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Issue #4331 Close Complete

Set _onError in onWriteComplete
NOOP callback instead of null

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Issue #4331 Close Complete

failure closes HttpOutput

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Issue #4331 Close Complete

Moved closedCallback handling to onWriteComplete

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Issue #4331 Close Complete

Additional test of complete during blocking write.

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Issue #4331 Close Complete

reimplemented blocking close to sometimes be async

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Issue #4331 Close Complete

ascii "art"

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Issue #4331 Close Complete

Code cleanup.  Use a CLOSE state rather than non null closedCallback to be clearer that it is a state.
Renamed close(Callback) to complete(Callback)
Renamed and simplified closed() to completed()

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Issue #4331 Close Complete

Do not dispatch
Better ascii art
improved close impl to be similar to complete

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Issue #4331 Close Complete

More test cases

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Issue #4331 Close Complete

retain execute behaviour in 9.4. review in 10.

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Improved javadoc and ascii art

* Improved CLOSING

Switch to CLOSING state as soon as last write is done, even if several non last channelWrites will be done.   This allows a subsequent call to close to know that nothing needs to be written and can avoid some EOF exceptions. Now onWriteComplete acts only on the passed in last parameter.

Added test for sendContent

* WIP

Aggregate within lock
pipeline test debug

* Avoid creating ignored exception when Idle or Failed.

* Try a parse without fill to avoid unconsumed input debug

* fixed pipeline size

* release buffer before callback

* turn off debug

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Issue #4331 Close Complete

Better javadoc
refactored onWriteComplete logic to be simpler
fixed bug with flush of last written byte

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Issue #4331 Close Complete

Completely reworked test harness for better coverage.

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Issue #4331 Close Complete

Reworked order of ifs to match logic above in onWriteComplete

Signed-off-by: Greg Wilkins <gregw@webtide.com>
This commit is contained in:
Greg Wilkins 2019-12-19 12:17:11 +11:00 committed by GitHub
parent fcc18b0530
commit c5acf96506
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 1632 additions and 816 deletions

View File

@ -41,7 +41,6 @@ import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.ResetFrame;
import org.eclipse.jetty.http2.frames.SettingsFrame;
import org.eclipse.jetty.server.HttpOutput;
import org.eclipse.jetty.util.Callback;
@ -243,8 +242,8 @@ public class AsyncIOTest extends AbstractTest
// The write is too large and will stall.
output.write(ByteBuffer.wrap(new byte[2 * clientWindow]));
// We cannot call complete() now before checking for isReady().
// This will abort the response and the client will receive a reset.
// We can now call complete() now before checking for isReady().
// This will asynchronously complete when the write is finished.
asyncContext.complete();
}
@ -275,7 +274,7 @@ public class AsyncIOTest extends AbstractTest
session.newStream(frame, promise, new Stream.Listener.Adapter()
{
@Override
public void onReset(Stream stream, ResetFrame frame)
public void onClosed(Stream stream)
{
latch.countDown();
}

View File

@ -518,7 +518,15 @@ public abstract class WriteFlusher
public void onClose()
{
onFail(new ClosedChannelException());
switch (_state.get().getType())
{
case IDLE:
case FAILED:
return;
default:
onFail(new ClosedChannelException());
}
}
boolean isFailed()

View File

@ -146,7 +146,11 @@ public class AsyncContextState implements AsyncContext
@Override
public void run()
{
state().getAsyncContextEvent().getContext().getContextHandler().handle(channel.getRequest(), task);
ContextHandler.Context context = state().getAsyncContextEvent().getContext();
if (context == null)
task.run();
else
context.getContextHandler().handle(channel.getRequest(), task);
}
});
}

View File

@ -220,7 +220,7 @@ public class Dispatcher implements RequestDispatcher
_contextHandler.handle(_pathInContext, baseRequest, (HttpServletRequest)request, (HttpServletResponse)response);
if (!baseRequest.getHttpChannelState().isAsync())
commitResponse(response, baseRequest);
baseRequest.getResponse().softClose();
}
}
finally
@ -242,57 +242,6 @@ public class Dispatcher implements RequestDispatcher
return String.format("Dispatcher@0x%x{%s,%s}", hashCode(), _named, _uri);
}
@SuppressWarnings("Duplicates")
private void commitResponse(ServletResponse response, Request baseRequest) throws IOException, ServletException
{
if (baseRequest.getResponse().isWriting())
{
try
{
// Try closing Writer first (based on knowledge in Response obj)
response.getWriter().close();
}
catch (IllegalStateException ex)
{
try
{
// Try closing OutputStream as alternate route
// This path is possible due to badly behaving Response wrappers
response.getOutputStream().close();
}
catch (IllegalStateException ex2)
{
ServletException servletException = new ServletException("Unable to commit the response", ex2);
servletException.addSuppressed(ex);
throw servletException;
}
}
}
else
{
try
{
// Try closing OutputStream first (based on knowledge in Response obj)
response.getOutputStream().close();
}
catch (IllegalStateException ex)
{
try
{
// Try closing Writer as alternate route
// This path is possible due to badly behaving Response wrappers
response.getWriter().close();
}
catch (IllegalStateException ex2)
{
ServletException servletException = new ServletException("Unable to commit the response", ex2);
servletException.addSuppressed(ex);
throw servletException;
}
}
}
}
private class ForwardAttributes implements Attributes
{
final Attributes _attr;

View File

@ -47,16 +47,11 @@ public class EncodingHttpWriter extends HttpWriter
public void write(char[] s, int offset, int length) throws IOException
{
HttpOutput out = _out;
if (length == 0 && out.isAllContentWritten())
{
out.close();
return;
}
while (length > 0)
{
_bytes.reset();
int chars = length > MAX_OUTPUT_CHARS ? MAX_OUTPUT_CHARS : length;
int chars = Math.min(length, MAX_OUTPUT_CHARS);
_converter.write(s, offset, chars);
_converter.flush();

View File

@ -509,7 +509,7 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
// TODO that is done.
// Set a close callback on the HttpOutput to make it an async callback
_response.closeOutput(Callback.from(_state::completed));
_response.completeOutput(Callback.from(_state::completed));
break;
}
@ -1212,7 +1212,7 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
@Override
public void succeeded()
{
_response.getHttpOutput().closed();
_response.getHttpOutput().completed();
super.failed(x);
}

View File

@ -889,6 +889,9 @@ public class HttpChannelState
if (LOG.isDebugEnabled())
LOG.debug("sendError {}", toStringLocked());
if (_outputState != OutputState.OPEN)
throw new IllegalStateException(_outputState.toString());
switch (_state)
{
case HANDLING:
@ -902,7 +905,7 @@ public class HttpChannelState
throw new IllegalStateException("Response is " + _outputState);
response.setStatus(code);
response.closedBySendError();
response.softClose();
request.setAttribute(ErrorHandler.ERROR_CONTEXT, request.getErrorContext());
request.setAttribute(ERROR_REQUEST_URI, request.getRequestURI());
@ -970,7 +973,7 @@ public class HttpChannelState
}
// release any aggregate buffer from a closing flush
_channel.getResponse().getHttpOutput().closed();
_channel.getResponse().getHttpOutput().completed();
if (event != null)
{

View File

@ -407,20 +407,25 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
}
else if (_parser.inContentState() && _generator.isPersistent())
{
// If we are async, then we have problems to complete neatly
if (_input.isAsync())
// Try to progress without filling.
parseRequestBuffer();
if (_parser.inContentState())
{
if (LOG.isDebugEnabled())
LOG.debug("{}unconsumed input {}", _parser.isChunking() ? "Possible " : "", this);
_channel.abort(new IOException("unconsumed input"));
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("{}unconsumed input {}", _parser.isChunking() ? "Possible " : "", this);
// Complete reading the request
if (!_input.consumeAll())
// If we are async, then we have problems to complete neatly
if (_input.isAsync())
{
if (LOG.isDebugEnabled())
LOG.debug("{}unconsumed input while async {}", _parser.isChunking() ? "Possible " : "", this);
_channel.abort(new IOException("unconsumed input"));
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("{}unconsumed input {}", _parser.isChunking() ? "Possible " : "", this);
// Complete reading the request
if (!_input.consumeAll())
_channel.abort(new IOException("unconsumed input"));
}
}
}

View File

@ -22,13 +22,14 @@ import java.io.IOException;
import java.io.Writer;
import org.eclipse.jetty.util.ByteArrayOutputStream2;
import org.eclipse.jetty.util.Callback;
/**
*
*/
public abstract class HttpWriter extends Writer
{
public static final int MAX_OUTPUT_CHARS = 512;
public static final int MAX_OUTPUT_CHARS = 512; // TODO should this be configurable? super size is 1024
final HttpOutput _out;
final ByteArrayOutputStream2 _bytes;
@ -38,7 +39,7 @@ public abstract class HttpWriter extends Writer
{
_out = out;
_chars = new char[MAX_OUTPUT_CHARS];
_bytes = new ByteArrayOutputStream2(MAX_OUTPUT_CHARS);
_bytes = new ByteArrayOutputStream2(MAX_OUTPUT_CHARS); // TODO should this be pooled - or do we just recycle the writer?
}
@Override
@ -47,6 +48,11 @@ public abstract class HttpWriter extends Writer
_out.close();
}
public void complete(Callback callback)
{
_out.complete(callback);
}
@Override
public void flush() throws IOException
{

View File

@ -35,11 +35,6 @@ public class Iso88591HttpWriter extends HttpWriter
public void write(char[] s, int offset, int length) throws IOException
{
HttpOutput out = _out;
if (length == 0 && out.isAllContentWritten())
{
close();
return;
}
if (length == 1)
{
@ -51,7 +46,7 @@ public class Iso88591HttpWriter extends HttpWriter
while (length > 0)
{
_bytes.reset();
int chars = length > MAX_OUTPUT_CHARS ? MAX_OUTPUT_CHARS : length;
int chars = Math.min(length, MAX_OUTPUT_CHARS);
byte[] buffer = _bytes.getBuf();
int bytes = _bytes.getCount();

View File

@ -145,10 +145,10 @@ public class Response implements HttpServletResponse
_out.reopen();
}
public void closedBySendError()
public void softClose()
{
setErrorSent(true);
_out.closedBySendError();
_out.softClose();
}
/**
@ -496,7 +496,7 @@ public class Response implements HttpServletResponse
resetBuffer();
setHeader(HttpHeader.LOCATION, location);
setStatus(code);
closeOutput();
completeOutput();
}
@Override
@ -788,7 +788,7 @@ public class Response implements HttpServletResponse
{
try
{
closeOutput();
completeOutput();
}
catch (IOException e)
{
@ -826,17 +826,20 @@ public class Response implements HttpServletResponse
return (_contentLength < 0 || written >= _contentLength);
}
public void closeOutput() throws IOException
public void completeOutput() throws IOException
{
if (_outputType == OutputType.WRITER)
_writer.close();
if (!_out.isClosed())
else
_out.close();
}
public void closeOutput(Callback callback)
public void completeOutput(Callback callback)
{
_out.close((_outputType == OutputType.WRITER) ? _writer : _out, callback);
if (_outputType == OutputType.WRITER)
_writer.complete(callback);
else
_out.complete(callback);
}
public long getLongContentLength()

View File

@ -27,6 +27,7 @@ import javax.servlet.ServletResponse;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.io.RuntimeIOException;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@ -148,7 +149,7 @@ public class ResponseWriter extends PrintWriter
out.flush();
}
}
catch (IOException ex)
catch (Throwable ex)
{
setError(ex);
}
@ -171,6 +172,15 @@ public class ResponseWriter extends PrintWriter
}
}
public void complete(Callback callback)
{
synchronized (lock)
{
_isClosed = true;
}
_httpWriter.complete(callback);
}
@Override
public void write(int c)
{

View File

@ -42,16 +42,11 @@ public class Utf8HttpWriter extends HttpWriter
public void write(char[] s, int offset, int length) throws IOException
{
HttpOutput out = _out;
if (length == 0 && out.isAllContentWritten())
{
close();
return;
}
while (length > 0)
{
_bytes.reset();
int chars = length > MAX_OUTPUT_CHARS ? MAX_OUTPUT_CHARS : length;
int chars = Math.min(length, MAX_OUTPUT_CHARS);
byte[] buffer = _bytes.getBuf();
int bytes = _bytes.getCount();

View File

@ -18,7 +18,9 @@
package org.eclipse.jetty.server;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.ByteBuffer;
@ -27,7 +29,9 @@ import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@ -48,9 +52,12 @@ import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.ManagedSelector;
import org.eclipse.jetty.io.SocketChannelEndPoint;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.util.BlockingArrayQueue;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.thread.Scheduler;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
@ -65,14 +72,19 @@ import static org.hamcrest.Matchers.is;
*/
public class AsyncCompletionTest extends HttpServerTestFixture
{
private static final Exchanger<DelayedCallback> X = new Exchanger<>();
private static final AtomicBoolean COMPLETE = new AtomicBoolean();
private static final int POLL = 10; // milliseconds
private static final int WAIT = 10; // seconds
private static final String SMALL = "Now is the time for all good men to come to the aid of the party. ";
private static final String LARGE = SMALL + SMALL + SMALL + SMALL + SMALL;
private static final int BUFFER_SIZE = SMALL.length() * 3 / 2;
private static final BlockingQueue<PendingCallback> __queue = new BlockingArrayQueue<>();
private static final AtomicBoolean __transportComplete = new AtomicBoolean();
private static class DelayedCallback extends Callback.Nested
private static class PendingCallback extends Callback.Nested
{
private CompletableFuture<Void> _delay = new CompletableFuture<>();
private CompletableFuture<Void> _pending = new CompletableFuture<>();
public DelayedCallback(Callback callback)
public PendingCallback(Callback callback)
{
super(callback);
}
@ -80,20 +92,20 @@ public class AsyncCompletionTest extends HttpServerTestFixture
@Override
public void succeeded()
{
_delay.complete(null);
_pending.complete(null);
}
@Override
public void failed(Throwable x)
{
_delay.completeExceptionally(x);
_pending.completeExceptionally(x);
}
public void proceed()
{
try
{
_delay.get(10, TimeUnit.SECONDS);
_pending.get(WAIT, TimeUnit.SECONDS);
getCallback().succeeded();
}
catch (Throwable th)
@ -107,13 +119,15 @@ public class AsyncCompletionTest extends HttpServerTestFixture
@BeforeEach
public void init() throws Exception
{
COMPLETE.set(false);
__transportComplete.set(false);
startServer(new ServerConnector(_server, new HttpConnectionFactory()
{
@Override
public Connection newConnection(Connector connector, EndPoint endPoint)
{
getHttpConfiguration().setOutputBufferSize(BUFFER_SIZE);
getHttpConfiguration().setOutputAggregationSize(BUFFER_SIZE);
return configure(new ExtendedHttpConnection(getHttpConfiguration(), connector, endPoint), connector, endPoint);
}
})
@ -136,16 +150,9 @@ public class AsyncCompletionTest extends HttpServerTestFixture
@Override
public void write(Callback callback, ByteBuffer... buffers) throws IllegalStateException
{
DelayedCallback delay = new DelayedCallback(callback);
PendingCallback delay = new PendingCallback(callback);
super.write(delay, buffers);
try
{
X.exchange(delay);
}
catch (InterruptedException e)
{
throw new RuntimeException(e);
}
__queue.offer(delay);
}
}
@ -159,24 +166,44 @@ public class AsyncCompletionTest extends HttpServerTestFixture
@Override
public void onCompleted()
{
COMPLETE.compareAndSet(false, true);
__transportComplete.compareAndSet(false, true);
super.onCompleted();
}
}
// Tests from here use these parameters
public static Stream<Arguments> tests()
enum WriteStyle
{ARRAY, BUFFER, BYTE, BYTE_THEN_ARRAY, PRINT}
;
public static Stream<Arguments> asyncIOWriteTests()
{
List<Object[]> tests = new ArrayList<>();
tests.add(new Object[]{new HelloWorldHandler(), 200, "Hello world"});
tests.add(new Object[]{new SendErrorHandler(499, "Test async sendError"), 499, "Test async sendError"});
tests.add(new Object[]{new AsyncReadyCompleteHandler(), 200, AsyncReadyCompleteHandler.data});
for (WriteStyle w : WriteStyle.values())
{
for (boolean contentLength : new Boolean[]{true, false})
{
for (boolean isReady : new Boolean[]{true, false})
{
for (boolean flush : new Boolean[]{true, false})
{
for (boolean close : new Boolean[]{true, false})
{
for (String data : new String[]{SMALL, LARGE})
{
tests.add(new Object[]{new AsyncIOWriteHandler(w, contentLength, isReady, flush, close, data)});
}
}
}
}
}
}
return tests.stream().map(Arguments::of);
}
@ParameterizedTest
@MethodSource("tests")
public void testAsyncCompletion(Handler handler, int status, String message) throws Exception
@MethodSource("asyncIOWriteTests")
public void testAsyncIOWrite(AsyncIOWriteHandler handler) throws Exception
{
configureServer(handler);
@ -184,79 +211,242 @@ public class AsyncCompletionTest extends HttpServerTestFixture
try (Socket client = newSocket(_serverURI.getHost(), _serverURI.getPort()))
{
OutputStream os = client.getOutputStream();
InputStream in = client.getInputStream();
// write the request
os.write("GET / HTTP/1.0\r\n\r\n".getBytes(StandardCharsets.ISO_8859_1));
os.flush();
// The write should happen but the callback is delayed
HttpTester.Response response = HttpTester.parseResponse(client.getInputStream());
// wait for OWP to execute (proves we do not block in write APIs)
boolean completeCalled = handler.waitForOWPExit();
while (true)
{
// wait for threads to return to base level (proves we are really async)
long end = System.nanoTime() + TimeUnit.SECONDS.toNanos(WAIT);
while (_threadPool.getBusyThreads() != base)
{
if (System.nanoTime() > end)
throw new TimeoutException();
Thread.sleep(POLL);
}
if (completeCalled)
break;
// We are now asynchronously waiting!
assertThat(__transportComplete.get(), is(false));
// If we are not complete, we must be waiting for one or more writes to complete
while (true)
{
PendingCallback delay = __queue.poll(POLL, TimeUnit.MILLISECONDS);
if (delay != null)
{
delay.proceed();
continue;
}
// No delay callback found, have we finished OWP again?
Boolean c = handler.pollForOWPExit();
if (c == null)
// No we haven't, so look for another delay callback
continue;
// We have a OWP result, so let's handle it.
completeCalled = c;
break;
}
}
// Wait for full completion
long end = System.nanoTime() + TimeUnit.SECONDS.toNanos(WAIT);
while (!__transportComplete.get())
{
if (System.nanoTime() > end)
throw new TimeoutException();
// proceed with any delayCBs needed for completion
PendingCallback delay = __queue.poll(POLL, TimeUnit.MILLISECONDS);
if (delay != null)
delay.proceed();
}
// Check we got a response!
HttpTester.Response response = HttpTester.parseResponse(in);
assertThat(response, Matchers.notNullValue());
assertThat(response.getStatus(), is(status));
assertThat(response.getStatus(), is(200));
String content = response.getContent();
assertThat(content, containsString(message));
// Check that a thread is held busy in write
assertThat(_threadPool.getBusyThreads(), Matchers.greaterThan(base));
// Getting the Delayed callback will free the thread
DelayedCallback delay = X.exchange(null, 10, TimeUnit.SECONDS);
// wait for threads to return to base level
long end = System.nanoTime() + TimeUnit.SECONDS.toNanos(10);
while (_threadPool.getBusyThreads() != base)
{
if (System.nanoTime() > end)
throw new TimeoutException();
Thread.sleep(10);
}
// We are now asynchronously waiting!
assertThat(COMPLETE.get(), is(false));
// proceed with the completion
delay.proceed();
while (!COMPLETE.get())
{
if (System.nanoTime() > end)
throw new TimeoutException();
Thread.sleep(10);
}
assertThat(content, containsString(handler.getExpectedMessage()));
}
}
private static class AsyncReadyCompleteHandler extends AbstractHandler
private static class AsyncIOWriteHandler extends AbstractHandler
{
static String data = "Now is the time for all good men to come to the aid of the party";
final WriteStyle _write;
final boolean _contentLength;
final boolean _isReady;
final boolean _flush;
final boolean _close;
final String _data;
final Exchanger<Boolean> _ready = new Exchanger<>();
int _toWrite;
boolean _flushed;
boolean _closed;
AsyncIOWriteHandler(WriteStyle write, boolean contentLength, boolean isReady, boolean flush, boolean close, String data)
{
_write = write;
_contentLength = contentLength;
_isReady = isReady;
_flush = flush;
_close = close;
_data = data;
_toWrite = data.length();
}
public String getExpectedMessage()
{
return SMALL;
}
boolean waitForOWPExit()
{
try
{
return _ready.exchange(null);
}
catch (InterruptedException e)
{
throw new RuntimeException(e);
}
}
Boolean pollForOWPExit()
{
try
{
return _ready.exchange(null, POLL, TimeUnit.MILLISECONDS);
}
catch (InterruptedException e)
{
throw new RuntimeException(e);
}
catch (TimeoutException e)
{
return null;
}
}
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
AsyncContext context = request.startAsync();
ServletOutputStream out = response.getOutputStream();
response.setContentType("text/plain");
byte[] bytes = _data.getBytes(StandardCharsets.ISO_8859_1);
if (_contentLength)
response.setContentLength(bytes.length);
out.setWriteListener(new WriteListener()
{
byte[] bytes = data.getBytes(StandardCharsets.ISO_8859_1);
@Override
public void onWritePossible() throws IOException
{
while (out.isReady())
try
{
if (bytes != null)
{
response.setContentType("text/plain");
response.setContentLength(bytes.length);
out.write(bytes);
bytes = null;
}
else
if (out.isReady())
{
if (_toWrite > 0)
{
switch (_write)
{
case ARRAY:
_toWrite = 0;
out.write(bytes, 0, bytes.length);
break;
case BUFFER:
_toWrite = 0;
((HttpOutput)out).write(BufferUtil.toBuffer(bytes));
break;
case BYTE:
for (int i = bytes.length - _toWrite; i < bytes.length; i++)
{
_toWrite--;
out.write(bytes[i]);
boolean ready = out.isReady();
if (!ready)
{
_ready.exchange(Boolean.FALSE);
return;
}
}
break;
case BYTE_THEN_ARRAY:
_toWrite = 0;
out.write(bytes[0]); // This should always aggregate
assertThat(out.isReady(), is(true));
out.write(bytes, 1, bytes.length - 1);
break;
case PRINT:
_toWrite = 0;
out.print(_data);
break;
}
}
if (_flush && !_flushed)
{
boolean ready = out.isReady();
if (!ready)
{
_ready.exchange(Boolean.FALSE);
return;
}
_flushed = true;
out.flush();
}
if (_close && !_closed)
{
if (_isReady)
{
boolean ready = out.isReady();
if (!ready)
{
_ready.exchange(Boolean.FALSE);
return;
}
}
_closed = true;
out.close();
}
if (_isReady)
{
boolean ready = out.isReady();
if (!ready)
{
_ready.exchange(Boolean.FALSE);
return;
}
}
context.complete();
return;
_ready.exchange(Boolean.TRUE);
}
}
catch (InterruptedException e)
{
throw new RuntimeException(e);
}
finally
{
}
}
@Override
@ -266,5 +456,312 @@ public class AsyncCompletionTest extends HttpServerTestFixture
}
});
}
@Override
public String toString()
{
return String.format("AWCH{w=%s,cl=%b,ir=%b,f=%b,c=%b,d=%d}", _write, _contentLength, _isReady, _flush, _close, _data.length());
}
}
public static Stream<Arguments> blockingWriteTests()
{
List<Object[]> tests = new ArrayList<>();
for (WriteStyle w : WriteStyle.values())
{
for (boolean contentLength : new Boolean[]{true, false})
{
for (boolean flush : new Boolean[]{true, false})
{
for (boolean close : new Boolean[]{true, false})
{
for (String data : new String[]{SMALL, LARGE})
{
tests.add(new Object[]{new BlockingWriteHandler(w, contentLength, flush, close, data)});
}
}
}
}
}
return tests.stream().map(Arguments::of);
}
@ParameterizedTest
@MethodSource("blockingWriteTests")
public void testBlockingWrite(BlockingWriteHandler handler) throws Exception
{
configureServer(handler);
try (Socket client = newSocket(_serverURI.getHost(), _serverURI.getPort()))
{
OutputStream os = client.getOutputStream();
InputStream in = client.getInputStream();
// write the request
os.write("GET / HTTP/1.0\r\n\r\n".getBytes(StandardCharsets.ISO_8859_1));
os.flush();
handler.wait4handle();
// Wait for full completion
long end = System.nanoTime() + TimeUnit.SECONDS.toNanos(WAIT);
while (!__transportComplete.get())
{
if (System.nanoTime() > end)
throw new TimeoutException();
// proceed with any delayCBs needed for completion
try
{
PendingCallback delay = __queue.poll(POLL, TimeUnit.MILLISECONDS);
if (delay != null)
delay.proceed();
}
catch (Exception e)
{
// ignored
}
}
// Check we got a response!
HttpTester.Response response = HttpTester.parseResponse(in);
assertThat(response, Matchers.notNullValue());
assertThat(response.getStatus(), is(200));
String content = response.getContent();
assertThat(content, containsString(handler.getExpectedMessage()));
}
}
private static class BlockingWriteHandler extends AbstractHandler
{
final WriteStyle _write;
final boolean _contentLength;
final boolean _flush;
final boolean _close;
final String _data;
final CountDownLatch _wait = new CountDownLatch(1);
BlockingWriteHandler(WriteStyle write, boolean contentLength, boolean flush, boolean close, String data)
{
_write = write;
_contentLength = contentLength;
_flush = flush;
_close = close;
_data = data;
}
public String getExpectedMessage()
{
return SMALL;
}
public void wait4handle()
{
try
{
Assertions.assertTrue(_wait.await(WAIT, TimeUnit.SECONDS));
}
catch (InterruptedException e)
{
throw new RuntimeException(e);
}
}
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
AsyncContext context = request.startAsync();
ServletOutputStream out = response.getOutputStream();
context.start(() ->
{
try
{
_wait.countDown();
response.setContentType("text/plain");
byte[] bytes = _data.getBytes(StandardCharsets.ISO_8859_1);
if (_contentLength)
response.setContentLength(bytes.length);
switch (_write)
{
case ARRAY:
out.write(bytes, 0, bytes.length);
break;
case BUFFER:
((HttpOutput)out).write(BufferUtil.toBuffer(bytes));
break;
case BYTE:
for (byte b : bytes)
{
out.write(b);
}
break;
case BYTE_THEN_ARRAY:
out.write(bytes[0]); // This should always aggregate
out.write(bytes, 1, bytes.length - 1);
break;
case PRINT:
out.print(_data);
break;
}
if (_flush)
out.flush();
if (_close)
out.close();
context.complete();
}
catch (Exception e)
{
throw new RuntimeException(e);
}
});
}
@Override
public String toString()
{
return String.format("BWCH{w=%s,cl=%b,f=%b,c=%b,d=%d}", _write, _contentLength, _flush, _close, _data.length());
}
}
public static Stream<Arguments> sendContentTests()
{
List<Object[]> tests = new ArrayList<>();
for (ContentStyle style : ContentStyle.values())
{
for (String data : new String[]{SMALL, LARGE})
{
tests.add(new Object[]{new SendContentHandler(style, data)});
}
}
return tests.stream().map(Arguments::of);
}
@ParameterizedTest
@MethodSource("sendContentTests")
public void testSendContent(SendContentHandler handler) throws Exception
{
configureServer(handler);
int base = _threadPool.getBusyThreads();
try (Socket client = newSocket(_serverURI.getHost(), _serverURI.getPort()))
{
OutputStream os = client.getOutputStream();
InputStream in = client.getInputStream();
// write the request
os.write("GET / HTTP/1.0\r\n\r\n".getBytes(StandardCharsets.ISO_8859_1));
os.flush();
handler.wait4handle();
long end = System.nanoTime() + TimeUnit.SECONDS.toNanos(WAIT);
while (_threadPool.getBusyThreads() != base)
{
if (System.nanoTime() > end)
throw new TimeoutException();
Thread.sleep(POLL);
}
// Wait for full completion
end = System.nanoTime() + TimeUnit.SECONDS.toNanos(WAIT);
while (!__transportComplete.get())
{
if (System.nanoTime() > end)
throw new TimeoutException();
// proceed with any delayCBs needed for completion
try
{
PendingCallback delay = __queue.poll(POLL, TimeUnit.MILLISECONDS);
if (delay != null)
delay.proceed();
}
catch (Exception e)
{
// ignored
}
}
// Check we got a response!
HttpTester.Response response = HttpTester.parseResponse(in);
assertThat(response, Matchers.notNullValue());
assertThat(response.getStatus(), is(200));
String content = response.getContent();
assertThat(content, containsString(handler.getExpectedMessage()));
}
}
enum ContentStyle
{BUFFER, STREAM} // TODO more types needed here
private static class SendContentHandler extends AbstractHandler
{
final ContentStyle _style;
final String _data;
final CountDownLatch _wait = new CountDownLatch(1);
SendContentHandler(ContentStyle style, String data)
{
_style = style;
_data = data;
}
public String getExpectedMessage()
{
return SMALL;
}
public void wait4handle()
{
try
{
Assertions.assertTrue(_wait.await(WAIT, TimeUnit.SECONDS));
}
catch (InterruptedException e)
{
throw new RuntimeException(e);
}
}
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
AsyncContext context = request.startAsync();
HttpOutput out = (HttpOutput)response.getOutputStream();
response.setContentType("text/plain");
byte[] bytes = _data.getBytes(StandardCharsets.ISO_8859_1);
switch (_style)
{
case BUFFER:
out.sendContent(BufferUtil.toBuffer(bytes), Callback.from(context::complete));
break;
case STREAM:
out.sendContent(new ByteArrayInputStream(bytes), Callback.from(context::complete));
break;
}
_wait.countDown();
}
@Override
public String toString()
{
return String.format("SCCH{w=%s,d=%d}", _style, _data.length());
}
}
}

View File

@ -33,6 +33,7 @@ import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.concurrent.Exchanger;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.servlet.AsyncContext;
import javax.servlet.ServletException;
import javax.servlet.ServletInputStream;
@ -1020,13 +1021,22 @@ public abstract class HttpServerTestBase extends HttpServerTestFixture
@Test
public void testPipeline() throws Exception
{
configureServer(new HelloWorldHandler());
AtomicInteger served = new AtomicInteger();
configureServer(new HelloWorldHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
served.incrementAndGet();
super.handle(target, baseRequest, request, response);
}
});
//for (int pipeline=1;pipeline<32;pipeline++)
for (int pipeline = 1; pipeline < 32; pipeline++)
int pipeline = 64;
{
try (Socket client = newSocket(_serverURI.getHost(), _serverURI.getPort()))
{
served.set(0);
client.setSoTimeout(5000);
OutputStream os = client.getOutputStream();
@ -1065,6 +1075,7 @@ public abstract class HttpServerTestBase extends HttpServerTestFixture
count++;
line = in.readLine();
}
assertEquals(pipeline, served.get());
assertEquals(pipeline, count);
}
}

View File

@ -291,6 +291,58 @@ public interface Callback extends Invocable
}
}
interface InvocableCallback extends Invocable, Callback
{
}
static Callback combine(Callback cb1, Callback cb2)
{
if (cb1 == null || cb1 == cb2)
return cb2;
if (cb2 == null)
return cb1;
return new InvocableCallback()
{
@Override
public void succeeded()
{
try
{
cb1.succeeded();
}
finally
{
cb2.succeeded();
}
}
@Override
public void failed(Throwable x)
{
try
{
cb1.failed(x);
}
catch (Throwable t)
{
if (x != t)
x.addSuppressed(t);
}
finally
{
cb2.failed(x);
}
}
@Override
public InvocationType getInvocationType()
{
return Invocable.combine(Invocable.getInvocationType(cb1), Invocable.getInvocationType(cb2));
}
};
}
/**
* <p>A CompletableFuture that is also a Callback.</p>
*/

View File

@ -74,6 +74,20 @@ public interface Invocable
}
}
static InvocationType combine(InvocationType it1, InvocationType it2)
{
if (it1 != null && it2 != null)
{
if (it1 == it2)
return it1;
if (it1 == InvocationType.EITHER)
return it2;
if (it2 == InvocationType.EITHER)
return it1;
}
return InvocationType.BLOCKING;
}
/**
* Get the invocation type of an Object.
*