421198 - onComplete never call onComplete in

BufferingResponseListener in 9.1.

Introduced new method Request.onComplete(CompleteListener) to be able
to specify a CompleteListener even when using the blocking Request
.send() method.
This commit is contained in:
Simone Bordet 2013-11-07 13:33:29 +01:00
parent 660cf58a51
commit 293efe9798
5 changed files with 97 additions and 10 deletions

View File

@ -43,6 +43,7 @@ import org.eclipse.jetty.client.api.ContentProvider;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.client.util.FutureResponseListener;
import org.eclipse.jetty.client.util.PathContentProvider;
import org.eclipse.jetty.http.HttpField;
@ -478,6 +479,20 @@ public class HttpRequest implements Request
return this;
}
@Override
public Request onComplete(final Response.CompleteListener listener)
{
this.responseListeners.add(new Response.CompleteListener()
{
@Override
public void onComplete(Result result)
{
listener.onComplete(result);
}
});
return this;
}
@Override
public ContentProvider getContent()
{

View File

@ -36,6 +36,7 @@ public class RequestNotifier
this.client = client;
}
@SuppressWarnings("ForLoopReplaceableByForEach")
public void notifyQueued(Request request)
{
// Optimized to avoid allocations of iterator instances
@ -66,6 +67,7 @@ public class RequestNotifier
}
}
@SuppressWarnings("ForLoopReplaceableByForEach")
public void notifyBegin(Request request)
{
// Optimized to avoid allocations of iterator instances
@ -96,6 +98,7 @@ public class RequestNotifier
}
}
@SuppressWarnings("ForLoopReplaceableByForEach")
public void notifyHeaders(Request request)
{
// Optimized to avoid allocations of iterator instances
@ -126,6 +129,7 @@ public class RequestNotifier
}
}
@SuppressWarnings("ForLoopReplaceableByForEach")
public void notifyCommit(Request request)
{
// Optimized to avoid allocations of iterator instances
@ -156,20 +160,31 @@ public class RequestNotifier
}
}
@SuppressWarnings("ForLoopReplaceableByForEach")
public void notifyContent(Request request, ByteBuffer content)
{
// Optimized to avoid allocations of iterator instances
// Slice the buffer to avoid that listeners peek into data they should not look at.
content = content.slice();
// Optimized to avoid allocations of iterator instances.
List<Request.RequestListener> requestListeners = request.getRequestListeners(null);
for (int i = 0; i < requestListeners.size(); ++i)
{
Request.RequestListener listener = requestListeners.get(i);
if (listener instanceof Request.ContentListener)
{
// The buffer was sliced, so we always clear it (position=0, limit=capacity)
// before passing it to the listener that may consume it.
content.clear();
notifyContent((Request.ContentListener)listener, request, content);
}
}
List<Request.Listener> listeners = client.getRequestListeners();
for (int i = 0; i < listeners.size(); ++i)
{
Request.Listener listener = listeners.get(i);
// The buffer was sliced, so we always clear it (position=0, limit=capacity)
// before passing it to the listener that may consume it.
content.clear();
notifyContent(listener, request, content);
}
}
@ -186,6 +201,7 @@ public class RequestNotifier
}
}
@SuppressWarnings("ForLoopReplaceableByForEach")
public void notifySuccess(Request request)
{
// Optimized to avoid allocations of iterator instances
@ -216,6 +232,7 @@ public class RequestNotifier
}
}
@SuppressWarnings("ForLoopReplaceableByForEach")
public void notifyFailure(Request request, Throwable failure)
{
// Optimized to avoid allocations of iterator instances

View File

@ -40,6 +40,7 @@ public class ResponseNotifier
this.client = client;
}
@SuppressWarnings("ForLoopReplaceableByForEach")
public void notifyBegin(List<Response.ResponseListener> listeners, Response response)
{
// Optimized to avoid allocations of iterator instances
@ -63,6 +64,7 @@ public class ResponseNotifier
}
}
@SuppressWarnings("ForLoopReplaceableByForEach")
public boolean notifyHeader(List<Response.ResponseListener> listeners, Response response, HttpField field)
{
boolean result = true;
@ -89,6 +91,7 @@ public class ResponseNotifier
}
}
@SuppressWarnings("ForLoopReplaceableByForEach")
public void notifyHeaders(List<Response.ResponseListener> listeners, Response response)
{
// Optimized to avoid allocations of iterator instances
@ -112,14 +115,22 @@ public class ResponseNotifier
}
}
@SuppressWarnings("ForLoopReplaceableByForEach")
public void notifyContent(List<Response.ResponseListener> listeners, Response response, ByteBuffer buffer)
{
// Slice the buffer to avoid that listeners peek into data they should not look at.
buffer = buffer.slice();
// Optimized to avoid allocations of iterator instances
for (int i = 0; i < listeners.size(); ++i)
{
Response.ResponseListener listener = listeners.get(i);
if (listener instanceof Response.ContentListener)
{
// The buffer was sliced, so we always clear it (position=0, limit=capacity)
// before passing it to the listener that may consume it.
buffer.clear();
notifyContent((Response.ContentListener)listener, response, buffer);
}
}
}
@ -135,6 +146,7 @@ public class ResponseNotifier
}
}
@SuppressWarnings("ForLoopReplaceableByForEach")
public void notifySuccess(List<Response.ResponseListener> listeners, Response response)
{
// Optimized to avoid allocations of iterator instances
@ -158,6 +170,7 @@ public class ResponseNotifier
}
}
@SuppressWarnings("ForLoopReplaceableByForEach")
public void notifyFailure(List<Response.ResponseListener> listeners, Response response, Throwable failure)
{
// Optimized to avoid allocations of iterator instances
@ -181,6 +194,7 @@ public class ResponseNotifier
}
}
@SuppressWarnings("ForLoopReplaceableByForEach")
public void notifyComplete(List<Response.ResponseListener> listeners, Result result)
{
// Optimized to avoid allocations of iterator instances

View File

@ -353,6 +353,12 @@ public interface Request
*/
Request onResponseFailure(Response.FailureListener listener);
/**
* @param listener a listener for complete event
* @return this request object
*/
Request onComplete(Response.CompleteListener listener);
/**
* Sends this request and returns the response.
* <p />

View File

@ -18,9 +18,6 @@
package org.eclipse.jetty.client;
import static java.nio.file.StandardOpenOption.CREATE;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.net.HttpCookie;
@ -38,6 +35,7 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@ -45,7 +43,6 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.zip.GZIPOutputStream;
import javax.servlet.ServletException;
import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServletRequest;
@ -59,13 +56,12 @@ import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.client.http.HttpConnectionOverHTTP;
import org.eclipse.jetty.client.http.HttpDestinationOverHTTP;
import org.eclipse.jetty.client.util.BufferingResponseListener;
import org.eclipse.jetty.client.util.BytesContentProvider;
import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.toolchain.test.FS;
import org.eclipse.jetty.toolchain.test.MavenTestingUtils;
import org.eclipse.jetty.toolchain.test.TestingDir;
import org.eclipse.jetty.toolchain.test.annotation.Slow;
import org.eclipse.jetty.util.IO;
@ -74,11 +70,13 @@ import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import static java.nio.file.StandardOpenOption.CREATE;
public class HttpClientTest extends AbstractHttpClientServerTest
{
@Rule
public TestingDir testdir = new TestingDir();
@Rule
public TestingDir testdir = new TestingDir();
public HttpClientTest(SslContextFactory sslContextFactory)
{
super(sslContextFactory);
@ -1016,4 +1014,41 @@ public class HttpClientTest extends AbstractHttpClientServerTest
int expected = expectedEventsTriggeredByOnResponseXXXListeners + expectedEventsTriggeredByCompletionListener;
Assert.assertEquals(expected, counter.get());
}
@Test
public void setOnCompleteCallbackWithBlockingSend() throws Exception
{
final byte[] content = new byte[512];
new Random().nextBytes(content);
start(new AbstractHandler()
{
@Override
public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
response.getOutputStream().write(content);
}
});
final AtomicInteger complete = new AtomicInteger();
BufferingResponseListener listener = new BufferingResponseListener()
{
@Override
public void onComplete(Result result)
{
complete.incrementAndGet();
}
};
ContentResponse response = client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.onResponseContent(listener)
.onComplete(listener)
.send();
Assert.assertEquals(200, response.getStatus());
Assert.assertEquals(1, complete.get());
Assert.assertArrayEquals(content, listener.getContent());
Assert.assertArrayEquals(content, response.getContent());
}
}