402397 - InputStreamResponseListener early close inputStream cause hold lock.

This commit is contained in:
Simone Bordet 2013-03-06 17:03:43 +01:00
parent 50ab606df4
commit 37fd36b31c
2 changed files with 201 additions and 6 deletions

View File

@ -30,10 +30,12 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.client.HttpClient; import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result; import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
@ -78,10 +80,12 @@ public class InputStreamResponseListener extends Response.Listener.Empty
private final AtomicLong length = new AtomicLong(); private final AtomicLong length = new AtomicLong();
private final CountDownLatch responseLatch = new CountDownLatch(1); private final CountDownLatch responseLatch = new CountDownLatch(1);
private final CountDownLatch resultLatch = new CountDownLatch(1); private final CountDownLatch resultLatch = new CountDownLatch(1);
private final AtomicReference<InputStream> stream = new AtomicReference<>();
private final long maxBufferSize; private final long maxBufferSize;
private Response response; private Response response;
private Result result; private Result result;
private volatile Throwable failure; private volatile Throwable failure;
private volatile boolean closed;
public InputStreamResponseListener() public InputStreamResponseListener()
{ {
@ -113,6 +117,7 @@ public class InputStreamResponseListener extends Response.Listener.Empty
while (newLength >= maxBufferSize) while (newLength >= maxBufferSize)
{ {
LOG.debug("Queued bytes limit {}/{} exceeded, waiting", newLength, maxBufferSize); LOG.debug("Queued bytes limit {}/{} exceeded, waiting", newLength, maxBufferSize);
// Block to avoid infinite buffering
if (!await()) if (!await())
break; break;
newLength = length.get(); newLength = length.get();
@ -123,10 +128,12 @@ public class InputStreamResponseListener extends Response.Listener.Empty
@Override @Override
public void onFailure(Response response, Throwable failure) public void onFailure(Response response, Throwable failure)
{ {
this.failure = failure;
LOG.debug("Queuing failure {} {}", FAILURE, failure); LOG.debug("Queuing failure {} {}", FAILURE, failure);
queue.offer(FAILURE); queue.offer(FAILURE);
responseLatch.countDown(); responseLatch.countDown();
resultLatch.countDown();
this.failure = failure;
signal();
} }
@Override @Override
@ -143,15 +150,18 @@ public class InputStreamResponseListener extends Response.Listener.Empty
resultLatch.countDown(); resultLatch.countDown();
} }
private boolean await() protected boolean await()
{ {
try try
{ {
synchronized (this) synchronized (this)
{ {
wait(); if (failure == null && !closed)
wait();
// Re-read the values for the return value
// as they may have changed while waiting.
return failure == null && !closed;
} }
return true;
} }
catch (InterruptedException x) catch (InterruptedException x)
{ {
@ -159,7 +169,7 @@ public class InputStreamResponseListener extends Response.Listener.Empty
} }
} }
private void signal() protected void signal()
{ {
synchronized (this) synchronized (this)
{ {
@ -167,6 +177,19 @@ public class InputStreamResponseListener extends Response.Listener.Empty
} }
} }
/**
* Waits for the given timeout for the response to be available, then returns it.
* <p />
* The wait ends as soon as all the HTTP headers have been received, without waiting for the content.
* To wait for the whole content, see {@link #await(long, TimeUnit)}.
*
* @param timeout the time to wait
* @param unit the timeout unit
* @return the response
* @throws InterruptedException if the thread is interrupted
* @throws TimeoutException if the timeout expires
* @throws ExecutionException if a failure happened
*/
public Response get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException, ExecutionException public Response get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException, ExecutionException
{ {
boolean expired = !responseLatch.await(timeout, unit); boolean expired = !responseLatch.await(timeout, unit);
@ -177,6 +200,18 @@ public class InputStreamResponseListener extends Response.Listener.Empty
return response; return response;
} }
/**
* Waits for the given timeout for the whole request/response cycle to be finished,
* then returns the corresponding result.
* <p />
*
* @param timeout the time to wait
* @param unit the timeout unit
* @return the result
* @throws InterruptedException if the thread is interrupted
* @throws TimeoutException if the timeout expires
* @see #get(long, TimeUnit)
*/
public Result await(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException public Result await(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
{ {
boolean expired = !resultLatch.await(timeout, unit); boolean expired = !resultLatch.await(timeout, unit);
@ -185,9 +220,19 @@ public class InputStreamResponseListener extends Response.Listener.Empty
return result; return result;
} }
/**
* Returns an {@link InputStream} providing the response content bytes.
* <p />
* The method may be invoked only once; subsequent invocations will return a closed {@link InputStream}.
*
* @return an input stream providing the response content
*/
public InputStream getInputStream() public InputStream getInputStream()
{ {
return new Input(); InputStream result = new Input();
if (stream.compareAndSet(null, result))
return result;
return IO.getClosedStream();
} }
private class Input extends InputStream private class Input extends InputStream
@ -259,6 +304,8 @@ public class InputStreamResponseListener extends Response.Listener.Empty
{ {
LOG.debug("Queuing close {}{}", CLOSE, ""); LOG.debug("Queuing close {}{}", CLOSE, "");
queue.offer(CLOSE); queue.offer(CLOSE);
closed = true;
signal();
super.close(); super.close();
} }
} }

View File

@ -37,6 +37,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import javax.servlet.ServletException; import javax.servlet.ServletException;
import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse; import javax.servlet.http.HttpServletResponse;
@ -45,6 +46,7 @@ import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result; import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.client.util.BufferingResponseListener; import org.eclipse.jetty.client.util.BufferingResponseListener;
import org.eclipse.jetty.client.util.BytesContentProvider;
import org.eclipse.jetty.client.util.DeferredContentProvider; import org.eclipse.jetty.client.util.DeferredContentProvider;
import org.eclipse.jetty.client.util.InputStreamResponseListener; import org.eclipse.jetty.client.util.InputStreamResponseListener;
import org.eclipse.jetty.client.util.OutputStreamContentProvider; import org.eclipse.jetty.client.util.OutputStreamContentProvider;
@ -247,6 +249,152 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest
Assert.assertTrue(result.isFailed()); Assert.assertTrue(result.isFailed());
} }
@Test(expected = AsynchronousCloseException.class)
public void testInputStreamResponseListenerClosedBeforeReading() throws Exception
{
start(new AbstractHandler()
{
@Override
public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
IO.copy(request.getInputStream(), response.getOutputStream());
}
});
InputStreamResponseListener listener = new InputStreamResponseListener();
InputStream stream = listener.getInputStream();
// Close the stream immediately
stream.close();
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.content(new BytesContentProvider(new byte[]{0, 1, 2, 3}))
.send(listener);
Response response = listener.get(5, TimeUnit.SECONDS);
Assert.assertEquals(200, response.getStatus());
stream.read();
}
@Test
public void testInputStreamResponseListenerClosedWhileWaiting() throws Exception
{
final byte[] chunk1 = new byte[]{0, 1};
final byte[] chunk2 = new byte[]{2, 3};
final CountDownLatch closeLatch = new CountDownLatch(1);
start(new AbstractHandler()
{
@Override
public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
response.setContentLength(chunk1.length + chunk2.length);
ServletOutputStream output = response.getOutputStream();
output.write(chunk1);
output.flush();
try
{
closeLatch.await(5, TimeUnit.SECONDS);
output.write(chunk2);
output.flush();
}
catch (InterruptedException x)
{
throw new InterruptedIOException();
}
}
});
final CountDownLatch waitLatch = new CountDownLatch(1);
final CountDownLatch waitedLatch = new CountDownLatch(1);
InputStreamResponseListener listener = new InputStreamResponseListener(1)
{
@Override
protected boolean await()
{
waitLatch.countDown();
boolean result = super.await();
waitedLatch.countDown();
return result;
}
};
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.send(listener);
Response response = listener.get(5, TimeUnit.SECONDS);
Assert.assertEquals(200, response.getStatus());
InputStream stream = listener.getInputStream();
// Wait until we block
Assert.assertTrue(waitLatch.await(5, TimeUnit.SECONDS));
// Close the stream
stream.close();
closeLatch.countDown();
// Be sure we're not stuck waiting
Assert.assertTrue(waitedLatch.await(5, TimeUnit.SECONDS));
}
@Test
public void testInputStreamResponseListenerFailedWhileWaiting() throws Exception
{
final byte[] chunk1 = new byte[]{0, 1};
final byte[] chunk2 = new byte[]{2, 3};
final CountDownLatch closeLatch = new CountDownLatch(1);
start(new AbstractHandler()
{
@Override
public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
response.setContentLength(chunk1.length + chunk2.length);
ServletOutputStream output = response.getOutputStream();
output.write(chunk1);
output.flush();
try
{
closeLatch.await(5, TimeUnit.SECONDS);
output.write(chunk2);
output.flush();
}
catch (InterruptedException x)
{
throw new InterruptedIOException();
}
}
});
final CountDownLatch waitLatch = new CountDownLatch(1);
final CountDownLatch waitedLatch = new CountDownLatch(1);
InputStreamResponseListener listener = new InputStreamResponseListener(1)
{
@Override
protected boolean await()
{
waitLatch.countDown();
boolean result = super.await();
waitedLatch.countDown();
return result;
}
};
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.send(listener);
Response response = listener.get(5, TimeUnit.SECONDS);
Assert.assertEquals(200, response.getStatus());
InputStream stream = listener.getInputStream();
// Wait until we block
Assert.assertTrue(waitLatch.await(5, TimeUnit.SECONDS));
// Fail the response
response.abort(new Exception());
closeLatch.countDown();
// Be sure we're not stuck waiting
Assert.assertTrue(waitedLatch.await(5, TimeUnit.SECONDS));
}
@Test(expected = AsynchronousCloseException.class) @Test(expected = AsynchronousCloseException.class)
public void testDownloadWithCloseBeforeContent() throws Exception public void testDownloadWithCloseBeforeContent() throws Exception
{ {