jetty-9 - HTTP client: implemented abort of request and responses.

This commit is contained in:
Simone Bordet 2012-09-13 14:22:43 +02:00
parent b5a329ecaf
commit 5315a6dcff
22 changed files with 672 additions and 158 deletions

View File

@ -30,6 +30,7 @@ import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.client.util.BufferingResponseListener;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@ -66,31 +67,18 @@ public class AuthenticationProtocolHandler implements ProtocolHandler
return new AuthenticationListener();
}
private class AuthenticationListener extends Response.Listener.Adapter
private class AuthenticationListener extends BufferingResponseListener
{
private byte[] buffer = new byte[0];
@Override
public void onContent(Response response, ByteBuffer content)
private AuthenticationListener()
{
if (buffer.length == maxContentLength)
return;
long newLength = buffer.length + content.remaining();
if (newLength > maxContentLength)
newLength = maxContentLength;
byte[] newBuffer = new byte[(int)newLength];
System.arraycopy(buffer, 0, newBuffer, 0, buffer.length);
content.get(newBuffer, buffer.length, content.remaining());
buffer = newBuffer;
super(maxContentLength);
}
@Override
public void onComplete(Result result)
{
Request request = result.getRequest();
ContentResponse response = new HttpContentResponse(result.getResponse(), buffer);
ContentResponse response = new HttpContentResponse(result.getResponse(), getContent());
if (result.isFailed())
{
Throwable failure = result.getFailure();
@ -136,7 +124,7 @@ public class AuthenticationProtocolHandler implements ProtocolHandler
}
authnResult.apply(request);
request.send(new Adapter()
request.send(new Response.Listener.Empty()
{
@Override
public void onSuccess(Response response)

View File

@ -250,6 +250,11 @@ public class HttpConnection extends AbstractConnection implements Connection
}
}
public void abort(HttpResponse response)
{
receiver.fail(new HttpResponseException("Response aborted", response));
}
@Override
public void close()
{

View File

@ -42,7 +42,7 @@ public class HttpExchange
this.connection = connection;
this.request = request;
this.listener = listener;
this.response = new HttpResponse(listener);
this.response = new HttpResponse(this, listener);
}
public HttpConversation conversation()
@ -117,6 +117,12 @@ public class HttpExchange
return false;
}
public void abort()
{
LOG.debug("Aborting {}", response);
connection.abort(response);
}
@Override
public String toString()
{

View File

@ -31,15 +31,13 @@ import org.eclipse.jetty.client.api.ContentProvider;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.client.util.BufferingResponseListener;
import org.eclipse.jetty.client.util.BlockingResponseListener;
import org.eclipse.jetty.client.util.PathContentProvider;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.util.Fields;
import org.eclipse.jetty.util.FutureCallback;
public class HttpRequest implements Request
{
@ -59,6 +57,7 @@ public class HttpRequest implements Request
private Listener listener;
private ContentProvider content;
private boolean followRedirects = true;
private volatile boolean aborted;
public HttpRequest(HttpClient client, URI uri)
{
@ -296,22 +295,9 @@ public class HttpRequest implements Request
@Override
public Future<ContentResponse> send()
{
final FutureCallback<ContentResponse> callback = new FutureCallback<>();
BufferingResponseListener listener = new BufferingResponseListener()
{
@Override
public void onComplete(Result result)
{
super.onComplete(result);
HttpContentResponse contentResponse = new HttpContentResponse(result.getResponse(), content());
if (!result.isFailed())
callback.completed(contentResponse);
else
callback.failed(contentResponse, result.getFailure());
}
};
BlockingResponseListener listener = new BlockingResponseListener();
send(listener);
return callback;
return listener;
}
@Override
@ -320,6 +306,18 @@ public class HttpRequest implements Request
client.send(this, listener);
}
@Override
public void abort()
{
aborted = true;
}
@Override
public boolean aborted()
{
return aborted;
}
@Override
public String toString()
{

View File

@ -0,0 +1,37 @@
//
// ========================================================================
// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.client;
import org.eclipse.jetty.client.api.Request;
public class HttpRequestException extends Throwable
{
private final Request request;
public HttpRequestException(String message, Request request)
{
super(message);
this.request = request;
}
public Request getRequest()
{
return request;
}
}

View File

@ -25,13 +25,15 @@ import org.eclipse.jetty.http.HttpVersion;
public class HttpResponse implements Response
{
private final HttpFields headers = new HttpFields();
private final HttpExchange exchange;
private final Listener listener;
private HttpVersion version;
private int status;
private String reason;
public HttpResponse(Response.Listener listener)
public HttpResponse(HttpExchange exchange, Listener listener)
{
this.exchange = exchange;
this.listener = listener;
}
@ -84,7 +86,7 @@ public class HttpResponse implements Response
@Override
public void abort()
{
// request.abort();
exchange.abort();
}
@Override

View File

@ -60,13 +60,21 @@ public class HttpSender
public void send(HttpExchange exchange)
{
LOG.debug("Sending {}", exchange.request());
requestNotifier.notifyBegin(exchange.request());
ContentProvider content = exchange.request().content();
Request request = exchange.request();
if (request.aborted())
{
fail(new HttpRequestException("Request aborted", request));
}
else
{
LOG.debug("Sending {}", request);
requestNotifier.notifyBegin(request);
ContentProvider content = request.content();
this.contentLength = content == null ? -1 : content.length();
this.contentChunks = content == null ? Collections.<ByteBuffer>emptyIterator() : content.iterator();
send();
}
}
private void send()
{
@ -121,6 +129,12 @@ public class HttpSender
break;
}
case FLUSH:
{
if (request.aborted())
{
fail(new HttpRequestException("Request aborted", request));
}
else
{
StatefulExecutorCallback callback = new StatefulExecutorCallback(client.getExecutor())
{
@ -142,17 +156,12 @@ public class HttpSender
header = BufferUtil.EMPTY_BUFFER;
if (chunk == null)
chunk = BufferUtil.EMPTY_BUFFER;
LOG.debug("Writing {} {} {}", header, chunk, content);
endPoint.write(null, callback, header, chunk, content);
if (callback.pending())
{
LOG.debug("Write incomplete {} {} {}", header, chunk, content);
return;
}
if (callback.completed())
{
LOG.debug("Write complete {} {} {}", header, chunk, content);
if (!committed)
committed(request);
@ -160,6 +169,7 @@ public class HttpSender
content = contentChunks.hasNext() ? contentChunks.next() : BufferUtil.EMPTY_BUFFER;
lastContent = !contentChunks.hasNext();
}
}
break;
}
case SHUTDOWN_OUT:

View File

@ -24,7 +24,7 @@ import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpMethod;
public class RedirectProtocolHandler extends Response.Listener.Adapter implements ProtocolHandler
public class RedirectProtocolHandler extends Response.Listener.Empty implements ProtocolHandler
{
private static final String ATTRIBUTE = RedirectProtocolHandler.class.getName() + ".redirect";
@ -128,7 +128,7 @@ public class RedirectProtocolHandler extends Response.Listener.Adapter implement
// Copy content
redirect.content(request.content());
redirect.send(new Adapter());
redirect.send(new Response.Listener.Empty());
}
else
{

View File

@ -230,6 +230,18 @@ public interface Request
*/
void send(Response.Listener listener);
/**
* Attempts to abort the send of this request.
*
* @see #aborted()
*/
void abort();
/**
* @return whether {@link #abort()} was called
*/
boolean aborted();
/**
* Listener for request events
*/

View File

@ -51,7 +51,7 @@ public interface Response
public void onComplete(Result result);
public static class Adapter implements Listener
public static class Empty implements Listener
{
@Override
public void onBegin(Response response)

View File

@ -0,0 +1,117 @@
//
// ========================================================================
// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.client.util;
import java.nio.ByteBuffer;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.eclipse.jetty.client.HttpContentResponse;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
public class BlockingResponseListener extends BufferingResponseListener implements Future<ContentResponse>
{
private final CountDownLatch latch = new CountDownLatch(1);
private ContentResponse response;
private Throwable failure;
private volatile boolean cancelled;
@Override
public void onBegin(Response response)
{
super.onBegin(response);
if (cancelled)
response.abort();
}
@Override
public void onHeaders(Response response)
{
super.onHeaders(response);
if (cancelled)
response.abort();
}
@Override
public void onContent(Response response, ByteBuffer content)
{
super.onContent(response, content);
if (cancelled)
response.abort();
}
@Override
public void onComplete(Result result)
{
super.onComplete(result);
response = new HttpContentResponse(result.getResponse(), getContent());
failure = result.getFailure();
latch.countDown();
}
@Override
public boolean cancel(boolean mayInterruptIfRunning)
{
cancelled = true;
return latch.getCount() == 0;
}
@Override
public boolean isCancelled()
{
return cancelled;
}
@Override
public boolean isDone()
{
return latch.getCount() == 0 || isCancelled();
}
@Override
public ContentResponse get() throws InterruptedException, ExecutionException
{
latch.await();
return result();
}
@Override
public ContentResponse get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
{
boolean expired = !latch.await(timeout, unit);
if (expired)
throw new TimeoutException();
return result();
}
private ContentResponse result() throws ExecutionException
{
if (isCancelled())
throw (CancellationException)new CancellationException().initCause(failure);
if (failure != null)
throw new ExecutionException(failure);
return response;
}
}

View File

@ -18,38 +18,32 @@
package org.eclipse.jetty.client.util;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.nio.charset.UnsupportedCharsetException;
import org.eclipse.jetty.client.api.Response;
public class BufferingResponseListener extends Response.Listener.Adapter
public class BufferingResponseListener extends Response.Listener.Empty
{
private final CountDownLatch latch = new CountDownLatch(1);
private final int maxCapacity;
private Response response;
private Throwable failure;
private byte[] buffer = new byte[0];
private final int maxLength;
private volatile byte[] buffer = new byte[0];
public BufferingResponseListener()
{
this(16 * 1024 * 1024);
this(2 * 1024 * 1024);
}
public BufferingResponseListener(int maxCapacity)
public BufferingResponseListener(int maxLength)
{
this.maxCapacity = maxCapacity;
this.maxLength = maxLength;
}
@Override
public void onContent(Response response, ByteBuffer content)
{
long newLength = buffer.length + content.remaining();
if (newLength > maxCapacity)
if (newLength > maxLength)
throw new IllegalStateException("Buffering capacity exceeded");
byte[] newBuffer = new byte[(int)newLength];
@ -58,38 +52,20 @@ public class BufferingResponseListener extends Response.Listener.Adapter
buffer = newBuffer;
}
@Override
public void onSuccess(Response response)
{
this.response = response;
latch.countDown();
}
@Override
public void onFailure(Response response, Throwable failure)
{
this.response = response;
this.failure = failure;
latch.countDown();
}
public Response await(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
{
boolean expired = !latch.await(timeout, unit);
if (failure != null)
throw new ExecutionException(failure);
if (expired)
throw new TimeoutException();
return response;
}
public byte[] content()
public byte[] getContent()
{
return buffer;
}
public String contentAsString(String encoding)
public String getContent(String encoding)
{
return new String(content(), Charset.forName(encoding));
try
{
return new String(getContent(), encoding);
}
catch (UnsupportedEncodingException x)
{
throw new UnsupportedCharsetException(encoding);
}
}
}

View File

@ -24,7 +24,7 @@ import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.client.api.Response;
public class StreamingResponseListener extends Response.Listener.Adapter
public class StreamingResponseListener extends Response.Listener.Empty
{
public Response get(long timeout, TimeUnit seconds)
{

View File

@ -113,7 +113,7 @@ public class HttpClientAuthenticationTest extends AbstractHttpClientServerTest
// Request without Authentication causes a 401
Request request = client.newRequest("localhost", connector.getLocalPort()).path("/test");
ContentResponse response = request.send().get(555, TimeUnit.SECONDS);
ContentResponse response = request.send().get(5, TimeUnit.SECONDS);
Assert.assertNotNull(response);
Assert.assertEquals(401, response.status());
Assert.assertEquals(1, requests.get());
@ -133,7 +133,7 @@ public class HttpClientAuthenticationTest extends AbstractHttpClientServerTest
client.getRequestListeners().add(requestListener);
// Request with authentication causes a 401 (no previous successful authentication) + 200
response = request.send().get(555, TimeUnit.SECONDS);
response = request.send().get(5, TimeUnit.SECONDS);
Assert.assertNotNull(response);
Assert.assertEquals(200, response.status());
Assert.assertEquals(2, requests.get());
@ -153,7 +153,7 @@ public class HttpClientAuthenticationTest extends AbstractHttpClientServerTest
// Further requests do not trigger 401 because there is a previous successful authentication
// Remove existing header to be sure it's added by the implementation
request.header(HttpHeader.AUTHORIZATION.asString(), null);
response = request.send().get(555, TimeUnit.SECONDS);
response = request.send().get(5, TimeUnit.SECONDS);
Assert.assertNotNull(response);
Assert.assertEquals(200, response.status());
Assert.assertEquals(1, requests.get());

View File

@ -234,7 +234,7 @@ public class HttpClientTest extends AbstractHttpClientServerTest
}
}
})
.send(new Response.Listener.Adapter()
.send(new Response.Listener.Empty()
{
@Override
public void onSuccess(Response response)
@ -253,7 +253,7 @@ public class HttpClientTest extends AbstractHttpClientServerTest
latch.countDown();
}
})
.send(new Response.Listener.Adapter()
.send(new Response.Listener.Empty()
{
@Override
public void onSuccess(Response response)
@ -299,7 +299,7 @@ public class HttpClientTest extends AbstractHttpClientServerTest
latch.countDown();
}
})
.send(new Response.Listener.Adapter()
.send(new Response.Listener.Empty()
{
@Override
public void onFailure(Response response, Throwable failure)
@ -309,7 +309,7 @@ public class HttpClientTest extends AbstractHttpClientServerTest
});
client.newRequest("http://localhost:" + connector.getLocalPort())
.send(new Response.Listener.Adapter()
.send(new Response.Listener.Empty()
{
@Override
public void onSuccess(Response response)
@ -354,7 +354,7 @@ public class HttpClientTest extends AbstractHttpClientServerTest
latch.countDown();
}
})
.send(new Response.Listener.Adapter()
.send(new Response.Listener.Empty()
{
@Override
public void onSuccess(Response response)
@ -414,7 +414,7 @@ public class HttpClientTest extends AbstractHttpClientServerTest
return Arrays.asList(ByteBuffer.allocate(chunkSize), null).iterator();
}
})
.send(new Response.Listener.Adapter()
.send(new Response.Listener.Empty()
{
@Override
public void onComplete(Result result)
@ -444,7 +444,7 @@ public class HttpClientTest extends AbstractHttpClientServerTest
destination.getActiveConnections().peek().close();
}
})
.send(new Response.Listener.Adapter()
.send(new Response.Listener.Empty()
{
@Override
public void onComplete(Result result)

View File

@ -66,7 +66,7 @@ public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest
successLatch.countDown();
}
})
.send(new Response.Listener.Adapter()
.send(new Response.Listener.Empty()
{
@Override
public void onHeaders(Response response)
@ -129,7 +129,7 @@ public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest
{
failureLatch.countDown();
}
}).send(new Response.Listener.Adapter()
}).send(new Response.Listener.Empty()
{
@Override
public void onComplete(Result result)
@ -181,7 +181,7 @@ public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest
successLatch.countDown();
}
})
.send(new Response.Listener.Adapter()
.send(new Response.Listener.Empty()
{
@Override
public void onSuccess(Response response)
@ -234,7 +234,7 @@ public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest
failureLatch.countDown();
}
})
.send(new Response.Listener.Adapter()
.send(new Response.Listener.Empty()
{
@Override
public void onComplete(Result result)
@ -276,7 +276,7 @@ public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest
final CountDownLatch latch = new CountDownLatch(1);
client.newRequest(host, port)
.send(new Response.Listener.Adapter()
.send(new Response.Listener.Empty()
{
@Override
public void onComplete(Result result)
@ -321,7 +321,7 @@ public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest
final CountDownLatch latch = new CountDownLatch(1);
client.newRequest(host, port)
.content(new ByteBufferContentProvider(ByteBuffer.allocate(16 * 1024 * 1024)))
.send(new Response.Listener.Adapter()
.send(new Response.Listener.Empty()
{
@Override
public void onComplete(Result result)

View File

@ -26,7 +26,7 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.util.BufferingResponseListener;
import org.eclipse.jetty.client.util.BlockingResponseListener;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpVersion;
@ -66,6 +66,7 @@ public class HttpReceiverTest
HttpExchange exchange = new HttpExchange(conversation, connection, null, listener);
conversation.exchanges().offer(exchange);
connection.setExchange(exchange);
exchange.requestComplete(true);
return exchange;
}
@ -78,7 +79,7 @@ public class HttpReceiverTest
"\r\n");
final AtomicReference<Response> responseRef = new AtomicReference<>();
final CountDownLatch latch = new CountDownLatch(1);
HttpExchange exchange = newExchange(new Response.Listener.Adapter()
HttpExchange exchange = newExchange(new Response.Listener.Empty()
{
@Override
public void onSuccess(Response response)
@ -110,11 +111,11 @@ public class HttpReceiverTest
"Content-length: " + content.length() + "\r\n" +
"\r\n" +
content);
BufferingResponseListener listener = new BufferingResponseListener();
BlockingResponseListener listener = new BlockingResponseListener();
HttpExchange exchange = newExchange(listener);
exchange.receive();
Response response = listener.await(5, TimeUnit.SECONDS);
Response response = listener.get(5, TimeUnit.SECONDS);
Assert.assertNotNull(response);
Assert.assertEquals(200, response.status());
Assert.assertEquals("OK", response.reason());
@ -123,7 +124,7 @@ public class HttpReceiverTest
Assert.assertNotNull(headers);
Assert.assertEquals(1, headers.size());
Assert.assertEquals(String.valueOf(content.length()), headers.get(HttpHeader.CONTENT_LENGTH));
String received = listener.contentAsString("UTF-8");
String received = listener.getContent("UTF-8");
Assert.assertEquals(content, received);
}
@ -137,7 +138,7 @@ public class HttpReceiverTest
"Content-length: " + (content1.length() + content2.length()) + "\r\n" +
"\r\n" +
content1);
BufferingResponseListener listener = new BufferingResponseListener();
BlockingResponseListener listener = new BlockingResponseListener();
HttpExchange exchange = newExchange(listener);
exchange.receive();
endPoint.setInputEOF();
@ -145,7 +146,7 @@ public class HttpReceiverTest
try
{
listener.await(5, TimeUnit.SECONDS);
listener.get(5, TimeUnit.SECONDS);
Assert.fail();
}
catch (ExecutionException e)
@ -161,7 +162,7 @@ public class HttpReceiverTest
"HTTP/1.1 200 OK\r\n" +
"Content-length: 1\r\n" +
"\r\n");
BufferingResponseListener listener = new BufferingResponseListener();
BlockingResponseListener listener = new BlockingResponseListener();
HttpExchange exchange = newExchange(listener);
exchange.receive();
// Simulate an idle timeout
@ -169,7 +170,7 @@ public class HttpReceiverTest
try
{
listener.await(5, TimeUnit.SECONDS);
listener.get(5, TimeUnit.SECONDS);
Assert.fail();
}
catch (ExecutionException e)
@ -185,13 +186,13 @@ public class HttpReceiverTest
"HTTP/1.1 200 OK\r\n" +
"Content-length: A\r\n" +
"\r\n");
BufferingResponseListener listener = new BufferingResponseListener();
BlockingResponseListener listener = new BlockingResponseListener();
HttpExchange exchange = newExchange(listener);
exchange.receive();
try
{
listener.await(5, TimeUnit.SECONDS);
listener.get(5, TimeUnit.SECONDS);
Assert.fail();
}
catch (ExecutionException e)

View File

@ -0,0 +1,188 @@
//
// ========================================================================
// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.client;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.util.ByteBufferContentProvider;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.util.IO;
import org.junit.Assert;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
public class HttpRequestAbortTest extends AbstractHttpClientServerTest
{
@Test
public void testAbortOnQueued() throws Exception
{
start(new EmptyServerHandler());
final AtomicBoolean begin = new AtomicBoolean();
try
{
client.newRequest("localhost", connector.getLocalPort())
.listener(new Request.Listener.Empty()
{
@Override
public void onQueued(Request request)
{
request.abort();
}
@Override
public void onBegin(Request request)
{
begin.set(true);
}
})
.send().get(5, TimeUnit.SECONDS);
fail();
}
catch (ExecutionException x)
{
HttpRequestException xx = (HttpRequestException)x.getCause();
Request request = xx.getRequest();
Assert.assertNotNull(request);
Assert.assertFalse(begin.get());
}
}
@Test
public void testAbortOnBegin() throws Exception
{
start(new EmptyServerHandler());
final AtomicBoolean headers = new AtomicBoolean();
try
{
client.newRequest("localhost", connector.getLocalPort())
.listener(new Request.Listener.Empty()
{
@Override
public void onBegin(Request request)
{
request.abort();
}
@Override
public void onHeaders(Request request)
{
headers.set(true);
}
})
.send().get(5, TimeUnit.SECONDS);
fail();
}
catch (ExecutionException x)
{
HttpRequestException xx = (HttpRequestException)x.getCause();
Request request = xx.getRequest();
Assert.assertNotNull(request);
Assert.assertFalse(headers.get());
}
}
@Test
public void testAbortOnHeaders() throws Exception
{
start(new EmptyServerHandler());
ContentResponse response = client.newRequest("localhost", connector.getLocalPort())
.listener(new Request.Listener.Empty()
{
@Override
public void onHeaders(Request request)
{
// Too late to abort
request.abort();
}
})
.send().get(5, TimeUnit.SECONDS);
assertEquals(200, response.status());
}
@Test
public void testAbortOnHeadersWithContent() throws Exception
{
final AtomicReference<IOException> failure = new AtomicReference<>();
start(new AbstractHandler()
{
@Override
public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
try
{
baseRequest.setHandled(true);
IO.copy(request.getInputStream(), response.getOutputStream());
}
catch (IOException x)
{
failure.set(x);
throw x;
}
}
});
// Test can behave in 2 ways:
// A) if the request is failed before the request arrived, then we get an ExecutionException
// B) if the request is failed after the request arrived, then we get a 500
try
{
ContentResponse response = client.newRequest("localhost", connector.getLocalPort())
.listener(new Request.Listener.Empty()
{
@Override
public void onHeaders(Request request)
{
request.abort();
}
})
.content(new ByteBufferContentProvider(ByteBuffer.wrap(new byte[]{0}), ByteBuffer.wrap(new byte[]{1}))
{
@Override
public long length()
{
return -1;
}
})
.send().get(5, TimeUnit.SECONDS);
Assert.assertNotNull(failure.get());
assertEquals(500, response.status());
}
catch (ExecutionException x)
{
HttpRequestException xx = (HttpRequestException)x.getCause();
Request request = xx.getRequest();
Assert.assertNotNull(request);
}
}
}

View File

@ -0,0 +1,169 @@
//
// ========================================================================
// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.client;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.junit.Assert;
import org.junit.Test;
public class HttpResponseAbortTest extends AbstractHttpClientServerTest
{
@Test
public void testAbortOnBegin() throws Exception
{
start(new EmptyServerHandler());
final CountDownLatch latch = new CountDownLatch(1);
client.newRequest("localhost", connector.getLocalPort())
.send(new Response.Listener.Empty()
{
@Override
public void onBegin(Response response)
{
response.abort();
}
@Override
public void onComplete(Result result)
{
Assert.assertTrue(result.isFailed());
latch.countDown();
}
});
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
}
@Test
public void testAbortOnHeaders() throws Exception
{
start(new EmptyServerHandler());
final CountDownLatch latch = new CountDownLatch(1);
client.newRequest("localhost", connector.getLocalPort())
.send(new Response.Listener.Empty()
{
@Override
public void onHeaders(Response response)
{
response.abort();
}
@Override
public void onComplete(Result result)
{
Assert.assertTrue(result.isFailed());
latch.countDown();
}
});
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
}
@Test
public void testAbortOnContent() throws Exception
{
start(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
try
{
baseRequest.setHandled(true);
OutputStream output = response.getOutputStream();
output.write(1);
output.flush();
output.write(2);
output.flush();
}
catch (IOException ignored)
{
// The client may have already closed, and we'll get an exception here, but it's expected
}
}
});
final CountDownLatch latch = new CountDownLatch(1);
client.newRequest("localhost", connector.getLocalPort())
.send(new Response.Listener.Empty()
{
@Override
public void onContent(Response response, ByteBuffer content)
{
response.abort();
}
@Override
public void onComplete(Result result)
{
Assert.assertTrue(result.isFailed());
latch.countDown();
}
});
Assert.assertTrue(latch.await(555, TimeUnit.SECONDS));
}
@Test(expected = CancellationException.class)
public void testCancelFuture() throws Exception
{
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<Future<ContentResponse>> ref = new AtomicReference<>();
start(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
try
{
latch.await(5, TimeUnit.SECONDS);
baseRequest.setHandled(true);
ref.get().cancel(true);
OutputStream output = response.getOutputStream();
output.write(new byte[]{0, 1, 2, 3, 4, 5, 6, 7});
}
catch (InterruptedException x)
{
throw new InterruptedIOException();
}
}
});
Future<ContentResponse> future = client.newRequest("localhost", connector.getLocalPort()).send();
ref.set(future);
latch.countDown();
future.get(5, TimeUnit.SECONDS);
}
}

View File

@ -129,7 +129,7 @@ public class HttpSenderTest
failureLatch.countDown();
}
});
connection.send(request, new Response.Listener.Adapter()
connection.send(request, new Response.Listener.Empty()
{
@Override
public void onComplete(Result result)
@ -158,7 +158,7 @@ public class HttpSenderTest
failureLatch.countDown();
}
});
connection.send(request, new Response.Listener.Adapter()
connection.send(request, new Response.Listener.Empty()
{
@Override
public void onComplete(Result result)

View File

@ -27,7 +27,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.util.BufferingResponseListener;
import org.eclipse.jetty.client.util.BlockingResponseListener;
import org.eclipse.jetty.client.util.PathContentProvider;
import org.eclipse.jetty.client.util.StreamingResponseListener;
import org.eclipse.jetty.http.HttpCookie;
@ -82,7 +82,7 @@ public class Usage
HttpClient client = new HttpClient();
final AtomicReference<Response> responseRef = new AtomicReference<>();
final CountDownLatch latch = new CountDownLatch(1);
client.newRequest("localhost", 8080).send(new Response.Listener.Adapter()
client.newRequest("localhost", 8080).send(new Response.Listener.Empty()
{
@Override
public void onSuccess(Response response)
@ -119,9 +119,9 @@ public class Usage
try (Connection connection = client.getDestination("http", "localhost", 8080).newConnection().get(5, TimeUnit.SECONDS))
{
Request request = client.newRequest("localhost", 8080);
BufferingResponseListener listener = new BufferingResponseListener();
BlockingResponseListener listener = new BlockingResponseListener();
connection.send(request, listener);
Response response = listener.await(5, TimeUnit.SECONDS);
Response response = listener.get(5, TimeUnit.SECONDS);
Assert.assertNotNull(response);
Assert.assertEquals(200, response.status());
}

View File

@ -1111,6 +1111,11 @@ public class HttpParser
}
break;
}
case CLOSED:
{
BufferUtil.clear(buffer);
return false;
}
}
}