#392733 - Implement a total timeout for asynchronous sends.

This commit is contained in:
Simone Bordet 2012-10-24 21:36:11 +02:00
parent 1b6d919d8c
commit 975a20271f
23 changed files with 344 additions and 128 deletions

View File

@ -36,6 +36,7 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLEngine;
import org.eclipse.jetty.client.api.AuthenticationStore;
@ -120,6 +121,7 @@ public class HttpClient extends ContainerLifeCycle
private volatile int responseBufferSize = 4096;
private volatile int maxRedirects = 8;
private volatile SocketAddress bindAddress;
private volatile long connectTimeout = 15000;
private volatile long idleTimeout;
private volatile boolean tcpNoDelay = true;
private volatile boolean dispatchIO = true;
@ -168,6 +170,7 @@ public class HttpClient extends ContainerLifeCycle
addBean(scheduler);
selectorManager = newSelectorManager();
selectorManager.setConnectTimeout(getConnectTimeout());
addBean(selectorManager);
handlers.add(new ContinueProtocolHandler(this));
@ -278,7 +281,7 @@ public class HttpClient extends ContainerLifeCycle
return provideDestination(scheme, host, port);
}
private HttpDestination provideDestination(String scheme, String host, int port)
protected HttpDestination provideDestination(String scheme, String host, int port)
{
String address = address(scheme, host, port);
HttpDestination destination = destinations.get(address);
@ -305,7 +308,7 @@ public class HttpClient extends ContainerLifeCycle
return new ArrayList<Destination>(destinations.values());
}
protected void send(Request request, Response.Listener listener)
protected void send(final Request request, long timeout, TimeUnit unit, Response.Listener listener)
{
String scheme = request.scheme().toLowerCase();
if (!Arrays.asList("http", "https").contains(scheme))
@ -315,7 +318,20 @@ public class HttpClient extends ContainerLifeCycle
if (port < 0)
port = "https".equals(scheme) ? 443 : 80;
provideDestination(scheme, request.host(), port).send(request, listener);
if (timeout > 0)
{
scheduler.schedule(new Runnable()
{
@Override
public void run()
{
request.abort("Total timeout elapsed");
}
}, timeout, unit);
}
HttpDestination destination = provideDestination(scheme, request.host(), port);
destination.send(request, listener);
}
protected void newConnection(HttpDestination destination, Callback<Connection> callback)
@ -405,6 +421,16 @@ public class HttpClient extends ContainerLifeCycle
this.byteBufferPool = byteBufferPool;
}
public long getConnectTimeout()
{
return connectTimeout;
}
public void setConnectTimeout(long connectTimeout)
{
this.connectTimeout = connectTimeout;
}
public long getIdleTimeout()
{
return idleTimeout;

View File

@ -349,13 +349,13 @@ public class HttpConnection extends AbstractConnection implements Connection
}
}
public boolean abort(HttpExchange exchange)
public boolean abort(HttpExchange exchange, String reason)
{
// We want the return value to be that of the response
// because if the response has already successfully
// arrived then we failed to abort the exchange
sender.abort(exchange);
return receiver.abort(exchange);
sender.abort(exchange, reason);
return receiver.abort(exchange, reason);
}
public void proceed(boolean proceed)

View File

@ -76,9 +76,9 @@ public class HttpContentResponse implements ContentResponse
}
@Override
public void abort()
public boolean abort(String reason)
{
response.abort();
return response.abort(reason);
}
@Override

View File

@ -119,10 +119,10 @@ public class HttpConversation implements Attributes
attributes.clear();
}
public boolean abort()
public boolean abort(String reason)
{
HttpExchange exchange = exchanges.peekLast();
return exchange != null && exchange.abort();
return exchange != null && exchange.abort(reason);
}
@Override

View File

@ -216,7 +216,7 @@ public class HttpDestination implements Destination, AutoCloseable, Dumpable
*/
protected void process(final Connection connection, boolean dispatch)
{
final RequestPair requestPair = requests.poll();
RequestPair requestPair = requests.poll();
if (requestPair == null)
{
LOG.debug("{} idle", connection);
@ -234,25 +234,35 @@ public class HttpDestination implements Destination, AutoCloseable, Dumpable
}
else
{
LOG.debug("{} active", connection);
if (!activeConnections.offer(connection))
final Request request = requestPair.request;
final Response.Listener listener = requestPair.listener;
if (request.aborted())
{
LOG.warn("{} active overflow");
}
if (dispatch)
{
client.getExecutor().execute(new Runnable()
{
@Override
public void run()
{
connection.send(requestPair.request, requestPair.listener);
}
});
abort(request, listener, "Aborted");
LOG.debug("Aborted {} before processing", request);
}
else
{
connection.send(requestPair.request, requestPair.listener);
LOG.debug("{} active", connection);
if (!activeConnections.offer(connection))
{
LOG.warn("{} active overflow");
}
if (dispatch)
{
client.getExecutor().execute(new Runnable()
{
@Override
public void run()
{
connection.send(request, listener);
}
});
}
else
{
connection.send(request, listener);
}
}
}
}
@ -283,7 +293,7 @@ public class HttpDestination implements Destination, AutoCloseable, Dumpable
activeConnections.remove(connection);
idleConnections.remove(connection);
// We need to executed queued requests even if this connection failed.
// We need to execute queued requests even if this connection failed.
// We may create a connection that is not needed, but it will eventually
// idle timeout, so no worries
if (!requests.isEmpty())
@ -318,6 +328,33 @@ public class HttpDestination implements Destination, AutoCloseable, Dumpable
LOG.debug("Closed {}", this);
}
public boolean abort(Request request, String reason)
{
for (RequestPair pair : requests)
{
if (pair.request == request)
{
if (requests.remove(pair))
{
// We were able to remove the pair, so it won't be processed
abort(request, pair.listener, reason);
LOG.debug("Aborted {} while queued", request);
return true;
}
}
}
return false;
}
private void abort(Request request, Response.Listener listener, String reason)
{
HttpResponse response = new HttpResponse(request, listener);
HttpResponseException responseFailure = new HttpResponseException(reason, response);
responseNotifier.notifyFailure(listener, response, responseFailure);
HttpRequestException requestFailure = new HttpRequestException(reason, request);
responseNotifier.notifyComplete(listener, new Result(request, requestFailure, response, responseFailure));
}
@Override
public String dump()
{

View File

@ -48,7 +48,7 @@ public class HttpExchange
this.connection = connection;
this.request = request;
this.listener = listener;
this.response = new HttpResponse(this, listener);
this.response = new HttpResponse(request, listener);
}
public HttpConversation conversation()
@ -168,10 +168,10 @@ public class HttpExchange
return new AtomicMarkableReference<>(result, modified);
}
public boolean abort()
public boolean abort(String reason)
{
LOG.debug("Aborting {}", this);
boolean aborted = connection.abort(this);
LOG.debug("Aborting {} reason {}", this, reason);
boolean aborted = connection.abort(this, reason);
LOG.debug("Aborted {}: {}", this, aborted);
return aborted;
}
@ -189,7 +189,12 @@ public class HttpExchange
connection.proceed(proceed);
}
public void terminate()
public void terminateRequest()
{
terminate.countDown();
}
public void terminateResponse()
{
terminate.countDown();
}

View File

@ -284,7 +284,7 @@ public class HttpReceiver implements HttpParser.ResponseHandler<ByteBuffer>
if (!updateState(State.RECEIVE, State.IDLE))
throw new IllegalStateException();
exchange.terminate();
exchange.terminateResponse();
HttpResponse response = exchange.response();
Response.Listener listener = exchange.conversation().listener();
@ -326,7 +326,7 @@ public class HttpReceiver implements HttpParser.ResponseHandler<ByteBuffer>
break;
}
exchange.terminate();
exchange.terminateResponse();
HttpResponse response = exchange.response();
HttpConversation conversation = exchange.conversation();
@ -373,9 +373,9 @@ public class HttpReceiver implements HttpParser.ResponseHandler<ByteBuffer>
fail(new TimeoutException());
}
public boolean abort(HttpExchange exchange)
public boolean abort(HttpExchange exchange, String reason)
{
return fail(new HttpResponseException("Response aborted", exchange.response()));
return fail(new HttpResponseException(reason == null ? "Response aborted" : reason, exchange.response()));
}
private boolean updateState(State from, State to)

View File

@ -29,6 +29,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.jetty.client.api.ContentProvider;
@ -314,7 +315,7 @@ public class HttpRequest implements Request
@Override
public Future<ContentResponse> send()
{
BlockingResponseListener listener = new BlockingResponseListener();
BlockingResponseListener listener = new BlockingResponseListener(this);
send(listener);
return listener;
}
@ -322,15 +323,23 @@ public class HttpRequest implements Request
@Override
public void send(final Response.Listener listener)
{
client.send(this, listener);
send(0, TimeUnit.SECONDS, listener);
}
@Override
public boolean abort()
public void send(long timeout, TimeUnit unit, Response.Listener listener)
{
client.send(this, timeout, unit, listener);
}
@Override
public boolean abort(String reason)
{
aborted = true;
if (client.provideDestination(scheme(), host(), port()).abort(this, reason))
return true;
HttpConversation conversation = client.getConversation(conversation());
return conversation != null && conversation.abort();
return conversation.abort(reason);
}
@Override

View File

@ -18,6 +18,7 @@
package org.eclipse.jetty.client;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpVersion;
@ -25,15 +26,15 @@ import org.eclipse.jetty.http.HttpVersion;
public class HttpResponse implements Response
{
private final HttpFields headers = new HttpFields();
private final HttpExchange exchange;
private final Request request;
private final Listener listener;
private HttpVersion version;
private int status;
private String reason;
public HttpResponse(HttpExchange exchange, Listener listener)
public HttpResponse(Request request, Listener listener)
{
this.exchange = exchange;
this.request = request;
this.listener = listener;
}
@ -80,7 +81,7 @@ public class HttpResponse implements Response
@Override
public long conversation()
{
return exchange.request().conversation();
return request.conversation();
}
@Override
@ -90,9 +91,9 @@ public class HttpResponse implements Response
}
@Override
public void abort()
public boolean abort(String reason)
{
exchange.request().abort();
return request.abort(reason);
}
@Override

View File

@ -77,7 +77,7 @@ public class HttpSender
Request request = exchange.request();
if (request.aborted())
{
exchange.abort();
exchange.abort(null);
}
else
{
@ -344,7 +344,7 @@ public class HttpSender
if (!updateState(State.COMMIT, State.IDLE))
throw new IllegalStateException();
exchange.terminate();
exchange.terminateRequest();
// It is important to notify completion *after* we reset because
// the notification may trigger another request/response
@ -385,7 +385,7 @@ public class HttpSender
break;
}
exchange.terminate();
exchange.terminateRequest();
Request request = exchange.request();
requestNotifier.notifyFailure(request, failure);
@ -396,7 +396,7 @@ public class HttpSender
if (result == null && notCommitted && !request.aborted())
{
result = exchange.responseComplete(failure).getReference();
exchange.terminate();
exchange.terminateResponse();
LOG.debug("Failed on behalf {}", exchange);
}
@ -411,12 +411,12 @@ public class HttpSender
return true;
}
public boolean abort(HttpExchange exchange)
public boolean abort(HttpExchange exchange, String reason)
{
State current = state.get();
boolean abortable = current == State.IDLE || current == State.SEND ||
current == State.COMMIT && contentIterator.hasNext();
return abortable && fail(new HttpRequestException("Request aborted", exchange.request()));
return abortable && fail(new HttpRequestException(reason == null ? "Request aborted" : reason, exchange.request()));
}
private void releaseBuffers(ByteBufferPool bufferPool, ByteBuffer header, ByteBuffer chunk)

View File

@ -135,7 +135,7 @@ public class RedirectProtocolHandler extends Response.Listener.Empty implements
public void onBegin(Request redirect)
{
if (request.aborted())
redirect.abort();
redirect.abort(null);
}
});

View File

@ -23,6 +23,7 @@ import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.util.InputStreamResponseListener;
@ -241,18 +242,35 @@ public interface Request
* as they happen, or when the application needs to efficiently manage the response content.
*
* @param listener the listener that receives response events
* @see #send(long, TimeUnit, Response.Listener)
*/
void send(Response.Listener listener);
/**
* Attempts to abort the send of this request.
* Sends this request and asynchronously notifies the given listener for response events.
* <p />
* This method should be used when the application needs to be notified of the various response events
* as they happen, or when the application needs to efficiently manage the response content.
* <p />
* This method waits for the given timeout before aborting the HTTP conversation. A {@code timeout}
* value of zero means to wait indefinitely to the conversation to complete.
*
* @return whether the abort succeeded
* @param timeout the total timeout in the given {@code unit}
* @param unit the timeout unit
* @param listener the listener that receives response events
*/
boolean abort();
void send(long timeout, TimeUnit unit, Response.Listener listener);
/**
* @return whether {@link #abort(boolean)} was called
* Attempts to abort the send of this request.
*
* @param reason the abort reason
* @return whether the abort succeeded
*/
boolean abort(String reason);
/**
* @return whether {@link #abort(String)} was called
*/
boolean aborted();

View File

@ -67,9 +67,12 @@ public interface Response
HttpFields headers();
/**
* Attempts to abort the send of this request.
* Attempts to abort the receive of this response.
*
* @param reason the abort reason
* @return whether the abort succeeded
*/
void abort();
boolean abort(String reason);
/**
* Listener for response events

View File

@ -18,7 +18,6 @@
package org.eclipse.jetty.client.util;
import java.nio.ByteBuffer;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
@ -28,38 +27,20 @@ import java.util.concurrent.TimeoutException;
import org.eclipse.jetty.client.HttpContentResponse;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Result;
public class BlockingResponseListener extends BufferingResponseListener implements Future<ContentResponse>
{
private final CountDownLatch latch = new CountDownLatch(1);
private final Request request;
private ContentResponse response;
private Throwable failure;
private volatile boolean cancelled;
@Override
public void onBegin(Response response)
public BlockingResponseListener(Request request)
{
super.onBegin(response);
if (cancelled)
response.abort();
}
@Override
public void onHeaders(Response response)
{
super.onHeaders(response);
if (cancelled)
response.abort();
}
@Override
public void onContent(Response response, ByteBuffer content)
{
super.onContent(response, content);
if (cancelled)
response.abort();
this.request = request;
}
@Override
@ -75,7 +56,7 @@ public class BlockingResponseListener extends BufferingResponseListener implemen
public boolean cancel(boolean mayInterruptIfRunning)
{
cancelled = true;
return latch.getCount() == 0;
return request.abort("Cancelled");
}
@Override
@ -102,7 +83,10 @@ public class BlockingResponseListener extends BufferingResponseListener implemen
{
boolean expired = !latch.await(timeout, unit);
if (expired)
{
request.abort("Total timeout elapsed");
throw new TimeoutException();
}
return result();
}

View File

@ -63,7 +63,7 @@ public class BufferingResponseListener extends Response.Listener.Empty
HttpFields headers = response.headers();
long length = headers.getLongField(HttpHeader.CONTENT_LENGTH.asString());
if (length > maxLength)
response.abort();
response.abort("Buffering capacity exceeded");
String contentType = headers.get(HttpHeader.CONTENT_TYPE);
if (contentType != null)

View File

@ -408,7 +408,7 @@ public class HttpClientContinueTest extends AbstractHttpClientServerTest
@Override
public void onBegin(Response response)
{
response.abort();
response.abort(null);
}
@Override

View File

@ -46,7 +46,7 @@ public class HttpClientExplicitConnectionTest extends AbstractHttpClientServerTe
try (Connection connection = destination.newConnection().get(5, TimeUnit.SECONDS))
{
Request request = client.newRequest(destination.host(), destination.port()).scheme(scheme);
BlockingResponseListener listener = new BlockingResponseListener();
BlockingResponseListener listener = new BlockingResponseListener(request);
connection.send(request, listener);
ContentResponse response = listener.get(5, TimeUnit.SECONDS);
@ -68,7 +68,7 @@ public class HttpClientExplicitConnectionTest extends AbstractHttpClientServerTe
Destination destination = client.getDestination(scheme, "localhost", connector.getLocalPort());
Connection connection = destination.newConnection().get(5, TimeUnit.SECONDS);
Request request = client.newRequest(destination.host(), destination.port()).scheme(scheme);
BlockingResponseListener listener = new BlockingResponseListener();
BlockingResponseListener listener = new BlockingResponseListener(request);
connection.send(request, listener);
ContentResponse response = listener.get(5, TimeUnit.SECONDS);

View File

@ -22,7 +22,6 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
@ -45,7 +44,6 @@ import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;

View File

@ -0,0 +1,146 @@
//
// ========================================================================
// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.client;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.toolchain.test.annotation.Slow;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.junit.Assert;
import org.junit.Test;
public class HttpClientTimeoutTest extends AbstractHttpClientServerTest
{
public HttpClientTimeoutTest(SslContextFactory sslContextFactory)
{
super(sslContextFactory);
}
@Slow
@Test(expected = TimeoutException.class)
public void testTimeoutOnFuture() throws Exception
{
long timeout = 1000;
start(new TimeoutHandler(2 * timeout));
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.send().get(timeout, TimeUnit.MILLISECONDS);
}
@Slow
@Test
public void testTimeoutOnListener() throws Exception
{
long timeout = 1000;
start(new TimeoutHandler(2 * timeout));
final CountDownLatch latch = new CountDownLatch(1);
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.send(timeout, TimeUnit.MILLISECONDS, new Response.Listener.Empty()
{
@Override
public void onComplete(Result result)
{
Assert.assertTrue(result.isFailed());
latch.countDown();
}
});
Assert.assertTrue(latch.await(3 * timeout, TimeUnit.MILLISECONDS));
}
@Slow
@Test
public void testTimeoutOnQueuedRequest() throws Exception
{
long timeout = 1000;
start(new TimeoutHandler(3 * timeout));
// Only one connection so requests get queued
client.setMaxConnectionsPerAddress(1);
// The first request has a long timeout
final CountDownLatch firstLatch = new CountDownLatch(1);
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.send(4 * timeout, TimeUnit.MILLISECONDS, new Response.Listener.Empty()
{
@Override
public void onComplete(Result result)
{
Assert.assertFalse(result.isFailed());
firstLatch.countDown();
}
});
// Second request has a short timeout and should fail in the queue
final CountDownLatch secondLatch = new CountDownLatch(1);
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.send(timeout, TimeUnit.MILLISECONDS, new Response.Listener.Empty()
{
@Override
public void onComplete(Result result)
{
Assert.assertTrue(result.isFailed());
secondLatch.countDown();
}
});
Assert.assertTrue(secondLatch.await(2 * timeout, TimeUnit.MILLISECONDS));
// The second request must fail before the first request has completed
Assert.assertTrue(firstLatch.getCount() > 0);
Assert.assertTrue(firstLatch.await(5 * timeout, TimeUnit.MILLISECONDS));
}
private class TimeoutHandler extends AbstractHandler
{
private final long timeout;
public TimeoutHandler(long timeout)
{
this.timeout = timeout;
}
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
try
{
TimeUnit.MILLISECONDS.sleep(timeout);
}
catch (InterruptedException x)
{
throw new ServletException(x);
}
}
}
}

View File

@ -22,11 +22,9 @@ import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.zip.GZIPOutputStream;
import org.eclipse.jetty.client.api.ContentResponse;
@ -66,14 +64,15 @@ public class HttpReceiverTest
client.stop();
}
protected HttpExchange newExchange(Response.Listener listener)
protected HttpExchange newExchange()
{
HttpRequest request = new HttpRequest(client, URI.create("http://localhost"));
BlockingResponseListener listener = new BlockingResponseListener(request);
HttpExchange exchange = new HttpExchange(conversation, connection, request, listener);
conversation.exchanges().offer(exchange);
connection.setExchange(exchange);
exchange.requestComplete(null);
exchange.terminate();
exchange.terminateRequest();
return exchange;
}
@ -84,21 +83,11 @@ public class HttpReceiverTest
"HTTP/1.1 200 OK\r\n" +
"Content-length: 0\r\n" +
"\r\n");
final AtomicReference<Response> responseRef = new AtomicReference<>();
final CountDownLatch latch = new CountDownLatch(1);
HttpExchange exchange = newExchange(new Response.Listener.Empty()
{
@Override
public void onSuccess(Response response)
{
responseRef.set(response);
latch.countDown();
}
});
HttpExchange exchange = newExchange();
BlockingResponseListener listener = (BlockingResponseListener)exchange.listener();
exchange.receive();
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
Response response = responseRef.get();
Response response = listener.get(5, TimeUnit.SECONDS);
Assert.assertNotNull(response);
Assert.assertEquals(200, response.status());
Assert.assertEquals("OK", response.reason());
@ -118,8 +107,8 @@ public class HttpReceiverTest
"Content-length: " + content.length() + "\r\n" +
"\r\n" +
content);
BlockingResponseListener listener = new BlockingResponseListener();
HttpExchange exchange = newExchange(listener);
HttpExchange exchange = newExchange();
BlockingResponseListener listener = (BlockingResponseListener)exchange.listener();
exchange.receive();
Response response = listener.get(5, TimeUnit.SECONDS);
@ -145,8 +134,8 @@ public class HttpReceiverTest
"Content-length: " + (content1.length() + content2.length()) + "\r\n" +
"\r\n" +
content1);
BlockingResponseListener listener = new BlockingResponseListener();
HttpExchange exchange = newExchange(listener);
HttpExchange exchange = newExchange();
BlockingResponseListener listener = (BlockingResponseListener)exchange.listener();
exchange.receive();
endPoint.setInputEOF();
exchange.receive();
@ -169,8 +158,8 @@ public class HttpReceiverTest
"HTTP/1.1 200 OK\r\n" +
"Content-length: 1\r\n" +
"\r\n");
BlockingResponseListener listener = new BlockingResponseListener();
HttpExchange exchange = newExchange(listener);
HttpExchange exchange = newExchange();
BlockingResponseListener listener = (BlockingResponseListener)exchange.listener();
exchange.receive();
// Simulate an idle timeout
connection.idleTimeout();
@ -193,8 +182,8 @@ public class HttpReceiverTest
"HTTP/1.1 200 OK\r\n" +
"Content-length: A\r\n" +
"\r\n");
BlockingResponseListener listener = new BlockingResponseListener();
HttpExchange exchange = newExchange(listener);
HttpExchange exchange = newExchange();
BlockingResponseListener listener = (BlockingResponseListener)exchange.listener();
exchange.receive();
try
@ -224,8 +213,8 @@ public class HttpReceiverTest
"Content-Length: " + gzip.length + "\r\n" +
"Content-Encoding: gzip\r\n" +
"\r\n");
BlockingResponseListener listener = new BlockingResponseListener();
HttpExchange exchange = newExchange(listener);
HttpExchange exchange = newExchange();
BlockingResponseListener listener = (BlockingResponseListener)exchange.listener();
exchange.receive();
endPoint.reset();

View File

@ -68,7 +68,7 @@ public class HttpRequestAbortTest extends AbstractHttpClientServerTest
@Override
public void onQueued(Request request)
{
request.abort();
request.abort(null);
}
@Override
@ -104,7 +104,7 @@ public class HttpRequestAbortTest extends AbstractHttpClientServerTest
@Override
public void onBegin(Request request)
{
if (request.abort())
if (request.abort(null))
aborted.countDown();
}
@ -144,7 +144,7 @@ public class HttpRequestAbortTest extends AbstractHttpClientServerTest
@Override
public void onHeaders(Request request)
{
if (request.abort())
if (request.abort(null))
aborted.countDown();
}
})
@ -196,7 +196,7 @@ public class HttpRequestAbortTest extends AbstractHttpClientServerTest
@Override
public void onHeaders(Request request)
{
request.abort();
request.abort(null);
}
})
.content(new ByteBufferContentProvider(ByteBuffer.wrap(new byte[]{0}), ByteBuffer.wrap(new byte[]{1}))
@ -270,7 +270,7 @@ public class HttpRequestAbortTest extends AbstractHttpClientServerTest
TimeUnit.MILLISECONDS.sleep(delay);
request.abort();
request.abort(null);
try
{
@ -304,7 +304,7 @@ public class HttpRequestAbortTest extends AbstractHttpClientServerTest
{
// Abort the request after the 3xx response but before issuing the next request
if (!result.isFailed())
result.getRequest().abort();
result.getRequest().abort(null);
super.onComplete(result);
}
});

View File

@ -60,7 +60,7 @@ public class HttpResponseAbortTest extends AbstractHttpClientServerTest
@Override
public void onBegin(Response response)
{
response.abort();
response.abort(null);
}
@Override
@ -86,7 +86,7 @@ public class HttpResponseAbortTest extends AbstractHttpClientServerTest
@Override
public void onHeaders(Response response)
{
response.abort();
response.abort(null);
}
@Override
@ -131,7 +131,7 @@ public class HttpResponseAbortTest extends AbstractHttpClientServerTest
@Override
public void onContent(Response response, ByteBuffer content)
{
response.abort();
response.abort(null);
}
@Override

View File

@ -125,7 +125,7 @@ public class Usage
try (Connection connection = client.getDestination("http", "localhost", 8080).newConnection().get(5, TimeUnit.SECONDS))
{
Request request = client.newRequest("localhost", 8080);
BlockingResponseListener listener = new BlockingResponseListener();
BlockingResponseListener listener = new BlockingResponseListener(request);
connection.send(request, listener);
Response response = listener.get(5, TimeUnit.SECONDS);
Assert.assertNotNull(response);
@ -191,7 +191,7 @@ public class Usage
}
else
{
response.abort();
response.abort(null);
}
}