Jetty9 - Fourth take at HTTP client implementation.

This commit is contained in:
Simone Bordet 2012-09-05 13:33:38 +02:00
parent 2b5ec13003
commit 2715f64441
14 changed files with 335 additions and 125 deletions

View File

@ -36,8 +36,6 @@ import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Destination;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.MappedByteBufferPool;
@ -147,7 +145,9 @@ public class HttpClient extends AggregateLifeCycle
@Override
protected void doStop() throws Exception
{
LOG.debug("Stopping {}", this);
super.doStop();
LOG.info("Stopped {}", this);
}
public long getIdleTimeout()
@ -195,26 +195,12 @@ public class HttpClient extends AggregateLifeCycle
public Request newRequest(URI uri)
{
HttpRequest request = new HttpRequest(this, uri);
normalizeRequest(request);
return request;
return new HttpRequest(this, uri);
}
protected Request newRequest(long id, URI uri)
{
HttpRequest request = new HttpRequest(this, id, uri);
normalizeRequest(request);
return request;
}
protected void normalizeRequest(Request request)
{
// TODO: Add decoder, cookies, agent, default headers, etc.
request.method(HttpMethod.GET)
.version(HttpVersion.HTTP_1_1)
.agent(getUserAgent())
.idleTimeout(getIdleTimeout())
.followRedirects(isFollowRedirects());
return new HttpRequest(this, id, uri);
}
private String address(String scheme, String host, int port)
@ -322,7 +308,7 @@ public class HttpClient extends AggregateLifeCycle
this.responseBufferSize = responseBufferSize;
}
protected void newConnection(Destination destination, Callback<Connection> callback)
protected void newConnection(HttpDestination destination, Callback<Connection> callback)
{
SocketChannel channel = null;
try
@ -407,7 +393,7 @@ public class HttpClient extends AggregateLifeCycle
public org.eclipse.jetty.io.Connection newConnection(SocketChannel channel, EndPoint endPoint, Object attachment) throws IOException
{
ConnectionCallback callback = (ConnectionCallback)attachment;
Destination destination = callback.destination;
HttpDestination destination = callback.destination;
SslContextFactory sslContextFactory = getSslContextFactory();
if ("https".equals(destination.scheme()))
@ -426,7 +412,7 @@ public class HttpClient extends AggregateLifeCycle
SslConnection sslConnection = new SslConnection(getByteBufferPool(), getExecutor(), endPoint, engine);
EndPoint appEndPoint = sslConnection.getDecryptedEndPoint();
HttpConnection connection = new HttpConnection(HttpClient.this, appEndPoint);
HttpConnection connection = new HttpConnection(HttpClient.this, appEndPoint, destination);
appEndPoint.setConnection(connection);
callback.callback.completed(connection);
connection.onOpen();
@ -436,7 +422,7 @@ public class HttpClient extends AggregateLifeCycle
}
else
{
HttpConnection connection = new HttpConnection(HttpClient.this, endPoint);
HttpConnection connection = new HttpConnection(HttpClient.this, endPoint, destination);
callback.callback.completed(connection);
return connection;
}
@ -451,10 +437,10 @@ public class HttpClient extends AggregateLifeCycle
private class ConnectionCallback extends FutureCallback<Connection>
{
private final Destination destination;
private final HttpDestination destination;
private final Callback<Connection> callback;
private ConnectionCallback(Destination destination, Callback<Connection> callback)
private ConnectionCallback(HttpDestination destination, Callback<Connection> callback)
{
this.destination = destination;
this.callback = callback;

View File

@ -8,6 +8,7 @@ import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.EndPoint;
@ -20,13 +21,15 @@ public class HttpConnection extends AbstractConnection implements Connection
private final AtomicReference<HttpExchange> exchange = new AtomicReference<>();
private final HttpClient client;
private final HttpDestination destination;
private final HttpSender sender;
private final HttpReceiver receiver;
public HttpConnection(HttpClient client, EndPoint endPoint)
public HttpConnection(HttpClient client, EndPoint endPoint, HttpDestination destination)
{
super(endPoint, client.getExecutor());
this.client = client;
this.destination = destination;
this.sender = new HttpSender(this);
this.receiver = new HttpReceiver(this);
}
@ -49,6 +52,10 @@ public class HttpConnection extends AbstractConnection implements Connection
HttpExchange exchange = this.exchange.get();
if (exchange != null)
exchange.idleTimeout();
// We will be closing the connection, so remove it
destination.remove(this);
return true;
}
@ -72,6 +79,21 @@ public class HttpConnection extends AbstractConnection implements Connection
private void normalizeRequest(Request request)
{
if (request.method() == null)
request.method(HttpMethod.GET);
if (request.version() == null)
request.version(HttpVersion.HTTP_1_1);
if (request.agent() == null)
request.agent(client.getUserAgent());
if (request.idleTimeout() <= 0)
request.idleTimeout(client.getIdleTimeout());
// TODO: follow redirects
// TODO: cookies
HttpVersion version = request.version();
HttpFields headers = request.headers();
ContentProvider content = request.content();
@ -103,7 +125,13 @@ public class HttpConnection extends AbstractConnection implements Connection
if (version.getVersion() > 10)
{
if (!headers.containsKey(HttpHeader.HOST.asString()))
headers.put(HttpHeader.HOST, request.host() + ":" + request.port());
{
String value = request.host();
int port = request.port();
if (port > 0)
value += ":" + port;
headers.put(HttpHeader.HOST, value);
}
}
}
@ -116,4 +144,14 @@ public class HttpConnection extends AbstractConnection implements Connection
else
throw new IllegalStateException();
}
@Override
public String toString()
{
return String.format("%s@%x(l:%s <-> r:%s)",
HttpConnection.class.getSimpleName(),
hashCode(),
getEndPoint().getLocalAddress(),
getEndPoint().getRemoteAddress());
}
}

View File

@ -15,6 +15,7 @@ package org.eclipse.jetty.client;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
@ -38,8 +39,8 @@ public class HttpDestination implements Destination
private final String host;
private final int port;
private final Queue<RequestPair> requests;
private final Queue<Connection> idleConnections;
private final Queue<Connection> activeConnections;
private final BlockingQueue<Connection> idleConnections;
private final BlockingQueue<Connection> activeConnections;
public HttpDestination(HttpClient client, String scheme, String host, int port)
{
@ -52,6 +53,11 @@ public class HttpDestination implements Destination
this.activeConnections = new ArrayBlockingQueue<>(client.getMaxConnectionsPerAddress());
}
protected BlockingQueue<Connection> idleConnections()
{
return idleConnections;
}
@Override
public String scheme()
{
@ -93,7 +99,9 @@ public class HttpDestination implements Destination
{
LOG.debug("Queued {}", request);
notifyRequestQueued(request.listener(), request);
ensureConnection(); // TODO: improve and test this
Connection connection = acquire();
if (connection != null)
process(connection);
}
}
else
@ -120,8 +128,24 @@ public class HttpDestination implements Destination
}
}
private void ensureConnection()
public Future<Connection> newConnection()
{
FutureCallback<Connection> result = new FutureCallback<>();
newConnection(result);
return result;
}
protected void newConnection(Callback<Connection> callback)
{
client.newConnection(this, callback);
}
protected Connection acquire()
{
Connection result = idleConnections.poll();
if (result != null)
return result;
final int maxConnections = client.getMaxConnectionsPerAddress();
while (true)
{
@ -131,7 +155,8 @@ public class HttpDestination implements Destination
if (next > maxConnections)
{
LOG.debug("Max connections reached {}: {}", this, current);
break;
// Try again the idle connections
return idleConnections.poll();
}
if (connectionCount.compareAndSet(current, next))
@ -141,7 +166,7 @@ public class HttpDestination implements Destination
@Override
public void completed(Connection connection)
{
LOG.debug("Created connection {}/{} for {}", next, maxConnections, this);
LOG.debug("Created connection {}/{} {} for {}", next, maxConnections, connection, this);
process(connection);
}
@ -151,23 +176,12 @@ public class HttpDestination implements Destination
// TODO: what here ?
}
});
break;
// Try again the idle connections
return idleConnections.poll();
}
}
}
public Future<Connection> newConnection()
{
FutureCallback<Connection> result = new FutureCallback<>();
newConnection(result);
return result;
}
private void newConnection(Callback<Connection> callback)
{
client.newConnection(this, callback);
}
/**
* <p>Processes a new connection making it idle or active depending on whether requests are waiting to be sent.</p>
* <p>A new connection is created when a request needs to be executed; it is possible that the request that
@ -182,6 +196,7 @@ public class HttpDestination implements Destination
final RequestPair requestPair = requests.poll();
if (requestPair == null)
{
LOG.debug("Connection {} idle", connection);
idleConnections.offer(connection);
}
else
@ -198,20 +213,18 @@ public class HttpDestination implements Destination
}
}
// TODO: 1. We must do queuing of requests in any case, because we cannot do blocking connect
// TODO: 2. We must be non-blocking connect, therefore we need to queue
// Connections should compete for the queue of requests in separate threads
// that poses a problem of thread pool size: if < maxConnections we're starving
//
// conn1 is executed, takes on the queue => I need at least one thread per destination
// we need to queue the request, pick an idle connection, then execute { conn.send(request, listener) }
// if I create manually the connection, then I call send(request, listener)
// Other ways ?
public void release(Connection connection)
{
activeConnections.remove(connection);
idleConnections.offer(connection);
}
public void remove(Connection connection)
{
connectionCount.decrementAndGet();
activeConnections.remove(connection);
idleConnections.remove(connection);
}
@Override
public String toString()

View File

@ -57,7 +57,7 @@ public class HttpExchange
receiver.receive(this);
}
public void requestDone()
public void requestDone(boolean success)
{
// TODO
}

View File

@ -24,6 +24,7 @@ import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.util.FutureCallback;
@ -147,13 +148,13 @@ public class HttpRequest implements Request
@Override
public String agent()
{
return agent;
return headers.get(HttpHeader.USER_AGENT);
}
@Override
public Request agent(String userAgent)
{
this.agent = userAgent;
headers.put(HttpHeader.USER_AGENT, userAgent);
return this;
}

View File

@ -30,7 +30,8 @@ public class HttpSender
private Iterator<ByteBuffer> contentChunks;
private ByteBuffer header;
private ByteBuffer chunk;
private boolean requestHeadersComplete;
private boolean headersComplete;
private boolean failed;
public HttpSender(HttpConnection connection)
{
@ -112,9 +113,9 @@ public class HttpSender
if (callback.completed())
{
if (!requestHeadersComplete)
if (!headersComplete)
{
requestHeadersComplete = true;
headersComplete = true;
notifyRequestHeadersComplete(request);
}
releaseBuffers();
@ -134,7 +135,7 @@ public class HttpSender
}
case DONE:
{
if (generator.isEnd())
if (generator.isEnd() && !failed)
success();
return;
}
@ -158,10 +159,15 @@ public class HttpSender
protected void success()
{
HttpExchange exchange = this.exchange.getAndSet(null);
exchange.requestDone();
// Cleanup first
generator.reset();
requestHeadersComplete = false;
headersComplete = false;
// Notify after
HttpExchange exchange = this.exchange.getAndSet(null);
LOG.debug("{} succeeded", exchange.request());
exchange.requestDone(true);
// It is important to notify *after* we reset because
// the notification may trigger another request/response
notifyRequestSuccess(exchange.request());
@ -169,13 +175,18 @@ public class HttpSender
protected void fail(Throwable failure)
{
// Cleanup first
BufferUtil.clear(header);
BufferUtil.clear(chunk);
releaseBuffers();
connection.getEndPoint().shutdownOutput();
generator.abort();
failed = true;
// Notify after
HttpExchange exchange = this.exchange.getAndSet(null);
exchange.requestDone();
exchange.requestDone(false);
notifyRequestFailure(exchange.request(), failure);
notifyResponseFailure(exchange.listener(), failure);
}

View File

@ -6,7 +6,7 @@ import org.eclipse.jetty.server.SelectChannelConnector;
import org.eclipse.jetty.server.Server;
import org.junit.After;
public class AbstractHttpClientTest
public class AbstractHttpClientServerTest
{
protected Server server;
protected HttpClient client;

View File

@ -0,0 +1,18 @@
package org.eclipse.jetty.client;
import java.io.IOException;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler;
public class EmptyHandler extends AbstractHandler
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
}
}

View File

@ -28,19 +28,12 @@ import org.eclipse.jetty.server.handler.AbstractHandler;
import org.junit.Assert;
import org.junit.Test;
public class HttpClientTest extends AbstractHttpClientTest
public class HttpClientTest extends AbstractHttpClientServerTest
{
@Test
public void test_GET_NoResponseContent() throws Exception
{
start(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
}
});
start(new EmptyHandler());
Response response = client.GET("http://localhost:" + connector.getLocalPort()).get(5, TimeUnit.SECONDS);
@ -70,7 +63,12 @@ public class HttpClientTest extends AbstractHttpClientTest
Assert.assertArrayEquals(data, content);
}
@Test
public void test_DestinationCount() throws Exception
{
start(new EmptyHandler());
client.GET("http://localhost:" + connector.getLocalPort()).get(5, TimeUnit.SECONDS);
List<Destination> destinations = client.getDestinations();
Assert.assertNotNull(destinations);

View File

@ -0,0 +1,143 @@
package org.eclipse.jetty.client;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.toolchain.test.annotation.Slow;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class HttpDestinationTest extends AbstractHttpClientServerTest
{
@Before
public void init() throws Exception
{
start(new EmptyHandler());
}
@Test
public void test_FirstAcquire_WithEmptyQueue() throws Exception
{
HttpDestination destination = new HttpDestination(client, "http", "localhost", connector.getLocalPort());
Connection connection = destination.acquire();
// There are no available existing connections, so acquire() returns null
Assert.assertNull(connection);
// There are no queued requests, so the newly created connection will be idle
connection = destination.idleConnections().poll(5, TimeUnit.SECONDS);
Assert.assertNotNull(connection);
}
@Test
public void test_SecondAcquire_AfterFirstAcquire_WithEmptyQueue_ReturnsSameConnection() throws Exception
{
HttpDestination destination = new HttpDestination(client, "http", "localhost", connector.getLocalPort());
Connection connection1 = destination.acquire();
// There are no available existing connections, so acquire() returns null
Assert.assertNull(connection1);
// There are no queued requests, so the newly created connection will be idle
long start = System.nanoTime();
while (connection1 == null && TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - start) < 5)
{
connection1 = destination.idleConnections().peek();
TimeUnit.MILLISECONDS.sleep(50);
}
Assert.assertNotNull(connection1);
Connection connection2 = destination.acquire();
Assert.assertSame(connection1, connection2);
}
@Test
public void test_SecondAcquire_ConcurrentWithFirstAcquire_WithEmptyQueue_CreatesTwoConnections() throws Exception
{
final CountDownLatch latch = new CountDownLatch(1);
HttpDestination destination = new HttpDestination(client, "http", "localhost", connector.getLocalPort())
{
@Override
protected void process(Connection connection)
{
try
{
latch.await(5, TimeUnit.SECONDS);
super.process(connection);
}
catch (InterruptedException x)
{
x.printStackTrace();
}
}
};
Connection connection1 = destination.acquire();
// There are no available existing connections, so acquire() returns null
Assert.assertNull(connection1);
Connection connection2 = destination.acquire();
Assert.assertNull(connection2);
latch.countDown();
// There must be 2 idle connections
Connection connection = destination.idleConnections().poll(5, TimeUnit.SECONDS);
Assert.assertNotNull(connection);
connection = destination.idleConnections().poll(5, TimeUnit.SECONDS);
Assert.assertNotNull(connection);
}
@Test
public void test_Acquire_Release_Acquire_ReturnsSameConnection() throws Exception
{
HttpDestination destination = new HttpDestination(client, "http", "localhost", connector.getLocalPort());
Connection connection1 = destination.acquire();
// There are no available existing connections, so acquire() returns null
Assert.assertNull(connection1);
// There are no queued requests, so the newly created connection will be idle
long start = System.nanoTime();
while (connection1 == null && TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - start) < 5)
{
connection1 = destination.idleConnections().peek();
TimeUnit.MILLISECONDS.sleep(50);
}
Assert.assertNotNull(connection1);
destination.release(connection1);
Connection connection2 = destination.acquire();
Assert.assertSame(connection1, connection2);
}
@Slow
@Test
public void test_IdleConnection_IdleTimeout() throws Exception
{
long idleTimeout = 1000;
client.setIdleTimeout(idleTimeout);
HttpDestination destination = new HttpDestination(client, "http", "localhost", connector.getLocalPort());
destination.acquire();
// There are no queued requests, so the newly created connection will be idle
Connection connection1 = null;
long start = System.nanoTime();
while (connection1 == null && TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - start) < 5)
{
connection1 = destination.idleConnections().peek();
TimeUnit.MILLISECONDS.sleep(50);
}
Assert.assertNotNull(connection1);
TimeUnit.MILLISECONDS.sleep(2 * idleTimeout);
connection1 = destination.idleConnections().poll();
Assert.assertNull(connection1);
}
}

View File

@ -28,7 +28,9 @@ public class HttpReceiverTest
// "\r\n");
// final AtomicReference<Response> responseRef = new AtomicReference<>();
// final CountDownLatch latch = new CountDownLatch(1);
// HttpReceiver receiver = new HttpReceiver(connection, null, new Response.Listener.Adapter()
// HttpReceiver receiver = new HttpReceiver(connection);
// HttpExchange exchange = new HttpExchange();
// , null, new Response.Listener.Adapter()
// {
// @Override
// public void onSuccess(Response response)

View File

@ -8,6 +8,7 @@ import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.io.ByteArrayEndPoint;
import org.eclipse.jetty.toolchain.test.annotation.Slow;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@ -34,11 +35,11 @@ public class HttpSenderTest
public void test_Send_NoRequestContent() throws Exception
{
ByteArrayEndPoint endPoint = new ByteArrayEndPoint();
HttpConnection connection = new HttpConnection(client, endPoint);
Request httpRequest = new HttpRequest(client, URI.create("http://localhost/"));
HttpConnection connection = new HttpConnection(client, endPoint, null);
Request request = client.newRequest(URI.create("http://localhost/"));
final CountDownLatch headersLatch = new CountDownLatch(1);
final CountDownLatch successLatch = new CountDownLatch(1);
httpRequest.listener(new Request.Listener.Adapter()
request.listener(new Request.Listener.Adapter()
{
@Override
public void onHeaders(Request request)
@ -52,7 +53,7 @@ public class HttpSenderTest
successLatch.countDown();
}
});
connection.send(httpRequest, null);
connection.send(request, null);
String requestString = endPoint.takeOutputString();
Assert.assertTrue(requestString.startsWith("GET "));
@ -61,31 +62,29 @@ public class HttpSenderTest
Assert.assertTrue(successLatch.await(5, TimeUnit.SECONDS));
}
@Slow
@Test
public void test_Send_NoRequestContent_IncompleteFlush() throws Exception
{
ByteArrayEndPoint endPoint = new ByteArrayEndPoint("", 16);
HttpConnection connection = new HttpConnection(client, endPoint);
Request httpRequest = new HttpRequest(client, URI.create("http://localhost/"));
final CountDownLatch headersLatch = new CountDownLatch(1);
httpRequest.listener(new Request.Listener.Adapter()
{
@Override
public void onHeaders(Request request)
{
headersLatch.countDown();
}
});
connection.send(httpRequest, null);
HttpConnection connection = new HttpConnection(client, endPoint, null);
Request request = client.newRequest(URI.create("http://localhost/"));
connection.send(request, null);
// This take will free space in the buffer and allow for the write to complete
StringBuilder request = new StringBuilder(endPoint.takeOutputString());
StringBuilder builder = new StringBuilder(endPoint.takeOutputString());
// Wait for the write to complete
Assert.assertTrue(headersLatch.await(5, TimeUnit.SECONDS));
request.append(endPoint.takeOutputString());
TimeUnit.SECONDS.sleep(1);
String requestString = request.toString();
String chunk = endPoint.takeOutputString();
while (chunk.length() > 0)
{
builder.append(chunk);
chunk = endPoint.takeOutputString();
}
String requestString = builder.toString();
Assert.assertTrue(requestString.startsWith("GET "));
Assert.assertTrue(requestString.endsWith("\r\n\r\n"));
}
@ -96,10 +95,10 @@ public class HttpSenderTest
ByteArrayEndPoint endPoint = new ByteArrayEndPoint();
// Shutdown output to trigger the exception on write
endPoint.shutdownOutput();
HttpConnection connection = new HttpConnection(client, endPoint);
Request httpRequest = new HttpRequest(client, URI.create("http://localhost/"));
HttpConnection connection = new HttpConnection(client, endPoint, null);
Request request = client.newRequest(URI.create("http://localhost/"));
final CountDownLatch failureLatch = new CountDownLatch(2);
httpRequest.listener(new Request.Listener.Adapter()
request.listener(new Request.Listener.Adapter()
{
@Override
public void onFailure(Request request, Throwable x)
@ -107,7 +106,7 @@ public class HttpSenderTest
failureLatch.countDown();
}
});
connection.send(httpRequest, new Response.Listener.Adapter()
connection.send(request, new Response.Listener.Adapter()
{
@Override
public void onFailure(Response response, Throwable failure)
@ -123,10 +122,10 @@ public class HttpSenderTest
public void test_Send_NoRequestContent_IncompleteFlush_Exception() throws Exception
{
ByteArrayEndPoint endPoint = new ByteArrayEndPoint("", 16);
HttpConnection connection = new HttpConnection(client, endPoint);
Request httpRequest = new HttpRequest(client, URI.create("http://localhost/"));
HttpConnection connection = new HttpConnection(client, endPoint, null);
Request request = client.newRequest(URI.create("http://localhost/"));
final CountDownLatch failureLatch = new CountDownLatch(2);
httpRequest.listener(new Request.Listener.Adapter()
request.listener(new Request.Listener.Adapter()
{
@Override
public void onFailure(Request request, Throwable x)
@ -134,7 +133,7 @@ public class HttpSenderTest
failureLatch.countDown();
}
});
connection.send(httpRequest, new Response.Listener.Adapter()
connection.send(request, new Response.Listener.Adapter()
{
@Override
public void onFailure(Response response, Throwable failure)
@ -156,13 +155,13 @@ public class HttpSenderTest
public void test_Send_SmallRequestContent_InOneBuffer() throws Exception
{
ByteArrayEndPoint endPoint = new ByteArrayEndPoint();
HttpConnection connection = new HttpConnection(client, endPoint);
Request httpRequest = new HttpRequest(client, URI.create("http://localhost/"));
HttpConnection connection = new HttpConnection(client, endPoint, null);
Request request = client.newRequest(URI.create("http://localhost/"));
String content = "abcdef";
httpRequest.content(new ByteBufferContentProvider(ByteBuffer.wrap(content.getBytes("UTF-8"))));
request.content(new ByteBufferContentProvider(ByteBuffer.wrap(content.getBytes("UTF-8"))));
final CountDownLatch headersLatch = new CountDownLatch(1);
final CountDownLatch successLatch = new CountDownLatch(1);
httpRequest.listener(new Request.Listener.Adapter()
request.listener(new Request.Listener.Adapter()
{
@Override
public void onHeaders(Request request)
@ -176,7 +175,7 @@ public class HttpSenderTest
successLatch.countDown();
}
});
connection.send(httpRequest, null);
connection.send(request, null);
String requestString = endPoint.takeOutputString();
Assert.assertTrue(requestString.startsWith("GET "));
@ -189,14 +188,14 @@ public class HttpSenderTest
public void test_Send_SmallRequestContent_InTwoBuffers() throws Exception
{
ByteArrayEndPoint endPoint = new ByteArrayEndPoint();
HttpConnection connection = new HttpConnection(client, endPoint);
Request httpRequest = new HttpRequest(client, URI.create("http://localhost/"));
HttpConnection connection = new HttpConnection(client, endPoint, null);
Request request = client.newRequest(URI.create("http://localhost/"));
String content1 = "0123456789";
String content2 = "abcdef";
httpRequest.content(new ByteBufferContentProvider(ByteBuffer.wrap(content1.getBytes("UTF-8")), ByteBuffer.wrap(content2.getBytes("UTF-8"))));
request.content(new ByteBufferContentProvider(ByteBuffer.wrap(content1.getBytes("UTF-8")), ByteBuffer.wrap(content2.getBytes("UTF-8"))));
final CountDownLatch headersLatch = new CountDownLatch(1);
final CountDownLatch successLatch = new CountDownLatch(1);
httpRequest.listener(new Request.Listener.Adapter()
request.listener(new Request.Listener.Adapter()
{
@Override
public void onHeaders(Request request)
@ -210,7 +209,7 @@ public class HttpSenderTest
successLatch.countDown();
}
});
connection.send(httpRequest, null);
connection.send(request, null);
String requestString = endPoint.takeOutputString();
Assert.assertTrue(requestString.startsWith("GET "));
@ -223,11 +222,11 @@ public class HttpSenderTest
public void test_Send_SmallRequestContent_Chunked_InTwoChunks() throws Exception
{
ByteArrayEndPoint endPoint = new ByteArrayEndPoint();
HttpConnection connection = new HttpConnection(client, endPoint);
Request httpRequest = new HttpRequest(client, URI.create("http://localhost/"));
HttpConnection connection = new HttpConnection(client, endPoint, null);
Request request = client.newRequest(URI.create("http://localhost/"));
String content1 = "0123456789";
String content2 = "ABCDEF";
httpRequest.content(new ByteBufferContentProvider(ByteBuffer.wrap(content1.getBytes("UTF-8")), ByteBuffer.wrap(content2.getBytes("UTF-8")))
request.content(new ByteBufferContentProvider(ByteBuffer.wrap(content1.getBytes("UTF-8")), ByteBuffer.wrap(content2.getBytes("UTF-8")))
{
@Override
public long length()
@ -237,7 +236,7 @@ public class HttpSenderTest
});
final CountDownLatch headersLatch = new CountDownLatch(1);
final CountDownLatch successLatch = new CountDownLatch(1);
httpRequest.listener(new Request.Listener.Adapter()
request.listener(new Request.Listener.Adapter()
{
@Override
public void onHeaders(Request request)
@ -251,7 +250,7 @@ public class HttpSenderTest
successLatch.countDown();
}
});
connection.send(httpRequest, null);
connection.send(request, null);
String requestString = endPoint.takeOutputString();
Assert.assertTrue(requestString.startsWith("GET "));
@ -262,5 +261,4 @@ public class HttpSenderTest
Assert.assertTrue(headersLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(successLatch.await(5, TimeUnit.SECONDS));
}
}

View File

@ -14,7 +14,7 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class RedirectionTest extends AbstractHttpClientTest
public class RedirectionTest extends AbstractHttpClientServerTest
{
@Before
public void init() throws Exception

View File

@ -286,9 +286,11 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
@Override
protected void doStop() throws Exception
{
LOG.debug("Stopping {}", this);
Stop stop = new Stop();
submit(stop);
stop.await(getStopTimeout());
LOG.debug("Stopped {}", this);
}
/**