Merged branch 'jetty-9.3.x' into 'master'.

This commit is contained in:
Simone Bordet 2016-02-12 16:01:38 +01:00
commit df80aef265
2 changed files with 332 additions and 337 deletions

View File

@ -23,19 +23,18 @@ import java.io.InputStream;
import java.io.InterruptedIOException; import java.io.InterruptedIOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousCloseException; import java.nio.channels.AsynchronousCloseException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.client.HttpClient; import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Response.Listener; import org.eclipse.jetty.client.api.Response.Listener;
import org.eclipse.jetty.client.api.Result; import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IO; import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
@ -63,8 +62,6 @@ import org.eclipse.jetty.util.log.Logger;
* <p> * <p>
* The {@link HttpClient} implementation (the producer) will feed the input stream * The {@link HttpClient} implementation (the producer) will feed the input stream
* asynchronously while the application (the consumer) is reading from it. * asynchronously while the application (the consumer) is reading from it.
* Chunks of content are maintained in a queue, and it is possible to specify a
* maximum buffer size for the bytes held in the queue, by default 16384 bytes.
* <p> * <p>
* If the consumer is faster than the producer, then the consumer will block * If the consumer is faster than the producer, then the consumer will block
* with the typical {@link InputStream#read()} semantic. * with the typical {@link InputStream#read()} semantic.
@ -74,137 +71,125 @@ import org.eclipse.jetty.util.log.Logger;
public class InputStreamResponseListener extends Listener.Adapter public class InputStreamResponseListener extends Listener.Adapter
{ {
private static final Logger LOG = Log.getLogger(InputStreamResponseListener.class); private static final Logger LOG = Log.getLogger(InputStreamResponseListener.class);
private static final byte[] EOF = new byte[0]; private static final DeferredContentProvider.Chunk EOF = new DeferredContentProvider.Chunk(BufferUtil.EMPTY_BUFFER, Callback.NOOP);
private static final byte[] CLOSED = new byte[0]; private final Object lock = this;
private static final byte[] FAILURE = new byte[0];
private final BlockingQueue<byte[]> queue = new LinkedBlockingQueue<>();
private final AtomicLong length = new AtomicLong();
private final CountDownLatch responseLatch = new CountDownLatch(1); private final CountDownLatch responseLatch = new CountDownLatch(1);
private final CountDownLatch resultLatch = new CountDownLatch(1); private final CountDownLatch resultLatch = new CountDownLatch(1);
private final AtomicReference<InputStream> stream = new AtomicReference<>(); private final AtomicReference<InputStream> stream = new AtomicReference<>();
private final long maxBufferSize;
private Response response; private Response response;
private Result result; private Result result;
private volatile Throwable failure; private Throwable failure;
private volatile boolean closed; private boolean closed;
private DeferredContentProvider.Chunk chunk;
public InputStreamResponseListener() public InputStreamResponseListener()
{ {
this(16 * 1024L);
}
public InputStreamResponseListener(long maxBufferSize)
{
this.maxBufferSize = maxBufferSize;
} }
@Override @Override
public void onHeaders(Response response) public void onHeaders(Response response)
{
synchronized (lock)
{ {
this.response = response; this.response = response;
responseLatch.countDown(); responseLatch.countDown();
} }
}
@Override @Override
public void onContent(Response response, ByteBuffer content) public void onContent(Response response, ByteBuffer content, Callback callback)
{ {
if (content.remaining() == 0)
{
if (LOG.isDebugEnabled())
LOG.debug("Skipped empty content {}", content);
callback.succeeded();
return;
}
boolean closed;
synchronized (lock)
{
closed = this.closed;
if (!closed) if (!closed)
{ {
int remaining = content.remaining(); chunk = new DeferredContentProvider.Chunk(content, callback);
if (remaining > 0) lock.notifyAll();
{ }
}
byte[] bytes = new byte[remaining]; if (closed)
content.get(bytes);
if (LOG.isDebugEnabled())
LOG.debug("Queuing {}/{} bytes", bytes, remaining);
queue.offer(bytes);
long newLength = length.addAndGet(remaining);
while (newLength >= maxBufferSize)
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Queued bytes limit {}/{} exceeded, waiting", newLength, maxBufferSize); LOG.debug("InputStream closed, ignored content {}", content);
// Block to avoid infinite buffering callback.failed(new AsynchronousCloseException());
if (!await())
break;
newLength = length.get();
if (LOG.isDebugEnabled())
LOG.debug("Queued bytes limit {}/{} exceeded, woken up", newLength, maxBufferSize);
}
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("Queuing skipped, empty content {}", content);
}
}
else
{
LOG.debug("Queuing skipped, stream already closed");
} }
} }
@Override @Override
public void onSuccess(Response response) public void onSuccess(Response response)
{ {
synchronized (lock)
{
chunk = EOF;
lock.notifyAll();
}
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Queuing end of content {}{}", EOF, ""); LOG.debug("End of content");
queue.offer(EOF);
signal();
} }
@Override @Override
public void onFailure(Response response, Throwable failure) public void onFailure(Response response, Throwable failure)
{ {
fail(failure); Callback callback = null;
signal(); synchronized (lock)
{
if (this.failure != null)
return;
this.failure = failure;
if (chunk != null)
callback = chunk.callback;
lock.notifyAll();
}
if (LOG.isDebugEnabled())
LOG.debug("Content failure", failure);
if (callback != null)
callback.failed(failure);
} }
@Override @Override
public void onComplete(Result result) public void onComplete(Result result)
{ {
if (result.isFailed() && failure == null) Throwable failure = result.getFailure();
fail(result.getFailure()); Callback callback = null;
synchronized (lock)
{
this.result = result; this.result = result;
resultLatch.countDown(); if (result.isFailed() && this.failure == null)
signal();
}
private void fail(Throwable failure)
{ {
if (LOG.isDebugEnabled())
LOG.debug("Queuing failure {} {}", FAILURE, failure);
queue.offer(FAILURE);
this.failure = failure; this.failure = failure;
if (chunk != null)
callback = chunk.callback;
}
// Notify the response latch in case of request failures.
responseLatch.countDown(); responseLatch.countDown();
resultLatch.countDown();
lock.notifyAll();
} }
protected boolean await() if (LOG.isDebugEnabled())
{ {
try if (failure == null)
{ LOG.debug("Result success");
synchronized (this) else
{ LOG.debug("Result failure", failure);
while (length.get() >= maxBufferSize && failure == null && !closed)
wait();
// Re-read the values as they may have changed while waiting.
return failure == null && !closed;
}
}
catch (InterruptedException x)
{
Thread.currentThread().interrupt();
return false;
}
} }
protected void signal() if (callback != null)
{ callback.failed(failure);
synchronized (this)
{
notifyAll();
}
} }
/** /**
@ -225,10 +210,14 @@ public class InputStreamResponseListener extends Listener.Adapter
boolean expired = !responseLatch.await(timeout, unit); boolean expired = !responseLatch.await(timeout, unit);
if (expired) if (expired)
throw new TimeoutException(); throw new TimeoutException();
if (failure != null) synchronized (lock)
{
// If the request failed there is no response.
if (response == null)
throw new ExecutionException(failure); throw new ExecutionException(failure);
return response; return response;
} }
}
/** /**
* Waits for the given timeout for the whole request/response cycle to be finished, * Waits for the given timeout for the whole request/response cycle to be finished,
@ -247,8 +236,11 @@ public class InputStreamResponseListener extends Listener.Adapter
boolean expired = !resultLatch.await(timeout, unit); boolean expired = !resultLatch.await(timeout, unit);
if (expired) if (expired)
throw new TimeoutException(); throw new TimeoutException();
synchronized (lock)
{
return result; return result;
} }
}
/** /**
* Returns an {@link InputStream} providing the response content bytes. * Returns an {@link InputStream} providing the response content bytes.
@ -267,65 +259,50 @@ public class InputStreamResponseListener extends Listener.Adapter
private class Input extends InputStream private class Input extends InputStream
{ {
private byte[] bytes;
private int index;
@Override @Override
public int read() throws IOException public int read() throws IOException
{ {
while (true) byte[] tmp = new byte[1];
{ int read = read(tmp);
if (bytes == EOF) if (read < 0)
{ return read;
// Mark the fact that we saw -1, return tmp[0] & 0xFF;
// so that in the close case we don't throw
index = -1;
return -1;
}
else if (bytes == FAILURE)
{
throw failure();
}
else if (bytes == CLOSED)
{
if (index < 0)
return -1;
throw new AsynchronousCloseException();
}
else if (bytes != null)
{
int result = bytes[index] & 0xFF;
if (++index == bytes.length)
{
length.addAndGet(-index);
bytes = null;
index = 0;
signal();
}
return result;
}
else
{
bytes = take();
if (LOG.isDebugEnabled())
LOG.debug("Dequeued {}/{} bytes", bytes, bytes.length);
}
}
} }
private IOException failure() @Override
{ public int read(byte[] b, int offset, int length) throws IOException
if (failure instanceof IOException)
return (IOException)failure;
else
return new IOException(failure);
}
private byte[] take() throws IOException
{ {
try try
{ {
return queue.take(); int result;
Callback callback = null;
synchronized (lock)
{
while (true)
{
if (failure != null)
throw toIOException(failure);
if (chunk == EOF)
return -1;
if (closed)
throw new AsynchronousCloseException();
if (chunk != null)
break;
lock.wait();
}
ByteBuffer buffer = chunk.buffer;
result = Math.min(buffer.remaining(), length);
buffer.get(b, offset, result);
if (!buffer.hasRemaining())
{
callback = chunk.callback;
chunk = null;
}
}
if (callback != null)
callback.succeeded();
return result;
} }
catch (InterruptedException x) catch (InterruptedException x)
{ {
@ -333,18 +310,35 @@ public class InputStreamResponseListener extends Listener.Adapter
} }
} }
private IOException toIOException(Throwable failure)
{
if (failure instanceof IOException)
return (IOException)failure;
else
return new IOException(failure);
}
@Override @Override
public void close() throws IOException public void close() throws IOException
{ {
if (!closed) Callback callback = null;
synchronized (lock)
{ {
super.close(); if (closed)
if (LOG.isDebugEnabled()) return;
LOG.debug("Queuing close {}{}", CLOSED, "");
queue.offer(CLOSED);
closed = true; closed = true;
signal(); if (chunk != null)
} callback = chunk.callback;
lock.notifyAll();
}
if (LOG.isDebugEnabled())
LOG.debug("InputStream close");
if (callback != null)
callback.failed(new AsynchronousCloseException());
super.close();
} }
} }
} }

View File

@ -18,9 +18,6 @@
package org.eclipse.jetty.client; package org.eclipse.jetty.client;
import static java.nio.file.StandardOpenOption.CREATE;
import static org.junit.Assert.fail;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.IOException; import java.io.IOException;
@ -33,6 +30,7 @@ import java.nio.charset.StandardCharsets;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.nio.file.Paths; import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.Arrays; import java.util.Arrays;
import java.util.Iterator; import java.util.Iterator;
import java.util.Random; import java.util.Random;
@ -51,7 +49,6 @@ import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse; import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.api.ContentResponse; import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result; import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.client.util.BufferingResponseListener; import org.eclipse.jetty.client.util.BufferingResponseListener;
@ -60,6 +57,8 @@ import org.eclipse.jetty.client.util.DeferredContentProvider;
import org.eclipse.jetty.client.util.InputStreamContentProvider; import org.eclipse.jetty.client.util.InputStreamContentProvider;
import org.eclipse.jetty.client.util.InputStreamResponseListener; import org.eclipse.jetty.client.util.InputStreamResponseListener;
import org.eclipse.jetty.client.util.OutputStreamContentProvider; import org.eclipse.jetty.client.util.OutputStreamContentProvider;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler; import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.toolchain.test.MavenTestingUtils; import org.eclipse.jetty.toolchain.test.MavenTestingUtils;
import org.eclipse.jetty.toolchain.test.annotation.Slow; import org.eclipse.jetty.toolchain.test.annotation.Slow;
@ -84,7 +83,7 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest
Path targetTestsDir = MavenTestingUtils.getTargetTestingDir().toPath(); Path targetTestsDir = MavenTestingUtils.getTargetTestingDir().toPath();
Files.createDirectories(targetTestsDir); Files.createDirectories(targetTestsDir);
Path upload = Paths.get(targetTestsDir.toString(), "http_client_upload.big"); Path upload = Paths.get(targetTestsDir.toString(), "http_client_upload.big");
try (OutputStream output = Files.newOutputStream(upload, CREATE)) try (OutputStream output = Files.newOutputStream(upload, StandardOpenOption.CREATE))
{ {
byte[] kb = new byte[1024]; byte[] kb = new byte[1024];
for (int i = 0; i < 10 * 1024; ++i) for (int i = 0; i < 10 * 1024; ++i)
@ -97,14 +96,7 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest
ContentResponse response = client.newRequest("localhost", connector.getLocalPort()) ContentResponse response = client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme) .scheme(scheme)
.file(upload) .file(upload)
.onRequestSuccess(new Request.SuccessListener() .onRequestSuccess(request -> requestTime.set(System.nanoTime()))
{
@Override
public void onSuccess(Request request)
{
requestTime.set(System.nanoTime());
}
})
.timeout(10, TimeUnit.SECONDS) .timeout(10, TimeUnit.SECONDS)
.send(); .send();
long responseTime = System.nanoTime(); long responseTime = System.nanoTime();
@ -242,10 +234,11 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest
Thread.sleep(1); Thread.sleep(1);
++length; ++length;
} }
fail(); Assert.fail();
} }
catch (IOException expected) catch (IOException x)
{ {
// Expected.
} }
Assert.assertEquals(data.length, length); Assert.assertEquals(data.length, length);
@ -270,7 +263,7 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest
InputStreamResponseListener listener = new InputStreamResponseListener(); InputStreamResponseListener listener = new InputStreamResponseListener();
InputStream stream = listener.getInputStream(); InputStream stream = listener.getInputStream();
// Close the stream immediately // Close the stream immediately.
stream.close(); stream.close();
client.newRequest("localhost", connector.getLocalPort()) client.newRequest("localhost", connector.getLocalPort())
@ -283,171 +276,161 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest
stream.read(); // Throws stream.read(); // Throws
} }
@Test @Test(expected = AsynchronousCloseException.class)
public void testInputStreamResponseListenerClosedWhileWaiting() throws Exception public void testInputStreamResponseListenerClosedBeforeContent() throws Exception
{ {
final byte[] chunk1 = new byte[]{0, 1}; AtomicReference<AsyncContext> contextRef = new AtomicReference<>();
final byte[] chunk2 = new byte[]{2, 3};
final CountDownLatch closeLatch = new CountDownLatch(1);
start(new AbstractHandler() start(new AbstractHandler()
{ {
@Override @Override
public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
contextRef.set(request.startAsync());
response.flushBuffer();
}
});
CountDownLatch latch = new CountDownLatch(1);
InputStreamResponseListener listener = new InputStreamResponseListener()
{
@Override
public void onContent(Response response, ByteBuffer content, Callback callback)
{
super.onContent(response, content, new Callback()
{
@Override
public void failed(Throwable x)
{
latch.countDown();
callback.failed(x);
}
});
}
};
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.send(listener);
Response response = listener.get(5, TimeUnit.SECONDS);
Assert.assertEquals(HttpStatus.OK_200, response.getStatus());
InputStream input = listener.getInputStream();
input.close();
AsyncContext asyncContext = contextRef.get();
asyncContext.getResponse().getOutputStream().write(new byte[1024]);
asyncContext.complete();
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
input.read(); // Throws
}
@Test
public void testInputStreamResponseListenerClosedWhileWaiting() throws Exception
{
byte[] chunk1 = new byte[]{0, 1};
byte[] chunk2 = new byte[]{2, 3};
start(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{ {
baseRequest.setHandled(true); baseRequest.setHandled(true);
response.setContentLength(chunk1.length + chunk2.length); response.setContentLength(chunk1.length + chunk2.length);
ServletOutputStream output = response.getOutputStream(); ServletOutputStream output = response.getOutputStream();
output.write(chunk1); output.write(chunk1);
output.flush(); output.flush();
try
{
closeLatch.await(5, TimeUnit.SECONDS);
output.write(chunk2); output.write(chunk2);
output.flush();
}
catch (InterruptedException x)
{
throw new InterruptedIOException();
}
} }
}); });
final CountDownLatch waitLatch = new CountDownLatch(1); CountDownLatch failedLatch = new CountDownLatch(1);
final CountDownLatch waitedLatch = new CountDownLatch(1); CountDownLatch contentLatch = new CountDownLatch(1);
InputStreamResponseListener listener = new InputStreamResponseListener(1) InputStreamResponseListener listener = new InputStreamResponseListener()
{ {
@Override @Override
protected boolean await() public void onContent(Response response, ByteBuffer content, Callback callback)
{ {
waitLatch.countDown(); super.onContent(response, content, new Callback()
boolean result = super.await(); {
waitedLatch.countDown(); @Override
return result; public void failed(Throwable x)
{
failedLatch.countDown();
callback.failed(x);
}
});
contentLatch.countDown();
} }
}; };
client.newRequest("localhost", connector.getLocalPort()) client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme) .scheme(scheme)
.send(listener); .send(listener);
Response response = listener.get(5, TimeUnit.SECONDS); Response response = listener.get(5, TimeUnit.SECONDS);
Assert.assertEquals(200, response.getStatus()); Assert.assertEquals(HttpStatus.OK_200, response.getStatus());
// Wait until we get some content.
Assert.assertTrue(contentLatch.await(5, TimeUnit.SECONDS));
// Close the stream.
InputStream stream = listener.getInputStream(); InputStream stream = listener.getInputStream();
// Wait until we block
Assert.assertTrue(waitLatch.await(5, TimeUnit.SECONDS));
// Close the stream
stream.close(); stream.close();
closeLatch.countDown();
// Be sure we're not stuck waiting // Make sure that the callback has been invoked.
Assert.assertTrue(waitedLatch.await(5, TimeUnit.SECONDS)); Assert.assertTrue(failedLatch.await(5, TimeUnit.SECONDS));
} }
@Test @Test
public void testInputStreamResponseListenerFailedWhileWaiting() throws Exception public void testInputStreamResponseListenerFailedWhileWaiting() throws Exception
{ {
final byte[] chunk1 = new byte[]{0, 1};
final byte[] chunk2 = new byte[]{2, 3};
final CountDownLatch closeLatch = new CountDownLatch(1);
start(new AbstractHandler() start(new AbstractHandler()
{ {
@Override @Override
public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{ {
baseRequest.setHandled(true); baseRequest.setHandled(true);
response.setContentLength(chunk1.length + chunk2.length); byte[] data = new byte[1024];
response.setContentLength(data.length);
ServletOutputStream output = response.getOutputStream(); ServletOutputStream output = response.getOutputStream();
output.write(chunk1); output.write(data);
output.flush();
try
{
closeLatch.await(5, TimeUnit.SECONDS);
output.write(chunk2);
output.flush();
}
catch (InterruptedException x)
{
throw new InterruptedIOException();
}
} }
}); });
final CountDownLatch waitLatch = new CountDownLatch(1); CountDownLatch failedLatch = new CountDownLatch(1);
final CountDownLatch waitedLatch = new CountDownLatch(1); CountDownLatch contentLatch = new CountDownLatch(1);
InputStreamResponseListener listener = new InputStreamResponseListener(1) InputStreamResponseListener listener = new InputStreamResponseListener()
{ {
@Override @Override
protected boolean await() public void onContent(Response response, ByteBuffer content, Callback callback)
{ {
waitLatch.countDown(); super.onContent(response, content, new Callback()
boolean result = super.await(); {
waitedLatch.countDown(); @Override
return result; public void failed(Throwable x)
{
failedLatch.countDown();
callback.failed(x);
}
});
contentLatch.countDown();
} }
}; };
client.newRequest("localhost", connector.getLocalPort()) client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme) .scheme(scheme)
.send(listener); .send(listener);
Response response = listener.get(5, TimeUnit.SECONDS); Response response = listener.get(5, TimeUnit.SECONDS);
Assert.assertEquals(200, response.getStatus()); Assert.assertEquals(HttpStatus.OK_200, response.getStatus());
// Wait until we block // Wait until we get some content.
Assert.assertTrue(waitLatch.await(5, TimeUnit.SECONDS)); Assert.assertTrue(contentLatch.await(5, TimeUnit.SECONDS));
// Fail the response
// Abort the response.
response.abort(new Exception()); response.abort(new Exception());
closeLatch.countDown();
// Be sure we're not stuck waiting // Make sure that the callback has been invoked.
Assert.assertTrue(waitedLatch.await(5, TimeUnit.SECONDS)); Assert.assertTrue(failedLatch.await(5, TimeUnit.SECONDS));
}
@Test
public void testInputStreamResponseListenerConsumingBeforeWaiting() throws Exception
{
final byte[] data = new byte[]{0, 1};
start(new AbstractHandler()
{
@Override
public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
response.setContentLength(data.length);
ServletOutputStream output = response.getOutputStream();
output.write(data);
output.flush();
}
});
final AtomicReference<Throwable> failure = new AtomicReference<>();
InputStreamResponseListener listener = new InputStreamResponseListener(1)
{
@Override
protected boolean await()
{
// Consume everything just before waiting
InputStream stream = getInputStream();
consume(stream, data);
return super.await();
}
private void consume(InputStream stream, byte[] data)
{
try
{
for (byte datum : data)
Assert.assertEquals(datum, stream.read());
}
catch (IOException x)
{
failure.compareAndSet(null, x);
}
}
};
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.send(listener);
Result result = listener.await(5, TimeUnit.SECONDS);
Assert.assertEquals(200, result.getResponse().getStatus());
Assert.assertNull(failure.get());
} }
@Test @Test
@ -649,14 +632,10 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest
client.newRequest("localhost", connector.getLocalPort()) client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme) .scheme(scheme)
.content(content) .content(content)
.send(new Response.CompleteListener() .send(result ->
{
@Override
public void onComplete(Result result)
{ {
if (result.isSucceeded() && result.getResponse().getStatus() == 200) if (result.isSucceeded() && result.getResponse().getStatus() == 200)
latch.countDown(); latch.countDown();
}
}); });
// Make sure we provide the content *after* the request has been "sent". // Make sure we provide the content *after* the request has been "sent".
@ -703,14 +682,10 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest
client.newRequest("localhost", connector.getLocalPort()) client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme) .scheme(scheme)
.content(content) .content(content)
.send(new Response.CompleteListener() .send(result ->
{
@Override
public void onComplete(Result result)
{ {
if (result.isSucceeded() && result.getResponse().getStatus() == 200) if (result.isSucceeded() && result.getResponse().getStatus() == 200)
latch.countDown(); latch.countDown();
}
}); });
} }
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS)); Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
@ -947,14 +922,10 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest
client.newRequest("0.0.0.1", connector.getLocalPort()) client.newRequest("0.0.0.1", connector.getLocalPort())
.scheme(scheme) .scheme(scheme)
.content(content) .content(content)
.send(new Response.CompleteListener() .send(result ->
{
@Override
public void onComplete(Result result)
{ {
if (result.isFailed()) if (result.isFailed())
latch.countDown(); latch.countDown();
}
}); });
try (OutputStream output = content.getOutputStream()) try (OutputStream output = content.getOutputStream())
@ -990,24 +961,16 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest
client.newRequest("localhost", connector.getLocalPort()) client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme) .scheme(scheme)
.content(content) .content(content)
.onRequestBegin(new Request.BeginListener() .onRequestBegin(request ->
{
@Override
public void onBegin(Request request)
{ {
content.offer(ByteBuffer.wrap(new byte[256]), callback); content.offer(ByteBuffer.wrap(new byte[256]), callback);
content.offer(ByteBuffer.wrap(new byte[256]), callback); content.offer(ByteBuffer.wrap(new byte[256]), callback);
request.abort(new Exception("explicitly_thrown_by_test")); request.abort(new Exception("explicitly_thrown_by_test"));
}
}) })
.send(new Response.CompleteListener() .send(result ->
{
@Override
public void onComplete(Result result)
{ {
if (result.isFailed()) if (result.isFailed())
completeLatch.countDown(); completeLatch.countDown();
}
}); });
Assert.assertTrue(completeLatch.await(5, TimeUnit.SECONDS)); Assert.assertTrue(completeLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(failLatch.await(5, TimeUnit.SECONDS)); Assert.assertTrue(failLatch.await(5, TimeUnit.SECONDS));
@ -1046,14 +1009,10 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest
client.newRequest("0.0.0.1", connector.getLocalPort()) client.newRequest("0.0.0.1", connector.getLocalPort())
.scheme(scheme) .scheme(scheme)
.content(content) .content(content)
.send(new Response.CompleteListener() .send(result ->
{
@Override
public void onComplete(Result result)
{ {
Assert.assertTrue(result.isFailed()); Assert.assertTrue(result.isFailed());
completeLatch.countDown(); completeLatch.countDown();
}
}); });
Assert.assertTrue(completeLatch.await(5, TimeUnit.SECONDS)); Assert.assertTrue(completeLatch.await(5, TimeUnit.SECONDS));
@ -1122,25 +1081,67 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest
client.newRequest("localhost", connector.getLocalPort()) client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme) .scheme(scheme)
.content(provider) .content(provider)
.onRequestCommit(new Request.CommitListener() .onRequestCommit(request -> commit.set(true))
{ .send(result ->
@Override
public void onCommit(Request request)
{
commit.set(true);
}
})
.send(new Response.CompleteListener()
{
@Override
public void onComplete(Result result)
{ {
Assert.assertTrue(result.isFailed()); Assert.assertTrue(result.isFailed());
completeLatch.countDown(); completeLatch.countDown();
}
}); });
Assert.assertTrue(completeLatch.await(5, TimeUnit.SECONDS)); Assert.assertTrue(completeLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(closeLatch.await(5, TimeUnit.SECONDS)); Assert.assertTrue(closeLatch.await(5, TimeUnit.SECONDS));
} }
@Test
public void testInputStreamResponseListenerBufferedRead() throws Exception
{
AtomicReference<AsyncContext> asyncContextRef = new AtomicReference<>();
CountDownLatch latch = new CountDownLatch(1);
start(new AbstractHandler()
{
@Override
public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
asyncContextRef.set(request.startAsync());
latch.countDown();
}
});
InputStreamResponseListener listener = new InputStreamResponseListener();
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.timeout(5, TimeUnit.SECONDS)
.send(listener);
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
AsyncContext asyncContext = asyncContextRef.get();
Assert.assertNotNull(asyncContext);
Random random = new Random();
byte[] chunk = new byte[64];
random.nextBytes(chunk);
ServletOutputStream output = asyncContext.getResponse().getOutputStream();
output.write(chunk);
output.flush();
// Use a buffer larger than the data
// written to test that the read returns.
byte[] buffer = new byte[2 * chunk.length];
InputStream stream = listener.getInputStream();
int totalRead = 0;
while (totalRead < chunk.length)
{
int read = stream.read(buffer);
Assert.assertTrue(read > 0);
totalRead += read;
}
asyncContext.complete();
Response response = listener.get(5, TimeUnit.SECONDS);
Assert.assertEquals(200, response.getStatus());
}
} }