jetty-9 - HTTP client: implemented InputStreamResponseListener.

This commit is contained in:
Simone Bordet 2012-09-17 19:15:49 +02:00
parent 776aea91d4
commit bd80a64cb9
2 changed files with 172 additions and 8 deletions

View File

@ -20,27 +20,38 @@ package org.eclipse.jetty.client.util;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
public class InputStreamResponseListener extends Response.Listener.Empty
{
public static final Logger LOG = Log.getLogger(InputStreamResponseListener.class);
private static final byte[] EOF = new byte[0];
private static final byte[] FAILURE = new byte[0];
private final BlockingQueue<byte[]> queue = new LinkedBlockingQueue<>();
private final AtomicLong length = new AtomicLong();
private final CountDownLatch responseLatch = new CountDownLatch(1);
private final CountDownLatch resultLatch = new CountDownLatch(1);
private final long capacity;
private Response response;
private Result result;
private volatile Throwable failure;
public InputStreamResponseListener()
{
this(1024 * 1024L);
this(16 * 1024L);
}
public InputStreamResponseListener(long capacity)
@ -61,20 +72,67 @@ public class InputStreamResponseListener extends Response.Listener.Empty
int remaining = content.remaining();
byte[] bytes = new byte[remaining];
content.get(bytes);
LOG.debug("Queued {}/{} bytes", bytes, bytes.length);
queue.offer(bytes);
long newLength = length.addAndGet(remaining);
if (newLength > capacity)
// wait
;
while (newLength >= capacity)
{
LOG.debug("Queued bytes limit {}/{} exceeded, waiting", newLength, capacity);
if (!await())
break;
newLength = length.get();
LOG.debug("Queued bytes limit {}/{} exceeded, woken up", newLength, capacity);
}
}
@Override
public void onFailure(Response response, Throwable failure)
{
this.failure = failure;
queue.offer(FAILURE);
LOG.debug("Queued failure {} {}", FAILURE, failure);
responseLatch.countDown();
}
@Override
public void onSuccess(Response response)
{
queue.offer(EOF);
LOG.debug("Queued end of content {}{}", EOF, "");
}
@Override
public void onComplete(Result result)
{
this.result = result;
resultLatch.countDown();
}
private boolean await()
{
try
{
synchronized (this)
{
wait();
}
return true;
}
catch (InterruptedException x)
{
return false;
}
}
private void signal()
{
synchronized (this)
{
notify();
}
}
public Response get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException, ExecutionException
{
boolean expired = !responseLatch.await(timeout, unit);
@ -85,9 +143,12 @@ public class InputStreamResponseListener extends Response.Listener.Empty
return response;
}
public Result await(long timeout, TimeUnit seconds)
public Result await(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
{
return null;
boolean expired = !resultLatch.await(timeout, unit);
if (expired)
throw new TimeoutException();
return result;
}
public InputStream getInputStream()
@ -97,10 +158,49 @@ public class InputStreamResponseListener extends Response.Listener.Empty
private class Input extends InputStream
{
private byte[] bytes;
private int index;
@Override
public int read() throws IOException
{
return 0;
while (true)
{
if (bytes != null)
{
if (index < bytes.length)
return bytes[index++];
length.addAndGet(-index);
bytes = null;
index = 0;
}
bytes = take();
LOG.debug("Dequeued {}/{} bytes", bytes, bytes.length);
if (bytes == EOF)
return -1;
if (bytes == FAILURE)
{
if (failure instanceof IOException)
throw (IOException)failure;
else
throw new IOException(failure);
}
signal();
}
}
private byte[] take() throws IOException
{
try
{
return queue.take();
}
catch (InterruptedException x)
{
throw new InterruptedIOException();
}
}
}
}

View File

@ -24,6 +24,7 @@ import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.servlet.ServletException;
@ -42,6 +43,7 @@ import org.junit.Assert;
import org.junit.Test;
import static java.nio.file.StandardOpenOption.CREATE;
import static org.junit.Assert.fail;
public class HttpClientStreamTest extends AbstractHttpClientServerTest
{
@ -94,6 +96,8 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest
public void testDownload() throws Exception
{
final byte[] data = new byte[128 * 1024];
byte value = 1;
Arrays.fill(data, value);
start(new AbstractHandler()
{
@Override
@ -116,13 +120,73 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest
Assert.assertNotNull(input);
int length = 0;
while (input.read() >= 0)
while (input.read() == value)
{
if (length % 100 == 0)
Thread.sleep(1);
++length;
}
Assert.assertEquals(data.length, length);
Result result = listener.await(5, TimeUnit.SECONDS);
Assert.assertNotNull(result);
Assert.assertFalse(result.isFailed());
Assert.assertSame(response, result.getResponse());
}
@Test
public void testDownloadWithFailure() throws Exception
{
final byte[] data = new byte[64 * 1024];
byte value = 1;
Arrays.fill(data, value);
start(new AbstractHandler()
{
@Override
public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
// Say we want to send this much...
response.setContentLength(2 * data.length);
// ...but write only half...
response.getOutputStream().write(data);
// ...then shutdown output
baseRequest.getHttpChannel().getEndPoint().shutdownOutput();
}
});
InputStreamResponseListener listener = new InputStreamResponseListener();
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.send(listener);
Response response = listener.get(5, TimeUnit.SECONDS);
Assert.assertNotNull(response);
Assert.assertEquals(200, response.status());
InputStream input = listener.getInputStream();
Assert.assertNotNull(input);
int length = 0;
try
{
length = 0;
while (input.read() == value)
{
if (length % 100 == 0)
Thread.sleep(1);
++length;
}
fail();
}
catch (IOException expected)
{
}
Assert.assertEquals(data.length, length);
Result result = listener.await(5, TimeUnit.SECONDS);
Assert.assertNotNull(result);
Assert.assertTrue(result.isFailed());
}
}