Intermediate commit.

This commit is contained in:
Simone Bordet 2012-09-17 15:11:31 +02:00
parent 707ff4aea8
commit 9355e06603
5 changed files with 180 additions and 54 deletions

View File

@ -23,7 +23,7 @@ import java.nio.file.Path;
import java.util.concurrent.Future;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.util.StreamingResponseListener;
import org.eclipse.jetty.client.util.InputStreamResponseListener;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpVersion;
@ -213,7 +213,7 @@ public interface Request
* that the response content can be buffered without exceeding memory constraints.
* For example, this method is not appropriate to download big files from a server; consider using
* {@link #send(Response.Listener)} instead, passing your own {@link Response.Listener} or a utility
* listener such as {@link StreamingResponseListener}.
* listener such as {@link InputStreamResponseListener}.
* <p />
* The future will return when {@link Response.Listener#onComplete(Result)} is invoked.
*

View File

@ -0,0 +1,106 @@
//
// ========================================================================
// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.client.util;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
public class InputStreamResponseListener extends Response.Listener.Empty
{
private final AtomicLong length = new AtomicLong();
private final CountDownLatch responseLatch = new CountDownLatch(1);
private final long capacity;
private Response response;
private volatile Throwable failure;
public InputStreamResponseListener()
{
this(1024 * 1024L);
}
public InputStreamResponseListener(long capacity)
{
this.capacity = capacity;
}
@Override
public void onHeaders(Response response)
{
this.response = response;
responseLatch.countDown();
}
@Override
public void onContent(Response response, ByteBuffer content)
{
int remaining = content.remaining();
byte[] bytes = new byte[remaining];
content.get(bytes);
long newLength = length.addAndGet(remaining);
if (newLength > capacity)
// wait
;
}
@Override
public void onFailure(Response response, Throwable failure)
{
this.failure = failure;
responseLatch.countDown();
}
public Response get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException, ExecutionException
{
boolean expired = !responseLatch.await(timeout, unit);
if (expired)
throw new TimeoutException();
if (failure != null)
throw new ExecutionException(failure);
return response;
}
public Result await(long timeout, TimeUnit seconds)
{
return null;
}
public InputStream getInputStream()
{
return new Input();
}
private class Input extends InputStream
{
@Override
public int read() throws IOException
{
return 0;
}
}
}

View File

@ -1,42 +0,0 @@
//
// ========================================================================
// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.client.util;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.client.api.Response;
public class StreamingResponseListener extends Response.Listener.Empty
{
public Response get(long timeout, TimeUnit seconds)
{
return null;
}
public InputStream getInputStream()
{
return null;
}
public void writeTo(OutputStream outputStream)
{
}
}

View File

@ -18,15 +18,24 @@
package org.eclipse.jetty.client;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.client.util.InputStreamResponseListener;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.toolchain.test.MavenTestingUtils;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.junit.Assert;
@ -80,4 +89,40 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest
// This is just to avoid exception traces in the test output
Thread.sleep(1000);
}
@Test
public void testDownload() throws Exception
{
final byte[] data = new byte[128 * 1024];
start(new AbstractHandler()
{
@Override
public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
response.getOutputStream().write(data);
}
});
InputStreamResponseListener listener = new InputStreamResponseListener();
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.send(listener);
Response response = listener.get(5, TimeUnit.SECONDS);
Assert.assertNotNull(response);
Assert.assertEquals(200, response.status());
InputStream input = listener.getInputStream();
Assert.assertNotNull(input);
int length = 0;
while (input.read() >= 0)
++length;
Assert.assertEquals(data.length, length);
Result result = listener.await(5, TimeUnit.SECONDS);
Assert.assertNotNull(result);
Assert.assertSame(response, result.getResponse());
}
}

View File

@ -18,7 +18,6 @@
package org.eclipse.jetty.client.api;
import java.io.FileOutputStream;
import java.io.InputStream;
import java.nio.file.Paths;
import java.util.concurrent.CountDownLatch;
@ -28,8 +27,8 @@ import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.util.BlockingResponseListener;
import org.eclipse.jetty.client.util.InputStreamResponseListener;
import org.eclipse.jetty.client.util.PathContentProvider;
import org.eclipse.jetty.client.util.StreamingResponseListener;
import org.eclipse.jetty.http.HttpCookie;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpVersion;
@ -161,10 +160,10 @@ public class Usage
}
@Test
public void testResponseStream() throws Exception
public void testResponseInputStream() throws Exception
{
HttpClient client = new HttpClient();
StreamingResponseListener listener = new StreamingResponseListener();
InputStreamResponseListener listener = new InputStreamResponseListener();
client.newRequest("localhost", 8080).send(listener);
// Call to get() blocks until the headers arrived
Response response = listener.get(5, TimeUnit.SECONDS);
@ -182,16 +181,34 @@ public class Usage
// No need for output stream; for example, parse bytes
}
}
// Solution 2: write to output stream
try (FileOutputStream output = new FileOutputStream(""))
{
listener.writeTo(output);
}
}
else
{
response.abort();
}
}
// @Test
// public void testResponseOutputStream() throws Exception
// {
// HttpClient client = new HttpClient();
//
// try (FileOutputStream output = new FileOutputStream(""))
// {
// OutputStreamResponseListener listener = new OutputStreamResponseListener(output);
// client.newRequest("localhost", 8080).send(listener);
// // Call to get() blocks until the headers arrived
// Response response = listener.get(5, TimeUnit.SECONDS);
// if (response.status() == 200)
// {
// // Solution 2: write to output stream
// {
// listener.writeTo(output);
// }
// }
// else
// {
// response.abort();
// }
// }
}