Merged branch 'jetty-9.2.x' into 'master'.

This commit is contained in:
Simone Bordet 2015-02-06 21:45:22 +01:00
commit 99da9ce98e
5 changed files with 105 additions and 10 deletions

View File

@ -54,6 +54,11 @@ public class ConnectionPool implements Closeable, Dumpable
this.activeConnections = new BlockingArrayQueue<>(maxConnections);
}
public int getConnectionCount()
{
return connectionCount.get();
}
public BlockingQueue<Connection> getIdleConnections()
{
return idleConnections;
@ -76,7 +81,7 @@ public class ConnectionPool implements Closeable, Dumpable
{
while (true)
{
int current = connectionCount.get();
int current = getConnectionCount();
final int next = current + 1;
if (next > maxConnections)

View File

@ -59,7 +59,7 @@ public abstract class HttpDestination implements Destination, Closeable, Dumpabl
this.client = client;
this.origin = origin;
this.exchanges = new BlockingArrayQueue<>(client.getMaxRequestsQueuedPerDestination());
this.exchanges = newExchangeQueue(client);
this.requestNotifier = new RequestNotifier(client);
this.responseNotifier = new ResponseNotifier();
@ -84,6 +84,11 @@ public abstract class HttpDestination implements Destination, Closeable, Dumpabl
hostField = new HttpField(HttpHeader.HOST, host);
}
protected Queue<HttpExchange> newExchangeQueue(HttpClient client)
{
return new BlockingArrayQueue<>(client.getMaxRequestsQueuedPerDestination());
}
protected ClientConnectionFactory newSslClientConnectionFactory(ClientConnectionFactory connectionFactory)
{
return new SslClientConnectionFactory(client.getSslContextFactory(), client.getByteBufferPool(), client.getExecutor(), connectionFactory);
@ -168,7 +173,7 @@ public abstract class HttpDestination implements Destination, Closeable, Dumpabl
if (client.isRunning())
{
if (exchanges.offer(exchange))
if (enqueue(exchanges, exchange))
{
if (!client.isRunning() && exchanges.remove(exchange))
{
@ -195,6 +200,11 @@ public abstract class HttpDestination implements Destination, Closeable, Dumpabl
}
}
protected boolean enqueue(Queue<HttpExchange> queue, HttpExchange exchange)
{
return queue.offer(exchange);
}
protected abstract void send();
public void newConnection(Promise<Connection> promise)

View File

@ -43,7 +43,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import javax.servlet.ServletException;
import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServletRequest;
@ -579,7 +579,7 @@ public class HttpClientTest extends AbstractHttpClientServerTest
Files.delete(file);
}
@Test
public void test_ExchangeIsComplete_WhenRequestFailsMidway_WithResponse() throws Exception
{
@ -1363,6 +1363,55 @@ public class HttpClientTest extends AbstractHttpClientServerTest
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
}
@Test
public void testCompleteNotInvokedUntilContentConsumed() throws Exception
{
start(new AbstractHandler()
{
@Override
public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
ServletOutputStream output = response.getOutputStream();
output.write(new byte[1024]);
}
});
final AtomicReference<Callback> callbackRef = new AtomicReference<>();
final CountDownLatch contentLatch = new CountDownLatch(1);
final CountDownLatch completeLatch = new CountDownLatch(1);
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.send(new Response.Listener.Adapter()
{
@Override
public void onContent(Response response, ByteBuffer content, Callback callback)
{
// Do not notify the callback yet.
callbackRef.set(callback);
contentLatch.countDown();
}
@Override
public void onComplete(Result result)
{
if (result.isSucceeded())
completeLatch.countDown();
}
});
Assert.assertTrue(contentLatch.await(5, TimeUnit.SECONDS));
// Make sure the complete event is not emitted.
Assert.assertFalse(completeLatch.await(1, TimeUnit.SECONDS));
// Consume the content.
callbackRef.get().succeeded();
// Now the complete event is emitted.
Assert.assertTrue(completeLatch.await(5, TimeUnit.SECONDS));
}
public static abstract class RetryListener implements Response.CompleteListener
{

View File

@ -252,16 +252,13 @@ public class ConnectHandler extends HandlerWrapper
SocketChannel channel = SocketChannel.open();
channel.socket().setTcpNoDelay(true);
channel.configureBlocking(false);
InetSocketAddress address = new InetSocketAddress(host, port);
AsyncContext asyncContext = request.startAsync();
asyncContext.setTimeout(0);
if (LOG.isDebugEnabled())
LOG.debug("Connecting to {}", address);
HttpTransport transport = baseRequest.getHttpChannel().getHttpTransport();
// TODO Handle CONNECT over HTTP2!
if (!(transport instanceof HttpConnection))
{
if (LOG.isDebugEnabled())
@ -269,7 +266,10 @@ public class ConnectHandler extends HandlerWrapper
sendConnectResponse(request, response, HttpServletResponse.SC_FORBIDDEN);
return;
}
InetSocketAddress address = newConnectAddress(host, port);
if (LOG.isDebugEnabled())
LOG.debug("Connecting to {}", address);
ConnectContext connectContext = new ConnectContext(request, response, asyncContext, (HttpConnection)transport);
if (channel.connect(address))
selector.accept(channel, connectContext);
@ -282,6 +282,17 @@ public class ConnectHandler extends HandlerWrapper
}
}
/* ------------------------------------------------------------ */
/** Create the address the connect channel will connect to.
* @param host The host from the connect request
* @param port The port from the connect request
* @return The InetSocketAddress to connect to.
*/
protected InetSocketAddress newConnectAddress(String host, int port)
{
return new InetSocketAddress(host, port);
}
protected void onConnectSuccess(ConnectContext connectContext, UpstreamConnection upstreamConnection)
{
HttpConnection httpConnection = connectContext.getHttpConnection();

View File

@ -0,0 +1,20 @@
Bag Attributes
friendlyName: jetty
localKeyID: 54 69 6D 65 20 31 34 32 33 31 39 38 30 39 33 31 31 35
Key Attributes: <No Attributes>
-----BEGIN PRIVATE KEY-----
MIICdgIBADANBgkqhkiG9w0BAQEFAASCAmAwggJcAgEAAoGBAIPh4Q0t4xklXTzX
N2VAb47r5n7idAupp4CTNEhhT6lS70iA+A8i4+0lSEHWAogvd9jl3H7SvScr30QM
4ieC0JCGSOwGc8f+yqKrO56PPd5OuqW380BJ0r74jJczU9CcsuavHD7e6mRLUnmj
xM20NSxrcicMiPUHY1mJZtN9swtxAgMBAAECgYADS9P6Jll0uXBZIu/pgfDH27GJ
HlPULstW9VbrMDNzgfUlFMQebLrRpIrnyleJ29Xc//HA4beEkR4lb0T/w88+pEkt
7fhYeqRLPIfpDOgzloynnsoPcd8f/PypbimQrNLmBiG1178nVcy4Yoh5lYVIJwtU
3VriqDlvAfTLrrx8AQJBAMLWuh27Hb8xs3LRg4UD7hcv8tJejstm08Y+czRz7cO0
RENa3aDjGFSegc+IUfdez7BP8uDw+PwE+jybmTvaliECQQCtR/anCY1WS28/bKvy
lmIwoI15eraBdVFkN0Hfxh+9PfR3rMD5uyvukT5GgTtY/XxADyafSTaipDJiZHJI
EitRAkBjeCBYYVjUbVlBuvi8Bb+dktsSzzdzXDGtueAy3SR7jyJyiIcxRf775Fg9
TUkbUwoQ5yAF+sACWcAvBPj796JBAkAEZEeHEkHnxv+pztpIyrDwZJFRW9/WRh/q
90+PGVlilXhltBYr/idt43Z9mPblGX+VrAyhitx8oMa6IauX0gYRAkEAgnyVeXrD
jDLUZRA3P8Gu27k1k6GjbTYiUz3HKCz2/6+MZ2MK2qqwafgqocji029Q6dHdPD7a
4QnRlvraUnyQLA==
-----END PRIVATE KEY-----