484446 - InputStreamResponseListener's InputStream uses default read (3) and blocks early on never-ending response.

Implemented read(byte[],int.int) to fix the reported issue.
Reworked InputStreamResponseListener to use a callback approach
rather than blocking waiting for content.
This commit is contained in:
Simone Bordet 2016-02-12 11:19:31 +01:00
parent 9c075ff85c
commit 7c7c49f06b
2 changed files with 312 additions and 266 deletions

View File

@ -23,19 +23,18 @@ import java.io.InputStream;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousCloseException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Response.Listener;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@ -63,8 +62,6 @@ import org.eclipse.jetty.util.log.Logger;
* <p>
* The {@link HttpClient} implementation (the producer) will feed the input stream
* asynchronously while the application (the consumer) is reading from it.
* Chunks of content are maintained in a queue, and it is possible to specify a
* maximum buffer size for the bytes held in the queue, by default 16384 bytes.
* <p>
* If the consumer is faster than the producer, then the consumer will block
* with the typical {@link InputStream#read()} semantic.
@ -74,137 +71,133 @@ import org.eclipse.jetty.util.log.Logger;
public class InputStreamResponseListener extends Listener.Adapter
{
private static final Logger LOG = Log.getLogger(InputStreamResponseListener.class);
private static final byte[] EOF = new byte[0];
private static final byte[] CLOSED = new byte[0];
private static final byte[] FAILURE = new byte[0];
private final BlockingQueue<byte[]> queue = new LinkedBlockingQueue<>();
private final AtomicLong length = new AtomicLong();
private static final DeferredContentProvider.Chunk EOF = new DeferredContentProvider.Chunk(BufferUtil.EMPTY_BUFFER, Callback.NOOP);
private final Object lock = this;
private final CountDownLatch responseLatch = new CountDownLatch(1);
private final CountDownLatch resultLatch = new CountDownLatch(1);
private final AtomicReference<InputStream> stream = new AtomicReference<>();
private final long maxBufferSize;
private Response response;
private Result result;
private volatile Throwable failure;
private volatile boolean closed;
private Throwable failure;
private boolean closed;
private DeferredContentProvider.Chunk chunk;
public InputStreamResponseListener()
{
this(16 * 1024L);
}
/**
* @deprecated response content is not buffered anymore, but handled asynchronously.
*/
@Deprecated
public InputStreamResponseListener(long maxBufferSize)
{
this.maxBufferSize = maxBufferSize;
}
@Override
public void onHeaders(Response response)
{
this.response = response;
responseLatch.countDown();
synchronized (lock)
{
this.response = response;
responseLatch.countDown();
}
}
@Override
public void onContent(Response response, ByteBuffer content)
public void onContent(Response response, ByteBuffer content, Callback callback)
{
if (!closed)
if (content.remaining() == 0)
{
int remaining = content.remaining();
if (remaining > 0)
{
if (LOG.isDebugEnabled())
LOG.debug("Skipped empty content {}", content);
callback.succeeded();
return;
}
byte[] bytes = new byte[remaining];
content.get(bytes);
if (LOG.isDebugEnabled())
LOG.debug("Queuing {}/{} bytes", bytes, remaining);
queue.offer(bytes);
long newLength = length.addAndGet(remaining);
while (newLength >= maxBufferSize)
{
if (LOG.isDebugEnabled())
LOG.debug("Queued bytes limit {}/{} exceeded, waiting", newLength, maxBufferSize);
// Block to avoid infinite buffering
if (!await())
break;
newLength = length.get();
if (LOG.isDebugEnabled())
LOG.debug("Queued bytes limit {}/{} exceeded, woken up", newLength, maxBufferSize);
}
}
else
boolean closed;
synchronized (lock)
{
closed = this.closed;
if (!closed)
{
if (LOG.isDebugEnabled())
LOG.debug("Queuing skipped, empty content {}", content);
chunk = new DeferredContentProvider.Chunk(content, callback);
lock.notifyAll();
}
}
else
if (closed)
{
LOG.debug("Queuing skipped, stream already closed");
if (LOG.isDebugEnabled())
LOG.debug("InputStream closed, ignored content {}", content);
callback.failed(new AsynchronousCloseException());
}
}
@Override
public void onSuccess(Response response)
{
synchronized (lock)
{
chunk = EOF;
lock.notifyAll();
}
if (LOG.isDebugEnabled())
LOG.debug("Queuing end of content {}{}", EOF, "");
queue.offer(EOF);
signal();
LOG.debug("End of content");
}
@Override
public void onFailure(Response response, Throwable failure)
{
fail(failure);
signal();
Callback callback = null;
synchronized (lock)
{
if (this.failure != null)
return;
this.failure = failure;
if (chunk != null)
callback = chunk.callback;
lock.notifyAll();
}
if (LOG.isDebugEnabled())
LOG.debug("Content failure", failure);
if (callback != null)
callback.failed(failure);
}
@Override
public void onComplete(Result result)
{
if (result.isFailed() && failure == null)
fail(result.getFailure());
this.result = result;
resultLatch.countDown();
signal();
}
private void fail(Throwable failure)
{
if (LOG.isDebugEnabled())
LOG.debug("Queuing failure {} {}", FAILURE, failure);
queue.offer(FAILURE);
this.failure = failure;
responseLatch.countDown();
}
protected boolean await()
{
try
Throwable failure = result.getFailure();
Callback callback = null;
synchronized (lock)
{
synchronized (this)
this.result = result;
if (result.isFailed() && this.failure == null)
{
while (length.get() >= maxBufferSize && failure == null && !closed)
wait();
// Re-read the values as they may have changed while waiting.
return failure == null && !closed;
this.failure = failure;
if (chunk != null)
callback = chunk.callback;
}
// Notify the response latch in case of request failures.
responseLatch.countDown();
resultLatch.countDown();
lock.notifyAll();
}
catch (InterruptedException x)
{
Thread.currentThread().interrupt();
return false;
}
}
protected void signal()
{
synchronized (this)
if (LOG.isDebugEnabled())
{
notifyAll();
if (failure == null)
LOG.debug("Result success");
else
LOG.debug("Result failure", failure);
}
if (callback != null)
callback.failed(failure);
}
/**
@ -225,9 +218,13 @@ public class InputStreamResponseListener extends Listener.Adapter
boolean expired = !responseLatch.await(timeout, unit);
if (expired)
throw new TimeoutException();
if (failure != null)
throw new ExecutionException(failure);
return response;
synchronized (lock)
{
// If the request failed there is no response.
if (response == null)
throw new ExecutionException(failure);
return response;
}
}
/**
@ -247,7 +244,10 @@ public class InputStreamResponseListener extends Listener.Adapter
boolean expired = !resultLatch.await(timeout, unit);
if (expired)
throw new TimeoutException();
return result;
synchronized (lock)
{
return result;
}
}
/**
@ -267,65 +267,50 @@ public class InputStreamResponseListener extends Listener.Adapter
private class Input extends InputStream
{
private byte[] bytes;
private int index;
@Override
public int read() throws IOException
{
while (true)
{
if (bytes == EOF)
{
// Mark the fact that we saw -1,
// so that in the close case we don't throw
index = -1;
return -1;
}
else if (bytes == FAILURE)
{
throw failure();
}
else if (bytes == CLOSED)
{
if (index < 0)
return -1;
throw new AsynchronousCloseException();
}
else if (bytes != null)
{
int result = bytes[index] & 0xFF;
if (++index == bytes.length)
{
length.addAndGet(-index);
bytes = null;
index = 0;
signal();
}
return result;
}
else
{
bytes = take();
if (LOG.isDebugEnabled())
LOG.debug("Dequeued {}/{} bytes", bytes, bytes.length);
}
}
byte[] tmp = new byte[1];
int read = read(tmp);
if (read < 0)
return read;
return tmp[0] & 0xFF;
}
private IOException failure()
{
if (failure instanceof IOException)
return (IOException)failure;
else
return new IOException(failure);
}
private byte[] take() throws IOException
@Override
public int read(byte[] b, int offset, int length) throws IOException
{
try
{
return queue.take();
int result;
Callback callback = null;
synchronized (lock)
{
while (true)
{
if (failure != null)
throw toIOException(failure);
if (chunk == EOF)
return -1;
if (closed)
throw new AsynchronousCloseException();
if (chunk != null)
break;
lock.wait();
}
ByteBuffer buffer = chunk.buffer;
result = Math.min(buffer.remaining(), length);
buffer.get(b, offset, result);
if (!buffer.hasRemaining())
{
callback = chunk.callback;
chunk = null;
}
}
if (callback != null)
callback.succeeded();
return result;
}
catch (InterruptedException x)
{
@ -333,18 +318,35 @@ public class InputStreamResponseListener extends Listener.Adapter
}
}
private IOException toIOException(Throwable failure)
{
if (failure instanceof IOException)
return (IOException)failure;
else
return new IOException(failure);
}
@Override
public void close() throws IOException
{
if (!closed)
Callback callback = null;
synchronized (lock)
{
super.close();
if (LOG.isDebugEnabled())
LOG.debug("Queuing close {}{}", CLOSED, "");
queue.offer(CLOSED);
if (closed)
return;
closed = true;
signal();
if (chunk != null)
callback = chunk.callback;
lock.notifyAll();
}
if (LOG.isDebugEnabled())
LOG.debug("InputStream close");
if (callback != null)
callback.failed(new AsynchronousCloseException());
super.close();
}
}
}

View File

@ -30,6 +30,7 @@ import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Random;
@ -56,6 +57,8 @@ import org.eclipse.jetty.client.util.DeferredContentProvider;
import org.eclipse.jetty.client.util.InputStreamContentProvider;
import org.eclipse.jetty.client.util.InputStreamResponseListener;
import org.eclipse.jetty.client.util.OutputStreamContentProvider;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.toolchain.test.MavenTestingUtils;
import org.eclipse.jetty.toolchain.test.annotation.Slow;
@ -66,9 +69,6 @@ import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.junit.Assert;
import org.junit.Test;
import static java.nio.file.StandardOpenOption.CREATE;
import static org.junit.Assert.fail;
public class HttpClientStreamTest extends AbstractHttpClientServerTest
{
public HttpClientStreamTest(SslContextFactory sslContextFactory)
@ -83,7 +83,7 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest
Path targetTestsDir = MavenTestingUtils.getTargetTestingDir().toPath();
Files.createDirectories(targetTestsDir);
Path upload = Paths.get(targetTestsDir.toString(), "http_client_upload.big");
try (OutputStream output = Files.newOutputStream(upload, CREATE))
try (OutputStream output = Files.newOutputStream(upload, StandardOpenOption.CREATE))
{
byte[] kb = new byte[1024];
for (int i = 0; i < 10 * 1024; ++i)
@ -234,10 +234,11 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest
Thread.sleep(1);
++length;
}
fail();
Assert.fail();
}
catch (IOException expected)
catch (IOException x)
{
// Expected.
}
Assert.assertEquals(data.length, length);
@ -262,7 +263,7 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest
InputStreamResponseListener listener = new InputStreamResponseListener();
InputStream stream = listener.getInputStream();
// Close the stream immediately
// Close the stream immediately.
stream.close();
client.newRequest("localhost", connector.getLocalPort())
@ -275,171 +276,161 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest
stream.read(); // Throws
}
@Test
public void testInputStreamResponseListenerClosedWhileWaiting() throws Exception
@Test(expected = AsynchronousCloseException.class)
public void testInputStreamResponseListenerClosedBeforeContent() throws Exception
{
final byte[] chunk1 = new byte[]{0, 1};
final byte[] chunk2 = new byte[]{2, 3};
final CountDownLatch closeLatch = new CountDownLatch(1);
AtomicReference<AsyncContext> contextRef = new AtomicReference<>();
start(new AbstractHandler()
{
@Override
public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
contextRef.set(request.startAsync());
response.flushBuffer();
}
});
CountDownLatch latch = new CountDownLatch(1);
InputStreamResponseListener listener = new InputStreamResponseListener()
{
@Override
public void onContent(Response response, ByteBuffer content, Callback callback)
{
super.onContent(response, content, new Callback()
{
@Override
public void failed(Throwable x)
{
latch.countDown();
callback.failed(x);
}
});
}
};
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.send(listener);
Response response = listener.get(5, TimeUnit.SECONDS);
Assert.assertEquals(HttpStatus.OK_200, response.getStatus());
InputStream input = listener.getInputStream();
input.close();
AsyncContext asyncContext = contextRef.get();
asyncContext.getResponse().getOutputStream().write(new byte[1024]);
asyncContext.complete();
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
input.read(); // Throws
}
@Test
public void testInputStreamResponseListenerClosedWhileWaiting() throws Exception
{
byte[] chunk1 = new byte[]{0, 1};
byte[] chunk2 = new byte[]{2, 3};
start(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
response.setContentLength(chunk1.length + chunk2.length);
ServletOutputStream output = response.getOutputStream();
output.write(chunk1);
output.flush();
try
{
closeLatch.await(5, TimeUnit.SECONDS);
output.write(chunk2);
output.flush();
}
catch (InterruptedException x)
{
throw new InterruptedIOException();
}
output.write(chunk2);
}
});
final CountDownLatch waitLatch = new CountDownLatch(1);
final CountDownLatch waitedLatch = new CountDownLatch(1);
InputStreamResponseListener listener = new InputStreamResponseListener(1)
CountDownLatch failedLatch = new CountDownLatch(1);
CountDownLatch contentLatch = new CountDownLatch(1);
InputStreamResponseListener listener = new InputStreamResponseListener()
{
@Override
protected boolean await()
public void onContent(Response response, ByteBuffer content, Callback callback)
{
waitLatch.countDown();
boolean result = super.await();
waitedLatch.countDown();
return result;
super.onContent(response, content, new Callback()
{
@Override
public void failed(Throwable x)
{
failedLatch.countDown();
callback.failed(x);
}
});
contentLatch.countDown();
}
};
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.send(listener);
Response response = listener.get(5, TimeUnit.SECONDS);
Assert.assertEquals(200, response.getStatus());
Assert.assertEquals(HttpStatus.OK_200, response.getStatus());
// Wait until we get some content.
Assert.assertTrue(contentLatch.await(5, TimeUnit.SECONDS));
// Close the stream.
InputStream stream = listener.getInputStream();
// Wait until we block
Assert.assertTrue(waitLatch.await(5, TimeUnit.SECONDS));
// Close the stream
stream.close();
closeLatch.countDown();
// Be sure we're not stuck waiting
Assert.assertTrue(waitedLatch.await(5, TimeUnit.SECONDS));
// Make sure that the callback has been invoked.
Assert.assertTrue(failedLatch.await(5, TimeUnit.SECONDS));
}
@Test
public void testInputStreamResponseListenerFailedWhileWaiting() throws Exception
{
final byte[] chunk1 = new byte[]{0, 1};
final byte[] chunk2 = new byte[]{2, 3};
final CountDownLatch closeLatch = new CountDownLatch(1);
start(new AbstractHandler()
{
@Override
public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
response.setContentLength(chunk1.length + chunk2.length);
byte[] data = new byte[1024];
response.setContentLength(data.length);
ServletOutputStream output = response.getOutputStream();
output.write(chunk1);
output.flush();
try
{
closeLatch.await(5, TimeUnit.SECONDS);
output.write(chunk2);
output.flush();
}
catch (InterruptedException x)
{
throw new InterruptedIOException();
}
output.write(data);
}
});
final CountDownLatch waitLatch = new CountDownLatch(1);
final CountDownLatch waitedLatch = new CountDownLatch(1);
InputStreamResponseListener listener = new InputStreamResponseListener(1)
CountDownLatch failedLatch = new CountDownLatch(1);
CountDownLatch contentLatch = new CountDownLatch(1);
InputStreamResponseListener listener = new InputStreamResponseListener()
{
@Override
protected boolean await()
public void onContent(Response response, ByteBuffer content, Callback callback)
{
waitLatch.countDown();
boolean result = super.await();
waitedLatch.countDown();
return result;
super.onContent(response, content, new Callback()
{
@Override
public void failed(Throwable x)
{
failedLatch.countDown();
callback.failed(x);
}
});
contentLatch.countDown();
}
};
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.send(listener);
Response response = listener.get(5, TimeUnit.SECONDS);
Assert.assertEquals(200, response.getStatus());
Assert.assertEquals(HttpStatus.OK_200, response.getStatus());
// Wait until we block
Assert.assertTrue(waitLatch.await(5, TimeUnit.SECONDS));
// Fail the response
// Wait until we get some content.
Assert.assertTrue(contentLatch.await(5, TimeUnit.SECONDS));
// Abort the response.
response.abort(new Exception());
closeLatch.countDown();
// Be sure we're not stuck waiting
Assert.assertTrue(waitedLatch.await(5, TimeUnit.SECONDS));
}
@Test
public void testInputStreamResponseListenerConsumingBeforeWaiting() throws Exception
{
final byte[] data = new byte[]{0, 1};
start(new AbstractHandler()
{
@Override
public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
response.setContentLength(data.length);
ServletOutputStream output = response.getOutputStream();
output.write(data);
output.flush();
}
});
final AtomicReference<Throwable> failure = new AtomicReference<>();
InputStreamResponseListener listener = new InputStreamResponseListener(1)
{
@Override
protected boolean await()
{
// Consume everything just before waiting
InputStream stream = getInputStream();
consume(stream, data);
return super.await();
}
private void consume(InputStream stream, byte[] data)
{
try
{
for (byte datum : data)
Assert.assertEquals(datum, stream.read());
}
catch (IOException x)
{
failure.compareAndSet(null, x);
}
}
};
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.send(listener);
Result result = listener.await(5, TimeUnit.SECONDS);
Assert.assertEquals(200, result.getResponse().getStatus());
Assert.assertNull(failure.get());
// Make sure that the callback has been invoked.
Assert.assertTrue(failedLatch.await(5, TimeUnit.SECONDS));
}
@Test
@ -1100,4 +1091,57 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest
Assert.assertTrue(completeLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(closeLatch.await(5, TimeUnit.SECONDS));
}
@Test
public void testInputStreamResponseListenerBufferedRead() throws Exception
{
AtomicReference<AsyncContext> asyncContextRef = new AtomicReference<>();
CountDownLatch latch = new CountDownLatch(1);
start(new AbstractHandler()
{
@Override
public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
asyncContextRef.set(request.startAsync());
latch.countDown();
}
});
InputStreamResponseListener listener = new InputStreamResponseListener();
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.timeout(5, TimeUnit.SECONDS)
.send(listener);
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
AsyncContext asyncContext = asyncContextRef.get();
Assert.assertNotNull(asyncContext);
Random random = new Random();
byte[] chunk = new byte[64];
random.nextBytes(chunk);
ServletOutputStream output = asyncContext.getResponse().getOutputStream();
output.write(chunk);
output.flush();
// Use a buffer larger than the data
// written to test that the read returns.
byte[] buffer = new byte[2 * chunk.length];
InputStream stream = listener.getInputStream();
int totalRead = 0;
while (totalRead < chunk.length)
{
int read = stream.read(buffer);
Assert.assertTrue(read > 0);
totalRead += read;
}
asyncContext.complete();
Response response = listener.get(5, TimeUnit.SECONDS);
Assert.assertEquals(200, response.getStatus());
}
}