Merge remote-tracking branch 'origin/jetty-9.4.x' into jetty-10.0.x

Signed-off-by: Greg Wilkins <gregw@webtide.com>
This commit is contained in:
Greg Wilkins 2019-12-19 12:33:28 +11:00
commit d971716e6d
18 changed files with 1641 additions and 825 deletions

View File

@ -41,7 +41,6 @@ import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.ResetFrame;
import org.eclipse.jetty.http2.frames.SettingsFrame;
import org.eclipse.jetty.server.HttpOutput;
import org.eclipse.jetty.util.Callback;
@ -243,8 +242,8 @@ public class AsyncIOTest extends AbstractTest
// The write is too large and will stall.
output.write(ByteBuffer.wrap(new byte[2 * clientWindow]));
// We cannot call complete() now before checking for isReady().
// This will abort the response and the client will receive a reset.
// We can now call complete() now before checking for isReady().
// This will asynchronously complete when the write is finished.
asyncContext.complete();
}
@ -275,7 +274,7 @@ public class AsyncIOTest extends AbstractTest
session.newStream(frame, promise, new Stream.Listener.Adapter()
{
@Override
public void onReset(Stream stream, ResetFrame frame)
public void onClosed(Stream stream)
{
latch.countDown();
}

View File

@ -521,8 +521,16 @@ public abstract class WriteFlusher
public void onClose()
{
switch (_state.get().getType())
{
case IDLE:
case FAILED:
return;
default:
onFail(new ClosedChannelException());
}
}
boolean isFailed()
{

View File

@ -146,7 +146,11 @@ public class AsyncContextState implements AsyncContext
@Override
public void run()
{
state().getAsyncContextEvent().getContext().getContextHandler().handle(channel.getRequest(), task);
ContextHandler.Context context = state().getAsyncContextEvent().getContext();
if (context == null)
task.run();
else
context.getContextHandler().handle(channel.getRequest(), task);
}
});
}

View File

@ -221,7 +221,7 @@ public class Dispatcher implements RequestDispatcher
_contextHandler.handle(_pathInContext, baseRequest, (HttpServletRequest)request, (HttpServletResponse)response);
if (!baseRequest.getHttpChannelState().isAsync())
commitResponse(response, baseRequest);
baseRequest.getResponse().softClose();
}
}
finally
@ -243,57 +243,6 @@ public class Dispatcher implements RequestDispatcher
return String.format("Dispatcher@0x%x{%s,%s}", hashCode(), _named, _uri);
}
@SuppressWarnings("Duplicates")
private void commitResponse(ServletResponse response, Request baseRequest) throws IOException, ServletException
{
if (baseRequest.getResponse().isWriting())
{
try
{
// Try closing Writer first (based on knowledge in Response obj)
response.getWriter().close();
}
catch (IllegalStateException ex1)
{
try
{
// Try closing OutputStream as alternate route
// This path is possible due to badly behaving Response wrappers
response.getOutputStream().close();
}
catch (IllegalStateException ex2)
{
ServletException servletException = new ServletException("Unable to commit the response", ex2);
servletException.addSuppressed(ex1);
throw servletException;
}
}
}
else
{
try
{
// Try closing OutputStream first (based on knowledge in Response obj)
response.getOutputStream().close();
}
catch (IllegalStateException ex1)
{
try
{
// Try closing Writer as alternate route
// This path is possible due to badly behaving Response wrappers
response.getWriter().close();
}
catch (IllegalStateException ex2)
{
ServletException servletException = new ServletException("Unable to commit the response", ex2);
servletException.addSuppressed(ex1);
throw servletException;
}
}
}
}
private class ForwardAttributes implements Attributes
{
final Attributes _attr;

View File

@ -47,16 +47,11 @@ public class EncodingHttpWriter extends HttpWriter
public void write(char[] s, int offset, int length) throws IOException
{
HttpOutput out = _out;
if (length == 0 && out.isAllContentWritten())
{
out.close();
return;
}
while (length > 0)
{
_bytes.reset();
int chars = length > MAX_OUTPUT_CHARS ? MAX_OUTPUT_CHARS : length;
int chars = Math.min(length, MAX_OUTPUT_CHARS);
_converter.write(s, offset, chars);
_converter.flush();

View File

@ -500,7 +500,7 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
// TODO that is done.
// Set a close callback on the HttpOutput to make it an async callback
_response.closeOutput(Callback.from(_state::completed));
_response.completeOutput(Callback.from(_state::completed));
break;
}
@ -1245,7 +1245,7 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
@Override
public void succeeded()
{
_response.getHttpOutput().closed();
_response.getHttpOutput().completed();
super.failed(x);
}

View File

@ -890,6 +890,9 @@ public class HttpChannelState
if (LOG.isDebugEnabled())
LOG.debug("sendError {}", toStringLocked());
if (_outputState != OutputState.OPEN)
throw new IllegalStateException(_outputState.toString());
switch (_state)
{
case HANDLING:
@ -903,7 +906,7 @@ public class HttpChannelState
throw new IllegalStateException("Response is " + _outputState);
response.setStatus(code);
response.closedBySendError();
response.softClose();
request.setAttribute(ErrorHandler.ERROR_CONTEXT, request.getErrorContext());
request.setAttribute(ERROR_REQUEST_URI, request.getRequestURI());
@ -971,7 +974,7 @@ public class HttpChannelState
}
// release any aggregate buffer from a closing flush
_channel.getResponse().getHttpOutput().closed();
_channel.getResponse().getHttpOutput().completed();
if (event != null)
{

View File

@ -429,12 +429,16 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
_parser.close();
}
else if (_parser.inContentState() && _generator.isPersistent())
{
// Try to progress without filling.
parseRequestBuffer();
if (_parser.inContentState())
{
// If we are async, then we have problems to complete neatly
if (_input.isAsync())
{
if (LOG.isDebugEnabled())
LOG.debug("{}unconsumed input {}", _parser.isChunking() ? "Possible " : "", this);
LOG.debug("{}unconsumed input while async {}", _parser.isChunking() ? "Possible " : "", this);
_channel.abort(new IOException("unconsumed input"));
}
else
@ -446,6 +450,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
_channel.abort(new IOException("unconsumed input"));
}
}
}
// Reset the channel, parsers and generator
_channel.recycle();

View File

@ -22,13 +22,14 @@ import java.io.IOException;
import java.io.Writer;
import org.eclipse.jetty.util.ByteArrayOutputStream2;
import org.eclipse.jetty.util.Callback;
/**
*
*/
public abstract class HttpWriter extends Writer
{
public static final int MAX_OUTPUT_CHARS = 512;
public static final int MAX_OUTPUT_CHARS = 512; // TODO should this be configurable? super size is 1024
final HttpOutput _out;
final ByteArrayOutputStream2 _bytes;
@ -38,7 +39,7 @@ public abstract class HttpWriter extends Writer
{
_out = out;
_chars = new char[MAX_OUTPUT_CHARS];
_bytes = new ByteArrayOutputStream2(MAX_OUTPUT_CHARS);
_bytes = new ByteArrayOutputStream2(MAX_OUTPUT_CHARS); // TODO should this be pooled - or do we just recycle the writer?
}
@Override
@ -47,6 +48,11 @@ public abstract class HttpWriter extends Writer
_out.close();
}
public void complete(Callback callback)
{
_out.complete(callback);
}
@Override
public void flush() throws IOException
{

View File

@ -35,11 +35,6 @@ public class Iso88591HttpWriter extends HttpWriter
public void write(char[] s, int offset, int length) throws IOException
{
HttpOutput out = _out;
if (length == 0 && out.isAllContentWritten())
{
close();
return;
}
if (length == 1)
{
@ -51,7 +46,7 @@ public class Iso88591HttpWriter extends HttpWriter
while (length > 0)
{
_bytes.reset();
int chars = length > MAX_OUTPUT_CHARS ? MAX_OUTPUT_CHARS : length;
int chars = Math.min(length, MAX_OUTPUT_CHARS);
byte[] buffer = _bytes.getBuf();
int bytes = _bytes.getCount();

View File

@ -146,10 +146,10 @@ public class Response implements HttpServletResponse
_out.reopen();
}
public void closedBySendError()
public void softClose()
{
setErrorSent(true);
_out.closedBySendError();
_out.softClose();
}
/**
@ -497,7 +497,7 @@ public class Response implements HttpServletResponse
resetBuffer();
setHeader(HttpHeader.LOCATION, location);
setStatus(code);
closeOutput();
completeOutput();
}
@Override
@ -789,7 +789,7 @@ public class Response implements HttpServletResponse
{
try
{
closeOutput();
completeOutput();
}
catch (IOException e)
{
@ -827,17 +827,20 @@ public class Response implements HttpServletResponse
return (_contentLength < 0 || written >= _contentLength);
}
public void closeOutput() throws IOException
public void completeOutput() throws IOException
{
if (_outputType == OutputType.WRITER)
_writer.close();
if (!_out.isClosed())
else
_out.close();
}
public void closeOutput(Callback callback)
public void completeOutput(Callback callback)
{
_out.close((_outputType == OutputType.WRITER) ? _writer : _out, callback);
if (_outputType == OutputType.WRITER)
_writer.complete(callback);
else
_out.complete(callback);
}
public long getLongContentLength()

View File

@ -27,6 +27,7 @@ import javax.servlet.ServletResponse;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.io.RuntimeIOException;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@ -148,7 +149,7 @@ public class ResponseWriter extends PrintWriter
out.flush();
}
}
catch (IOException ex)
catch (Throwable ex)
{
setError(ex);
}
@ -171,6 +172,15 @@ public class ResponseWriter extends PrintWriter
}
}
public void complete(Callback callback)
{
synchronized (lock)
{
_isClosed = true;
}
_httpWriter.complete(callback);
}
@Override
public void write(int c)
{

View File

@ -42,16 +42,11 @@ public class Utf8HttpWriter extends HttpWriter
public void write(char[] s, int offset, int length) throws IOException
{
HttpOutput out = _out;
if (length == 0 && out.isAllContentWritten())
{
close();
return;
}
while (length > 0)
{
_bytes.reset();
int chars = length > MAX_OUTPUT_CHARS ? MAX_OUTPUT_CHARS : length;
int chars = Math.min(length, MAX_OUTPUT_CHARS);
byte[] buffer = _bytes.getBuf();
int bytes = _bytes.getCount();

View File

@ -18,7 +18,9 @@
package org.eclipse.jetty.server;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.ByteBuffer;
@ -27,7 +29,9 @@ import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@ -48,9 +52,12 @@ import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.ManagedSelector;
import org.eclipse.jetty.io.SocketChannelEndPoint;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.util.BlockingArrayQueue;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.thread.Scheduler;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
@ -65,14 +72,19 @@ import static org.hamcrest.Matchers.is;
*/
public class AsyncCompletionTest extends HttpServerTestFixture
{
private static final Exchanger<DelayedCallback> X = new Exchanger<>();
private static final AtomicBoolean COMPLETE = new AtomicBoolean();
private static final int POLL = 10; // milliseconds
private static final int WAIT = 10; // seconds
private static final String SMALL = "Now is the time for all good men to come to the aid of the party. ";
private static final String LARGE = SMALL + SMALL + SMALL + SMALL + SMALL;
private static final int BUFFER_SIZE = SMALL.length() * 3 / 2;
private static final BlockingQueue<PendingCallback> __queue = new BlockingArrayQueue<>();
private static final AtomicBoolean __transportComplete = new AtomicBoolean();
private static class DelayedCallback extends Callback.Nested
private static class PendingCallback extends Callback.Nested
{
private CompletableFuture<Void> _delay = new CompletableFuture<>();
private CompletableFuture<Void> _pending = new CompletableFuture<>();
public DelayedCallback(Callback callback)
public PendingCallback(Callback callback)
{
super(callback);
}
@ -80,20 +92,20 @@ public class AsyncCompletionTest extends HttpServerTestFixture
@Override
public void succeeded()
{
_delay.complete(null);
_pending.complete(null);
}
@Override
public void failed(Throwable x)
{
_delay.completeExceptionally(x);
_pending.completeExceptionally(x);
}
public void proceed()
{
try
{
_delay.get(10, TimeUnit.SECONDS);
_pending.get(WAIT, TimeUnit.SECONDS);
getCallback().succeeded();
}
catch (Throwable th)
@ -107,13 +119,15 @@ public class AsyncCompletionTest extends HttpServerTestFixture
@BeforeEach
public void init() throws Exception
{
COMPLETE.set(false);
__transportComplete.set(false);
startServer(new ServerConnector(_server, new HttpConnectionFactory()
{
@Override
public Connection newConnection(Connector connector, EndPoint endPoint)
{
getHttpConfiguration().setOutputBufferSize(BUFFER_SIZE);
getHttpConfiguration().setOutputAggregationSize(BUFFER_SIZE);
return configure(new ExtendedHttpConnection(getHttpConfiguration(), connector, endPoint), connector, endPoint);
}
})
@ -136,16 +150,9 @@ public class AsyncCompletionTest extends HttpServerTestFixture
@Override
public void write(Callback callback, ByteBuffer... buffers) throws IllegalStateException
{
DelayedCallback delay = new DelayedCallback(callback);
PendingCallback delay = new PendingCallback(callback);
super.write(delay, buffers);
try
{
X.exchange(delay);
}
catch (InterruptedException e)
{
throw new RuntimeException(e);
}
__queue.offer(delay);
}
}
@ -159,24 +166,44 @@ public class AsyncCompletionTest extends HttpServerTestFixture
@Override
public void onCompleted()
{
COMPLETE.compareAndSet(false, true);
__transportComplete.compareAndSet(false, true);
super.onCompleted();
}
}
// Tests from here use these parameters
public static Stream<Arguments> tests()
enum WriteStyle
{ARRAY, BUFFER, BYTE, BYTE_THEN_ARRAY, PRINT}
;
public static Stream<Arguments> asyncIOWriteTests()
{
List<Object[]> tests = new ArrayList<>();
tests.add(new Object[]{new HelloWorldHandler(), 200, "Hello world"});
tests.add(new Object[]{new SendErrorHandler(499, "Test async sendError"), 499, "Test async sendError"});
tests.add(new Object[]{new AsyncReadyCompleteHandler(), 200, AsyncReadyCompleteHandler.data});
for (WriteStyle w : WriteStyle.values())
{
for (boolean contentLength : new Boolean[]{true, false})
{
for (boolean isReady : new Boolean[]{true, false})
{
for (boolean flush : new Boolean[]{true, false})
{
for (boolean close : new Boolean[]{true, false})
{
for (String data : new String[]{SMALL, LARGE})
{
tests.add(new Object[]{new AsyncIOWriteHandler(w, contentLength, isReady, flush, close, data)});
}
}
}
}
}
}
return tests.stream().map(Arguments::of);
}
@ParameterizedTest
@MethodSource("tests")
public void testAsyncCompletion(Handler handler, int status, String message) throws Exception
@MethodSource("asyncIOWriteTests")
public void testAsyncIOWrite(AsyncIOWriteHandler handler) throws Exception
{
configureServer(handler);
@ -184,79 +211,242 @@ public class AsyncCompletionTest extends HttpServerTestFixture
try (Socket client = newSocket(_serverURI.getHost(), _serverURI.getPort()))
{
OutputStream os = client.getOutputStream();
InputStream in = client.getInputStream();
// write the request
os.write("GET / HTTP/1.0\r\n\r\n".getBytes(StandardCharsets.ISO_8859_1));
os.flush();
// The write should happen but the callback is delayed
HttpTester.Response response = HttpTester.parseResponse(client.getInputStream());
assertThat(response, Matchers.notNullValue());
assertThat(response.getStatus(), is(status));
String content = response.getContent();
assertThat(content, containsString(message));
// wait for OWP to execute (proves we do not block in write APIs)
boolean completeCalled = handler.waitForOWPExit();
// Check that a thread is held busy in write
assertThat(_threadPool.getBusyThreads(), Matchers.greaterThan(base));
// Getting the Delayed callback will free the thread
DelayedCallback delay = X.exchange(null, 10, TimeUnit.SECONDS);
// wait for threads to return to base level
long end = System.nanoTime() + TimeUnit.SECONDS.toNanos(10);
while (true)
{
// wait for threads to return to base level (proves we are really async)
long end = System.nanoTime() + TimeUnit.SECONDS.toNanos(WAIT);
while (_threadPool.getBusyThreads() != base)
{
if (System.nanoTime() > end)
throw new TimeoutException();
Thread.sleep(10);
Thread.sleep(POLL);
}
if (completeCalled)
break;
// We are now asynchronously waiting!
assertThat(COMPLETE.get(), is(false));
assertThat(__transportComplete.get(), is(false));
// proceed with the completion
// If we are not complete, we must be waiting for one or more writes to complete
while (true)
{
PendingCallback delay = __queue.poll(POLL, TimeUnit.MILLISECONDS);
if (delay != null)
{
delay.proceed();
continue;
}
// No delay callback found, have we finished OWP again?
Boolean c = handler.pollForOWPExit();
while (!COMPLETE.get())
if (c == null)
// No we haven't, so look for another delay callback
continue;
// We have a OWP result, so let's handle it.
completeCalled = c;
break;
}
}
// Wait for full completion
long end = System.nanoTime() + TimeUnit.SECONDS.toNanos(WAIT);
while (!__transportComplete.get())
{
if (System.nanoTime() > end)
throw new TimeoutException();
Thread.sleep(10);
// proceed with any delayCBs needed for completion
PendingCallback delay = __queue.poll(POLL, TimeUnit.MILLISECONDS);
if (delay != null)
delay.proceed();
}
// Check we got a response!
HttpTester.Response response = HttpTester.parseResponse(in);
assertThat(response, Matchers.notNullValue());
assertThat(response.getStatus(), is(200));
String content = response.getContent();
assertThat(content, containsString(handler.getExpectedMessage()));
}
}
private static class AsyncReadyCompleteHandler extends AbstractHandler
private static class AsyncIOWriteHandler extends AbstractHandler
{
static String data = "Now is the time for all good men to come to the aid of the party";
final WriteStyle _write;
final boolean _contentLength;
final boolean _isReady;
final boolean _flush;
final boolean _close;
final String _data;
final Exchanger<Boolean> _ready = new Exchanger<>();
int _toWrite;
boolean _flushed;
boolean _closed;
AsyncIOWriteHandler(WriteStyle write, boolean contentLength, boolean isReady, boolean flush, boolean close, String data)
{
_write = write;
_contentLength = contentLength;
_isReady = isReady;
_flush = flush;
_close = close;
_data = data;
_toWrite = data.length();
}
public String getExpectedMessage()
{
return SMALL;
}
boolean waitForOWPExit()
{
try
{
return _ready.exchange(null);
}
catch (InterruptedException e)
{
throw new RuntimeException(e);
}
}
Boolean pollForOWPExit()
{
try
{
return _ready.exchange(null, POLL, TimeUnit.MILLISECONDS);
}
catch (InterruptedException e)
{
throw new RuntimeException(e);
}
catch (TimeoutException e)
{
return null;
}
}
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
AsyncContext context = request.startAsync();
ServletOutputStream out = response.getOutputStream();
response.setContentType("text/plain");
byte[] bytes = _data.getBytes(StandardCharsets.ISO_8859_1);
if (_contentLength)
response.setContentLength(bytes.length);
out.setWriteListener(new WriteListener()
{
byte[] bytes = data.getBytes(StandardCharsets.ISO_8859_1);
@Override
public void onWritePossible() throws IOException
{
while (out.isReady())
try
{
if (bytes != null)
if (out.isReady())
{
response.setContentType("text/plain");
response.setContentLength(bytes.length);
out.write(bytes);
bytes = null;
}
else
if (_toWrite > 0)
{
context.complete();
switch (_write)
{
case ARRAY:
_toWrite = 0;
out.write(bytes, 0, bytes.length);
break;
case BUFFER:
_toWrite = 0;
((HttpOutput)out).write(BufferUtil.toBuffer(bytes));
break;
case BYTE:
for (int i = bytes.length - _toWrite; i < bytes.length; i++)
{
_toWrite--;
out.write(bytes[i]);
boolean ready = out.isReady();
if (!ready)
{
_ready.exchange(Boolean.FALSE);
return;
}
}
break;
case BYTE_THEN_ARRAY:
_toWrite = 0;
out.write(bytes[0]); // This should always aggregate
assertThat(out.isReady(), is(true));
out.write(bytes, 1, bytes.length - 1);
break;
case PRINT:
_toWrite = 0;
out.print(_data);
break;
}
}
if (_flush && !_flushed)
{
boolean ready = out.isReady();
if (!ready)
{
_ready.exchange(Boolean.FALSE);
return;
}
_flushed = true;
out.flush();
}
if (_close && !_closed)
{
if (_isReady)
{
boolean ready = out.isReady();
if (!ready)
{
_ready.exchange(Boolean.FALSE);
return;
}
}
_closed = true;
out.close();
}
if (_isReady)
{
boolean ready = out.isReady();
if (!ready)
{
_ready.exchange(Boolean.FALSE);
return;
}
}
context.complete();
_ready.exchange(Boolean.TRUE);
}
}
catch (InterruptedException e)
{
throw new RuntimeException(e);
}
finally
{
}
}
@Override
@ -266,5 +456,312 @@ public class AsyncCompletionTest extends HttpServerTestFixture
}
});
}
@Override
public String toString()
{
return String.format("AWCH{w=%s,cl=%b,ir=%b,f=%b,c=%b,d=%d}", _write, _contentLength, _isReady, _flush, _close, _data.length());
}
}
public static Stream<Arguments> blockingWriteTests()
{
List<Object[]> tests = new ArrayList<>();
for (WriteStyle w : WriteStyle.values())
{
for (boolean contentLength : new Boolean[]{true, false})
{
for (boolean flush : new Boolean[]{true, false})
{
for (boolean close : new Boolean[]{true, false})
{
for (String data : new String[]{SMALL, LARGE})
{
tests.add(new Object[]{new BlockingWriteHandler(w, contentLength, flush, close, data)});
}
}
}
}
}
return tests.stream().map(Arguments::of);
}
@ParameterizedTest
@MethodSource("blockingWriteTests")
public void testBlockingWrite(BlockingWriteHandler handler) throws Exception
{
configureServer(handler);
try (Socket client = newSocket(_serverURI.getHost(), _serverURI.getPort()))
{
OutputStream os = client.getOutputStream();
InputStream in = client.getInputStream();
// write the request
os.write("GET / HTTP/1.0\r\n\r\n".getBytes(StandardCharsets.ISO_8859_1));
os.flush();
handler.wait4handle();
// Wait for full completion
long end = System.nanoTime() + TimeUnit.SECONDS.toNanos(WAIT);
while (!__transportComplete.get())
{
if (System.nanoTime() > end)
throw new TimeoutException();
// proceed with any delayCBs needed for completion
try
{
PendingCallback delay = __queue.poll(POLL, TimeUnit.MILLISECONDS);
if (delay != null)
delay.proceed();
}
catch (Exception e)
{
// ignored
}
}
// Check we got a response!
HttpTester.Response response = HttpTester.parseResponse(in);
assertThat(response, Matchers.notNullValue());
assertThat(response.getStatus(), is(200));
String content = response.getContent();
assertThat(content, containsString(handler.getExpectedMessage()));
}
}
private static class BlockingWriteHandler extends AbstractHandler
{
final WriteStyle _write;
final boolean _contentLength;
final boolean _flush;
final boolean _close;
final String _data;
final CountDownLatch _wait = new CountDownLatch(1);
BlockingWriteHandler(WriteStyle write, boolean contentLength, boolean flush, boolean close, String data)
{
_write = write;
_contentLength = contentLength;
_flush = flush;
_close = close;
_data = data;
}
public String getExpectedMessage()
{
return SMALL;
}
public void wait4handle()
{
try
{
Assertions.assertTrue(_wait.await(WAIT, TimeUnit.SECONDS));
}
catch (InterruptedException e)
{
throw new RuntimeException(e);
}
}
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
AsyncContext context = request.startAsync();
ServletOutputStream out = response.getOutputStream();
context.start(() ->
{
try
{
_wait.countDown();
response.setContentType("text/plain");
byte[] bytes = _data.getBytes(StandardCharsets.ISO_8859_1);
if (_contentLength)
response.setContentLength(bytes.length);
switch (_write)
{
case ARRAY:
out.write(bytes, 0, bytes.length);
break;
case BUFFER:
((HttpOutput)out).write(BufferUtil.toBuffer(bytes));
break;
case BYTE:
for (byte b : bytes)
{
out.write(b);
}
break;
case BYTE_THEN_ARRAY:
out.write(bytes[0]); // This should always aggregate
out.write(bytes, 1, bytes.length - 1);
break;
case PRINT:
out.print(_data);
break;
}
if (_flush)
out.flush();
if (_close)
out.close();
context.complete();
}
catch (Exception e)
{
throw new RuntimeException(e);
}
});
}
@Override
public String toString()
{
return String.format("BWCH{w=%s,cl=%b,f=%b,c=%b,d=%d}", _write, _contentLength, _flush, _close, _data.length());
}
}
public static Stream<Arguments> sendContentTests()
{
List<Object[]> tests = new ArrayList<>();
for (ContentStyle style : ContentStyle.values())
{
for (String data : new String[]{SMALL, LARGE})
{
tests.add(new Object[]{new SendContentHandler(style, data)});
}
}
return tests.stream().map(Arguments::of);
}
@ParameterizedTest
@MethodSource("sendContentTests")
public void testSendContent(SendContentHandler handler) throws Exception
{
configureServer(handler);
int base = _threadPool.getBusyThreads();
try (Socket client = newSocket(_serverURI.getHost(), _serverURI.getPort()))
{
OutputStream os = client.getOutputStream();
InputStream in = client.getInputStream();
// write the request
os.write("GET / HTTP/1.0\r\n\r\n".getBytes(StandardCharsets.ISO_8859_1));
os.flush();
handler.wait4handle();
long end = System.nanoTime() + TimeUnit.SECONDS.toNanos(WAIT);
while (_threadPool.getBusyThreads() != base)
{
if (System.nanoTime() > end)
throw new TimeoutException();
Thread.sleep(POLL);
}
// Wait for full completion
end = System.nanoTime() + TimeUnit.SECONDS.toNanos(WAIT);
while (!__transportComplete.get())
{
if (System.nanoTime() > end)
throw new TimeoutException();
// proceed with any delayCBs needed for completion
try
{
PendingCallback delay = __queue.poll(POLL, TimeUnit.MILLISECONDS);
if (delay != null)
delay.proceed();
}
catch (Exception e)
{
// ignored
}
}
// Check we got a response!
HttpTester.Response response = HttpTester.parseResponse(in);
assertThat(response, Matchers.notNullValue());
assertThat(response.getStatus(), is(200));
String content = response.getContent();
assertThat(content, containsString(handler.getExpectedMessage()));
}
}
enum ContentStyle
{BUFFER, STREAM} // TODO more types needed here
private static class SendContentHandler extends AbstractHandler
{
final ContentStyle _style;
final String _data;
final CountDownLatch _wait = new CountDownLatch(1);
SendContentHandler(ContentStyle style, String data)
{
_style = style;
_data = data;
}
public String getExpectedMessage()
{
return SMALL;
}
public void wait4handle()
{
try
{
Assertions.assertTrue(_wait.await(WAIT, TimeUnit.SECONDS));
}
catch (InterruptedException e)
{
throw new RuntimeException(e);
}
}
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
AsyncContext context = request.startAsync();
HttpOutput out = (HttpOutput)response.getOutputStream();
response.setContentType("text/plain");
byte[] bytes = _data.getBytes(StandardCharsets.ISO_8859_1);
switch (_style)
{
case BUFFER:
out.sendContent(BufferUtil.toBuffer(bytes), Callback.from(context::complete));
break;
case STREAM:
out.sendContent(new ByteArrayInputStream(bytes), Callback.from(context::complete));
break;
}
_wait.countDown();
}
@Override
public String toString()
{
return String.format("SCCH{w=%s,d=%d}", _style, _data.length());
}
}
}

View File

@ -33,6 +33,7 @@ import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.concurrent.Exchanger;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.servlet.AsyncContext;
import javax.servlet.ServletException;
import javax.servlet.ServletInputStream;
@ -1020,13 +1021,22 @@ public abstract class HttpServerTestBase extends HttpServerTestFixture
@Test
public void testPipeline() throws Exception
{
configureServer(new HelloWorldHandler());
AtomicInteger served = new AtomicInteger();
configureServer(new HelloWorldHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
served.incrementAndGet();
super.handle(target, baseRequest, request, response);
}
});
//for (int pipeline=1;pipeline<32;pipeline++)
for (int pipeline = 1; pipeline < 32; pipeline++)
int pipeline = 64;
{
try (Socket client = newSocket(_serverURI.getHost(), _serverURI.getPort()))
{
served.set(0);
client.setSoTimeout(5000);
OutputStream os = client.getOutputStream();
@ -1065,6 +1075,7 @@ public abstract class HttpServerTestBase extends HttpServerTestFixture
count++;
line = in.readLine();
}
assertEquals(pipeline, served.get());
assertEquals(pipeline, count);
}
}

View File

@ -344,6 +344,58 @@ public interface Callback extends Invocable
}
}
interface InvocableCallback extends Invocable, Callback
{
}
static Callback combine(Callback cb1, Callback cb2)
{
if (cb1 == null || cb1 == cb2)
return cb2;
if (cb2 == null)
return cb1;
return new InvocableCallback()
{
@Override
public void succeeded()
{
try
{
cb1.succeeded();
}
finally
{
cb2.succeeded();
}
}
@Override
public void failed(Throwable x)
{
try
{
cb1.failed(x);
}
catch (Throwable t)
{
if (x != t)
x.addSuppressed(t);
}
finally
{
cb2.failed(x);
}
}
@Override
public InvocationType getInvocationType()
{
return Invocable.combine(Invocable.getInvocationType(cb1), Invocable.getInvocationType(cb2));
}
};
}
/**
* <p>A CompletableFuture that is also a Callback.</p>
*/

View File

@ -74,6 +74,20 @@ public interface Invocable
}
}
static InvocationType combine(InvocationType it1, InvocationType it2)
{
if (it1 != null && it2 != null)
{
if (it1 == it2)
return it1;
if (it1 == InvocationType.EITHER)
return it2;
if (it2 == InvocationType.EITHER)
return it1;
}
return InvocationType.BLOCKING;
}
/**
* Get the invocation type of an Object.
*