Merged branch 'jetty-9.2.x' into 'master'.

This commit is contained in:
Simone Bordet 2015-02-15 20:51:49 +01:00
commit 28b9a0927c
12 changed files with 377 additions and 63 deletions

View File

@ -76,6 +76,8 @@ public abstract class HttpChannel
public abstract boolean abort(Throwable cause);
public abstract boolean abortResponse(Throwable cause);
public void exchangeTerminated(Result result)
{
disassociate();

View File

@ -198,7 +198,7 @@ public class HttpExchange
boolean aborted = channel.abort(cause);
if (LOG.isDebugEnabled())
LOG.debug("Aborted while active ({}) {}: {}", aborted, this, cause);
LOG.debug("Aborted ({}) while active {}: {}", aborted, this, cause);
return aborted;
}
}

View File

@ -197,7 +197,7 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
RequestNotifier notifier = getHttpChannel().getHttpDestination().getRequestNotifier();
notifier.notifyBegin(request);
if (!updateRequestState(RequestState.TRANSIENT, RequestState.BEGIN))
terminateRequest(getHttpExchange(), failure, false);
terminateRequest(getHttpExchange(), failure);
return true;
}
@ -210,7 +210,7 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
RequestNotifier notifier = getHttpChannel().getHttpDestination().getRequestNotifier();
notifier.notifyHeaders(request);
if (!updateRequestState(RequestState.TRANSIENT, RequestState.HEADERS))
terminateRequest(getHttpExchange(), failure, false);
terminateRequest(getHttpExchange(), failure);
return true;
}
@ -223,7 +223,7 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
RequestNotifier notifier = getHttpChannel().getHttpDestination().getRequestNotifier();
notifier.notifyCommit(request);
if (!updateRequestState(RequestState.TRANSIENT, RequestState.COMMIT))
terminateRequest(getHttpExchange(), failure, true);
terminateRequest(getHttpExchange(), failure);
return true;
}
@ -242,7 +242,7 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
RequestNotifier notifier = getHttpChannel().getHttpDestination().getRequestNotifier();
notifier.notifyContent(request, content);
if (!updateRequestState(RequestState.TRANSIENT_CONTENT, RequestState.CONTENT))
terminateRequest(getHttpExchange(), failure, true);
terminateRequest(getHttpExchange(), failure);
return true;
}
default:
@ -281,7 +281,7 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
HttpDestination destination = getHttpChannel().getHttpDestination();
destination.getRequestNotifier().notifySuccess(exchange.getRequest());
terminateRequest(exchange, null, true, result);
terminateRequest(exchange, null, result);
return true;
}
@ -333,7 +333,7 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
if (fail)
{
terminateRequest(exchange, failure, !isBeforeCommit(current), result);
terminateRequest(exchange, failure, result);
}
else
{
@ -344,34 +344,32 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
return true;
}
private void terminateRequest(HttpExchange exchange, Throwable failure, boolean committed)
private void terminateRequest(HttpExchange exchange, Throwable failure)
{
if (exchange != null)
{
Result result = exchange.terminateRequest(failure);
terminateRequest(exchange, failure, committed, result);
terminateRequest(exchange, failure, result);
}
}
private void terminateRequest(HttpExchange exchange, Throwable failure, boolean committed, Result result)
private void terminateRequest(HttpExchange exchange, Throwable failure, Result result)
{
Request request = exchange.getRequest();
if (LOG.isDebugEnabled())
LOG.debug("Terminating request {}", request);
if (failure != null && !committed && result == null && request.getAbortCause() == null)
if (result == null)
{
// Complete the response from here
if (exchange.responseComplete())
if (failure != null)
{
result = exchange.terminateResponse(failure);
if (LOG.isDebugEnabled())
LOG.debug("Failed response from request {}", exchange);
LOG.debug("Response failure from request {}", exchange);
getHttpChannel().abortResponse(failure);
}
}
if (result != null)
else
{
HttpDestination destination = getHttpChannel().getHttpDestination();
boolean ordered = destination.getHttpClient().isStrictEventOrdering();

View File

@ -63,10 +63,14 @@ public class HttpChannelOverHTTP extends HttpChannel
@Override
public boolean abort(Throwable cause)
{
// We want the return value to be that of the response
// because if the response has already successfully
// arrived then we failed to abort the exchange
sender.abort(cause);
boolean sendAborted = sender.abort(cause);
boolean receiveAborted = abortResponse(cause);
return sendAborted || receiveAborted;
}
@Override
public boolean abortResponse(Throwable cause)
{
return receiver.abort(cause);
}

View File

@ -50,10 +50,15 @@ public class HttpClientTransportOverHTTP extends AbstractHttpClientTransport
public org.eclipse.jetty.io.Connection newConnection(EndPoint endPoint, Map<String, Object> context) throws IOException
{
HttpDestination destination = (HttpDestination)context.get(HTTP_DESTINATION_CONTEXT_KEY);
HttpConnectionOverHTTP connection = new HttpConnectionOverHTTP(endPoint, destination);
HttpConnectionOverHTTP connection = newHttpConnection(endPoint, destination);
@SuppressWarnings("unchecked")
Promise<Connection> promise = (Promise<Connection>)context.get(HTTP_CONNECTION_PROMISE_CONTEXT_KEY);
promise.succeeded(connection);
return connection;
}
protected HttpConnectionOverHTTP newHttpConnection(EndPoint endPoint, HttpDestination destination)
{
return new HttpConnectionOverHTTP(endPoint, destination);
}
}

View File

@ -297,9 +297,12 @@ public class DeferredContentProvider implements AsyncContentProvider, Callback,
{
failure = x;
// Transfer all chunks to fail them all.
Chunk chunk = current;
current = null;
if (chunk != null)
chunks.add(chunk);
chunks.addAll(DeferredContentProvider.this.chunks);
clear();
current = null;
lock.notify();
}
for (Chunk chunk : chunks)

View File

@ -40,13 +40,10 @@ import org.eclipse.jetty.client.util.DeferredContentProvider;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpHeaderValue;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.server.HttpChannel;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.toolchain.test.annotation.Slow;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.StdErrLog;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.junit.Assert;
import org.junit.Test;
@ -425,35 +422,25 @@ public class HttpClientContinueTest extends AbstractHttpClientServerTest
}
});
try
byte[] content = new byte[1024];
final CountDownLatch latch = new CountDownLatch(1);
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.header(HttpHeader.EXPECT, HttpHeaderValue.CONTINUE.asString())
.content(new BytesContentProvider(content))
.send(new BufferingResponseListener()
{
Log.getLogger(HttpChannel.class).info("Expecting Close warning...");
((StdErrLog)Log.getLogger(HttpChannel.class)).setHideStacks(true);
byte[] content = new byte[1024];
final CountDownLatch latch = new CountDownLatch(1);
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.header(HttpHeader.EXPECT, HttpHeaderValue.CONTINUE.asString())
.content(new BytesContentProvider(content))
.send(new BufferingResponseListener()
@Override
public void onComplete(Result result)
{
@Override
public void onComplete(Result result)
{
Assert.assertTrue(result.isFailed());
Assert.assertNotNull(result.getRequestFailure());
Assert.assertNotNull(result.getResponseFailure());
latch.countDown();
}
});
Assert.assertTrue(result.isFailed());
Assert.assertNotNull(result.getRequestFailure());
Assert.assertNotNull(result.getResponseFailure());
latch.countDown();
}
});
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
}
finally
{
((StdErrLog)Log.getLogger(HttpChannel.class)).setHideStacks(false);
}
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
}
@Slow

View File

@ -0,0 +1,270 @@
//
// ========================================================================
// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.client;
import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP;
import org.eclipse.jetty.client.http.HttpConnectionOverHTTP;
import org.eclipse.jetty.client.util.DeferredContentProvider;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
public class HttpClientFailureTest
{
private Server server;
private ServerConnector connector;
private HttpClient client;
private void startServer(Handler handler) throws Exception
{
QueuedThreadPool serverThreads = new QueuedThreadPool();
serverThreads.setName("server");
server = new Server(serverThreads);
connector = new ServerConnector(server);
server.addConnector(connector);
server.setHandler(handler);
server.start();
}
@After
public void dispose() throws Exception
{
if (server != null)
server.stop();
if (client != null)
client.stop();
}
@Test
public void testFailureBeforeRequestCommit() throws Exception
{
startServer(new EmptyServerHandler());
final AtomicReference<HttpConnectionOverHTTP> connectionRef = new AtomicReference<>();
client = new HttpClient(new HttpClientTransportOverHTTP()
{
@Override
protected HttpConnectionOverHTTP newHttpConnection(EndPoint endPoint, HttpDestination destination)
{
HttpConnectionOverHTTP connection = super.newHttpConnection(endPoint, destination);
connectionRef.set(connection);
return connection;
}
}, null);
client.start();
try
{
client.newRequest("localhost", connector.getLocalPort())
.onRequestHeaders(new Request.HeadersListener()
{
@Override
public void onHeaders(Request request)
{
connectionRef.get().getEndPoint().close();
}
})
.timeout(5, TimeUnit.SECONDS)
.send();
Assert.fail();
}
catch (ExecutionException x)
{
// Expected.
}
ConnectionPool connectionPool = connectionRef.get().getHttpDestination().getConnectionPool();
Assert.assertEquals(0, connectionPool.getConnectionCount());
Assert.assertEquals(0, connectionPool.getActiveConnections().size());
Assert.assertEquals(0, connectionPool.getIdleConnections().size());
}
@Test
public void testFailureAfterRequestCommit() throws Exception
{
startServer(new EmptyServerHandler());
final AtomicReference<HttpConnectionOverHTTP> connectionRef = new AtomicReference<>();
client = new HttpClient(new HttpClientTransportOverHTTP()
{
@Override
protected HttpConnectionOverHTTP newHttpConnection(EndPoint endPoint, HttpDestination destination)
{
HttpConnectionOverHTTP connection = super.newHttpConnection(endPoint, destination);
connectionRef.set(connection);
return connection;
}
}, null);
client.start();
final CountDownLatch commitLatch = new CountDownLatch(1);
final CountDownLatch completeLatch = new CountDownLatch(1);
DeferredContentProvider content = new DeferredContentProvider();
client.newRequest("localhost", connector.getLocalPort())
.onRequestCommit(new Request.CommitListener()
{
@Override
public void onCommit(Request request)
{
connectionRef.get().getEndPoint().close();
commitLatch.countDown();
}
})
.content(content)
.idleTimeout(2, TimeUnit.SECONDS)
.send(new Response.CompleteListener()
{
@Override
public void onComplete(Result result)
{
if (result.isFailed())
completeLatch.countDown();
}
});
Assert.assertTrue(commitLatch.await(5, TimeUnit.SECONDS));
final CountDownLatch contentLatch = new CountDownLatch(1);
content.offer(ByteBuffer.allocate(1024), new Callback.Adapter()
{
@Override
public void failed(Throwable x)
{
contentLatch.countDown();
}
});
Assert.assertTrue(commitLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(contentLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(completeLatch.await(5, TimeUnit.SECONDS));
ConnectionPool connectionPool = connectionRef.get().getHttpDestination().getConnectionPool();
Assert.assertEquals(0, connectionPool.getConnectionCount());
Assert.assertEquals(0, connectionPool.getActiveConnections().size());
Assert.assertEquals(0, connectionPool.getIdleConnections().size());
}
/*
@Test
public void test_ExchangeIsComplete_WhenRequestFailsMidway_WithResponse() throws Exception
{
start(new AbstractHandler()
{
@Override
public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
// Echo back
IO.copy(request.getInputStream(), response.getOutputStream());
}
});
final CountDownLatch latch = new CountDownLatch(1);
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
// The second ByteBuffer set to null will throw an exception
.content(new ContentProvider()
{
@Override
public long getLength()
{
return -1;
}
@Override
public Iterator<ByteBuffer> iterator()
{
return new Iterator<ByteBuffer>()
{
@Override
public boolean hasNext()
{
return true;
}
@Override
public ByteBuffer next()
{
throw new NoSuchElementException("explicitly_thrown_by_test");
}
@Override
public void remove()
{
throw new UnsupportedOperationException();
}
};
}
})
.send(new Response.Listener.Adapter()
{
@Override
public void onComplete(Result result)
{
latch.countDown();
}
});
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
}
@Test
public void test_ExchangeIsComplete_WhenRequestFails_WithNoResponse() throws Exception
{
start(new EmptyServerHandler());
final CountDownLatch latch = new CountDownLatch(1);
final String host = "localhost";
final int port = connector.getLocalPort();
client.newRequest(host, port)
.scheme(scheme)
.onRequestBegin(new Request.BeginListener()
{
@Override
public void onBegin(Request request)
{
HttpDestinationOverHTTP destination = (HttpDestinationOverHTTP)client.getDestination(scheme, host, port);
destination.getConnectionPool().getActiveConnections().peek().close();
}
})
.send(new Response.Listener.Adapter()
{
@Override
public void onComplete(Result result)
{
latch.countDown();
}
});
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
}
*/
}

View File

@ -456,7 +456,6 @@ public class HttpClientTest extends AbstractHttpClientServerTest
Assert.assertTrue(successLatch.await(5, TimeUnit.SECONDS));
}
@Slow
@Test
public void test_QueuedRequest_IsSent_WhenPreviousRequestClosedConnection() throws Exception
{
@ -518,7 +517,6 @@ public class HttpClientTest extends AbstractHttpClientServerTest
Assert.assertTrue(latch.await(5 * idleTimeout, TimeUnit.MILLISECONDS));
}
@Slow
@Test
public void test_ExchangeIsComplete_OnlyWhenBothRequestAndResponseAreComplete() throws Exception
{

View File

@ -32,9 +32,9 @@ import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.client.http.HttpDestinationOverHTTP;
import org.eclipse.jetty.client.util.ByteBufferContentProvider;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.toolchain.test.annotation.Slow;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.junit.Assert;
@ -86,9 +86,19 @@ public class HttpRequestAbortTest extends AbstractHttpClientServerTest
Assert.assertSame(cause, x.getCause());
Assert.assertFalse(begin.get());
}
// The request send triggered a connection creation
// that is not awaited before failing the exchange.
Thread.sleep(1000);
// However, the connection has not been used, so it's a good one.
HttpDestinationOverHTTP destination = (HttpDestinationOverHTTP)client.getDestination(scheme, "localhost", connector.getLocalPort());
ConnectionPool connectionPool = destination.getConnectionPool();
Assert.assertEquals(1, connectionPool.getConnectionCount());
Assert.assertEquals(0, connectionPool.getActiveConnections().size());
Assert.assertEquals(1, connectionPool.getIdleConnections().size());
}
@Slow
@Test
public void testAbortOnBegin() throws Exception
{
@ -128,9 +138,14 @@ public class HttpRequestAbortTest extends AbstractHttpClientServerTest
Assert.assertSame(cause, x.getCause());
Assert.assertFalse(committed.await(1, TimeUnit.SECONDS));
}
HttpDestinationOverHTTP destination = (HttpDestinationOverHTTP)client.getDestination(scheme, "localhost", connector.getLocalPort());
ConnectionPool connectionPool = destination.getConnectionPool();
Assert.assertEquals(0, connectionPool.getConnectionCount());
Assert.assertEquals(0, connectionPool.getActiveConnections().size());
Assert.assertEquals(0, connectionPool.getIdleConnections().size());
}
@Slow
@Test
public void testAbortOnHeaders() throws Exception
{
@ -170,6 +185,12 @@ public class HttpRequestAbortTest extends AbstractHttpClientServerTest
Assert.assertSame(cause, x.getCause());
Assert.assertFalse(committed.await(1, TimeUnit.SECONDS));
}
HttpDestinationOverHTTP destination = (HttpDestinationOverHTTP)client.getDestination(scheme, "localhost", connector.getLocalPort());
ConnectionPool connectionPool = destination.getConnectionPool();
Assert.assertEquals(0, connectionPool.getConnectionCount());
Assert.assertEquals(0, connectionPool.getActiveConnections().size());
Assert.assertEquals(0, connectionPool.getIdleConnections().size());
}
@Test
@ -207,6 +228,12 @@ public class HttpRequestAbortTest extends AbstractHttpClientServerTest
if (aborted.get())
Assert.assertSame(cause, x.getCause());
}
HttpDestinationOverHTTP destination = (HttpDestinationOverHTTP)client.getDestination(scheme, "localhost", connector.getLocalPort());
ConnectionPool connectionPool = destination.getConnectionPool();
Assert.assertEquals(0, connectionPool.getConnectionCount());
Assert.assertEquals(0, connectionPool.getActiveConnections().size());
Assert.assertEquals(0, connectionPool.getIdleConnections().size());
}
@Test
@ -265,6 +292,12 @@ public class HttpRequestAbortTest extends AbstractHttpClientServerTest
if (aborted.get())
Assert.assertSame(cause, x.getCause());
}
HttpDestinationOverHTTP destination = (HttpDestinationOverHTTP)client.getDestination(scheme, "localhost", connector.getLocalPort());
ConnectionPool connectionPool = destination.getConnectionPool();
Assert.assertEquals(0, connectionPool.getConnectionCount());
Assert.assertEquals(0, connectionPool.getActiveConnections().size());
Assert.assertEquals(0, connectionPool.getIdleConnections().size());
}
@Test
@ -314,9 +347,14 @@ public class HttpRequestAbortTest extends AbstractHttpClientServerTest
if (aborted.get())
Assert.assertSame(cause, x.getCause());
}
HttpDestinationOverHTTP destination = (HttpDestinationOverHTTP)client.getDestination(scheme, "localhost", connector.getLocalPort());
ConnectionPool connectionPool = destination.getConnectionPool();
Assert.assertEquals(0, connectionPool.getConnectionCount());
Assert.assertEquals(0, connectionPool.getActiveConnections().size());
Assert.assertEquals(0, connectionPool.getIdleConnections().size());
}
@Slow
@Test(expected = InterruptedException.class)
public void testInterrupt() throws Exception
{
@ -363,7 +401,6 @@ public class HttpRequestAbortTest extends AbstractHttpClientServerTest
request.send();
}
@Slow
@Test
public void testAbortLongPoll() throws Exception
{
@ -420,9 +457,14 @@ public class HttpRequestAbortTest extends AbstractHttpClientServerTest
if (aborted.get())
Assert.assertSame(cause, x.getCause());
}
HttpDestinationOverHTTP destination = (HttpDestinationOverHTTP)client.getDestination(scheme, "localhost", connector.getLocalPort());
ConnectionPool connectionPool = destination.getConnectionPool();
Assert.assertEquals(0, connectionPool.getConnectionCount());
Assert.assertEquals(0, connectionPool.getActiveConnections().size());
Assert.assertEquals(0, connectionPool.getIdleConnections().size());
}
@Slow
@Test
public void testAbortLongPollAsync() throws Exception
{

View File

@ -78,7 +78,14 @@ public class HttpChannelOverFCGI extends HttpChannel
@Override
public boolean abort(Throwable cause)
{
sender.abort(cause);
boolean sendAborted = sender.abort(cause);
boolean receiveAborted = abortResponse(cause);
return sendAborted || receiveAborted;
}
@Override
public boolean abortResponse(Throwable cause)
{
return receiver.abort(cause);
}

View File

@ -69,8 +69,6 @@ public class HttpChannelOverHTTP2 extends HttpChannel
{
boolean sendAborted = sender.abort(cause);
boolean receiveAborted = receiver.abort(cause);
// Variables cannot be inlined, otherwise abort
// calls may not be executed due to short-circuit.
return sendAborted || receiveAborted;
}