388103 - Add API for tracking down upload progress.

Introduced Request.ContentListener, invoked after the content has been sent.
This commit is contained in:
Simone Bordet 2013-02-20 17:56:21 +01:00
parent 29d779778e
commit 3ffbb586db
6 changed files with 192 additions and 42 deletions

View File

@ -292,6 +292,13 @@ public class HttpRequest implements Request
return this;
}
@Override
public Request onRequestContent(ContentListener listener)
{
this.requestListeners.add(listener);
return this;
}
@Override
public Request onRequestSuccess(SuccessListener listener)
{

View File

@ -162,13 +162,16 @@ public class HttpSender implements AsyncContentProvider.Listener
while (true)
{
HttpGenerator.Result result = generator.generateRequest(requestInfo, header, chunk, contentChunk.content, contentChunk.lastContent);
ByteBuffer content = contentChunk.content;
final ByteBuffer contentBuffer = content == null ? null : content.slice();
HttpGenerator.Result result = generator.generateRequest(requestInfo, header, chunk, content, contentChunk.lastContent);
switch (result)
{
case NEED_INFO:
{
ContentProvider content = request.getContent();
long contentLength = content == null ? -1 : content.getLength();
ContentProvider requestContent = request.getContent();
long contentLength = requestContent == null ? -1 : requestContent.getLength();
requestInfo = new HttpGenerator.RequestInfo(request.getVersion(), request.getHeaders(), contentLength, request.getMethod().asString(), request.getPath());
break;
}
@ -224,16 +227,9 @@ public class HttpSender implements AsyncContentProvider.Listener
{
LOG.debug("Write succeeded for {}", request);
if (!commit(request))
if (!processWrite(request, contentBuffer, expecting100ContinueResponse))
return;
if (expecting100ContinueResponse)
{
LOG.debug("Expecting 100 Continue for {}", request);
continueContentChunk.signal();
return;
}
send();
}
@ -250,7 +246,7 @@ public class HttpSender implements AsyncContentProvider.Listener
continueContentChunk = new ContinueContentChunk(contentChunk);
}
write(callback, header, chunk, expecting100ContinueResponse ? null : contentChunk.content);
write(callback, header, chunk, expecting100ContinueResponse ? null : content);
if (callback.process())
{
@ -260,16 +256,9 @@ public class HttpSender implements AsyncContentProvider.Listener
if (callback.isSucceeded())
{
if (!commit(request))
if (!processWrite(request, contentBuffer, expecting100ContinueResponse))
return;
if (expecting100ContinueResponse)
{
LOG.debug("Expecting 100 Continue for {}", request);
continueContentChunk.signal();
return;
}
// Send further content
contentChunk = new ContentChunk(contentIterator);
@ -363,6 +352,27 @@ public class HttpSender implements AsyncContentProvider.Listener
}
}
private boolean processWrite(Request request, ByteBuffer content, boolean expecting100ContinueResponse)
{
if (!commit(request))
return false;
if (content != null)
{
RequestNotifier notifier = connection.getDestination().getRequestNotifier();
notifier.notifyContent(request, content);
}
if (expecting100ContinueResponse)
{
LOG.debug("Expecting 100 Continue for {}", request);
continueContentChunk.signal();
return false;
}
return true;
}
public void proceed(boolean proceed)
{
ContinueContentChunk contentChunk = continueContentChunk;

View File

@ -18,6 +18,7 @@
package org.eclipse.jetty.client;
import java.nio.ByteBuffer;
import java.util.List;
import org.eclipse.jetty.client.api.Request;
@ -155,6 +156,36 @@ public class RequestNotifier
}
}
public void notifyContent(Request request, ByteBuffer content)
{
// Optimized to avoid allocations of iterator instances
List<Request.RequestListener> requestListeners = request.getRequestListeners(null);
for (int i = 0; i < requestListeners.size(); ++i)
{
Request.RequestListener listener = requestListeners.get(i);
if (listener instanceof Request.ContentListener)
notifyContent((Request.ContentListener)listener, request, content);
}
List<Request.Listener> listeners = client.getRequestListeners();
for (int i = 0; i < listeners.size(); ++i)
{
Request.Listener listener = listeners.get(i);
notifyContent(listener, request, content);
}
}
private void notifyContent(Request.ContentListener listener, Request request, ByteBuffer content)
{
try
{
listener.onContent(request, content);
}
catch (Exception x)
{
LOG.info("Exception while notifying listener " + listener, x);
}
}
public void notifySuccess(Request request)
{
// Optimized to avoid allocations of iterator instances

View File

@ -20,6 +20,7 @@ package org.eclipse.jetty.client.api;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.util.EventListener;
import java.util.List;
@ -265,6 +266,12 @@ public interface Request
*/
Request onRequestCommit(CommitListener listener);
/**
* @param listener a listener for request content events
* @return this request object
*/
Request onRequestContent(ContentListener listener);
/**
* @param listener a listener for request success event
* @return this request object
@ -416,6 +423,19 @@ public interface Request
public void onCommit(Request request);
}
/**
* Listener for the request content event.
*/
public interface ContentListener extends RequestListener
{
/**
* Callback method invoked when a chunk of request content has been sent successfully.
* Changes to bytes in the given buffer have no effect, as the content has already been sent.
* @param request the request that has been committed
*/
public void onContent(Request request, ByteBuffer content);
}
/**
* Listener for the request succeeded event.
*/
@ -445,7 +465,7 @@ public interface Request
/**
* Listener for all request events.
*/
public interface Listener extends QueuedListener, BeginListener, HeadersListener, CommitListener, SuccessListener, FailureListener
public interface Listener extends QueuedListener, BeginListener, HeadersListener, CommitListener, ContentListener, SuccessListener, FailureListener
{
/**
* An empty implementation of {@link Listener}
@ -472,6 +492,11 @@ public interface Request
{
}
@Override
public void onContent(Request request, ByteBuffer content)
{
}
@Override
public void onSuccess(Request request)
{

View File

@ -28,6 +28,7 @@ import java.nio.channels.UnresolvedAddressException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
@ -35,6 +36,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.zip.GZIPOutputStream;
import javax.servlet.ServletException;
@ -285,6 +287,59 @@ public class HttpClientTest extends AbstractHttpClientServerTest
Assert.assertArrayEquals(content, response.getContent());
}
@Test
public void test_POST_WithContent_NotifiesRequestContentListener() throws Exception
{
final byte[] content = {0, 1, 2, 3};
start(new EmptyServerHandler());
ContentResponse response = client.POST(scheme + "://localhost:" + connector.getLocalPort())
.onRequestContent(new Request.ContentListener()
{
@Override
public void onContent(Request request, ByteBuffer buffer)
{
byte[] bytes = new byte[buffer.remaining()];
buffer.get(bytes);
if (!Arrays.equals(content, bytes))
request.abort(new Exception());
}
})
.content(new BytesContentProvider(content))
.timeout(5, TimeUnit.SECONDS)
.send();
Assert.assertNotNull(response);
Assert.assertEquals(200, response.getStatus());
}
@Test
public void test_POST_WithContent_TracksProgress() throws Exception
{
start(new EmptyServerHandler());
final AtomicInteger progress = new AtomicInteger();
ContentResponse response = client.POST(scheme + "://localhost:" + connector.getLocalPort())
.onRequestContent(new Request.ContentListener()
{
@Override
public void onContent(Request request, ByteBuffer buffer)
{
byte[] bytes = new byte[buffer.remaining()];
Assert.assertEquals(1, bytes.length);
buffer.get(bytes);
Assert.assertEquals(bytes[0], progress.getAndIncrement());
}
})
.content(new BytesContentProvider(new byte[]{0}, new byte[]{1}, new byte[]{2}, new byte[]{3}, new byte[]{4}))
.timeout(5, TimeUnit.SECONDS)
.send();
Assert.assertNotNull(response);
Assert.assertEquals(200, response.getStatus());
Assert.assertEquals(5, progress.get());
}
@Test
public void test_QueuedRequest_IsSent_WhenPreviousRequestSucceeded() throws Exception
{

View File

@ -18,7 +18,6 @@
package org.eclipse.jetty.client;
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;
@ -26,7 +25,6 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@ -36,11 +34,9 @@ import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.client.util.ByteBufferContentProvider;
import org.eclipse.jetty.server.HttpChannel;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.toolchain.test.annotation.Slow;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.log.StdErrLog;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.junit.Assert;
import org.junit.Test;
@ -226,7 +222,6 @@ public class HttpRequestAbortTest extends AbstractHttpClientServerTest
}
});
StdErrLog.getLogger(HttpChannel.class).setHideStacks(true);
final Throwable cause = new Exception();
try
{
@ -254,24 +249,51 @@ public class HttpRequestAbortTest extends AbstractHttpClientServerTest
}
catch (ExecutionException x)
{
Throwable abort = x.getCause();
if (abort instanceof EOFException)
{
// Server closed abruptly
System.err.println("C");
}
else if (abort == cause)
{
// Expected
}
else
{
throw x;
}
Assert.assertSame(cause, x.getCause());
}
finally
}
@Test
public void testAbortOnContent() throws Exception
{
start(new EmptyServerHandler()
{
StdErrLog.getLogger(HttpChannel.class).setHideStacks(false);
@Override
public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
super.handle(target, baseRequest, request, response);
IO.copy(request.getInputStream(), response.getOutputStream());
}
});
final Throwable cause = new Exception();
try
{
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.onRequestContent(new Request.ContentListener()
{
@Override
public void onContent(Request request, ByteBuffer content)
{
request.abort(cause);
}
})
.content(new ByteBufferContentProvider(ByteBuffer.wrap(new byte[]{0}), ByteBuffer.wrap(new byte[]{1}))
{
@Override
public long getLength()
{
return -1;
}
})
.timeout(5, TimeUnit.SECONDS)
.send();
Assert.fail();
}
catch (ExecutionException x)
{
Assert.assertSame(cause, x.getCause());
}
}