jetty-9 - HTTP client: improved HttpExchange.complete*() to return a Result that remembers

request failures and response failures.
Also improved many tests to avoid random failures.
This commit is contained in:
Simone Bordet 2012-09-14 16:16:14 +02:00
parent 8c9f097666
commit e80430fbc2
11 changed files with 91 additions and 67 deletions

View File

@ -517,6 +517,7 @@ public class HttpClient extends AggregateLifeCycle
EndPoint appEndPoint = sslConnection.getDecryptedEndPoint(); EndPoint appEndPoint = sslConnection.getDecryptedEndPoint();
HttpConnection connection = new HttpConnection(HttpClient.this, appEndPoint, destination); HttpConnection connection = new HttpConnection(HttpClient.this, appEndPoint, destination);
appEndPoint.setConnection(connection); appEndPoint.setConnection(connection);
connectionOpened(connection);
callback.callback.completed(connection); callback.callback.completed(connection);
return sslConnection; return sslConnection;

View File

@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jetty.client.api.Request; import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
@ -35,6 +36,8 @@ public class HttpExchange
private final Request request; private final Request request;
private final Response.Listener listener; private final Response.Listener listener;
private final HttpResponse response; private final HttpResponse response;
private volatile Throwable requestFailure;
private volatile Throwable responseFailure;
public HttpExchange(HttpConversation conversation, HttpConnection connection, Request request, Response.Listener listener) public HttpExchange(HttpConversation conversation, HttpConnection connection, Request request, Response.Listener listener)
{ {
@ -70,18 +73,20 @@ public class HttpExchange
connection.receive(); connection.receive();
} }
public boolean requestComplete(boolean success) public Result requestComplete(Throwable failure)
{ {
this.requestFailure = failure;
int requestSuccess = 0b0011; int requestSuccess = 0b0011;
int requestFailure = 0b0001; int requestFailure = 0b0001;
return complete(success ? requestSuccess : requestFailure); return complete(failure == null ? requestSuccess : requestFailure);
} }
public boolean responseComplete(boolean success) public Result responseComplete(Throwable failure)
{ {
this.responseFailure = failure;
int responseSuccess = 0b1100; int responseSuccess = 0b1100;
int responseFailure = 0b0100; int responseFailure = 0b0100;
return complete(success ? responseSuccess : responseFailure); return complete(failure == null ? responseSuccess : responseFailure);
} }
/** /**
@ -98,23 +103,23 @@ public class HttpExchange
* whether the exchange is completed and whether is successful. * whether the exchange is completed and whether is successful.
* *
* @param code the bits representing the status code for either the request or the response * @param code the bits representing the status code for either the request or the response
* @return whether the exchange completed (either successfully or not) * @return the result if the exchange completed, or null if the exchange did not complete
*/ */
private boolean complete(int code) private Result complete(int code)
{ {
int status = complete.addAndGet(code); int status = complete.addAndGet(code);
int completed = 0b0101; int completed = 0b0101;
if ((status & completed) == completed) if ((status & completed) == completed)
{ {
LOG.debug("{} complete", this); boolean success = status == 0b1111;
LOG.debug("{} complete success={}", this);
// Request and response completed // Request and response completed
if (this == conversation.last()) if (this == conversation.last())
conversation.complete(); conversation.complete();
int success = 0b1111; connection.complete(this, success);
connection.complete(this, status == success); return new Result(request, requestFailure, response, responseFailure);
return true;
} }
return false; return null;
} }
public void abort() public void abort()

View File

@ -44,6 +44,7 @@ public class HttpReceiver implements HttpParser.ResponseHandler<ByteBuffer>
private final HttpParser parser = new HttpParser(this); private final HttpParser parser = new HttpParser(this);
private final ResponseNotifier notifier = new ResponseNotifier(); private final ResponseNotifier notifier = new ResponseNotifier();
private final HttpConnection connection; private final HttpConnection connection;
private volatile boolean success;
private volatile boolean failed; private volatile boolean failed;
public HttpReceiver(HttpConnection connection) public HttpReceiver(HttpConnection connection)
@ -74,7 +75,10 @@ public class HttpReceiver implements HttpParser.ResponseHandler<ByteBuffer>
} }
else else
{ {
// Shutting down the parser may invoke messageComplete()
parser.shutdownInput(); parser.shutdownInput();
if (!success)
fail(new EOFException());
break; break;
} }
} }
@ -193,47 +197,42 @@ public class HttpReceiver implements HttpParser.ResponseHandler<ByteBuffer>
protected void success() protected void success()
{ {
parser.reset(); parser.reset();
success = true;
HttpExchange exchange = connection.getExchange(); HttpExchange exchange = connection.getExchange();
HttpResponse response = exchange.response(); HttpResponse response = exchange.response();
LOG.debug("Received {}", response); LOG.debug("Received {}", response);
boolean exchangeComplete = exchange.responseComplete(true); Result result = exchange.responseComplete(null);
HttpConversation conversation = exchange.conversation(); HttpConversation conversation = exchange.conversation();
notifier.notifySuccess(conversation.listener(), response); notifier.notifySuccess(conversation.listener(), response);
if (exchangeComplete) if (result != null)
{
Result result = new Result(exchange.request(), response);
notifier.notifyComplete(conversation.listener(), result); notifier.notifyComplete(conversation.listener(), result);
}
} }
protected void fail(Throwable failure) protected void fail(Throwable failure)
{ {
parser.close();
failed = true;
HttpExchange exchange = connection.getExchange(); HttpExchange exchange = connection.getExchange();
// In case of a response error, the failure has already been notified // In case of a response error, the failure has already been notified
// and it is possible that a further attempt to read in the receive // and it is possible that a further attempt to read in the receive
// loop throws an exception that reenters here but without exchange // loop throws an exception that reenters here but without exchange;
// or, the server could just have timed out the connection.
if (exchange == null) if (exchange == null)
return; return;
parser.close();
failed = true;
HttpResponse response = exchange.response(); HttpResponse response = exchange.response();
LOG.debug("Failed {} {}", response, failure); LOG.debug("Failed {} {}", response, failure);
boolean exchangeComplete = exchange.responseComplete(false); Result result = exchange.responseComplete(failure);
HttpConversation conversation = exchange.conversation(); HttpConversation conversation = exchange.conversation();
notifier.notifyFailure(conversation.listener(), response, failure); notifier.notifyFailure(conversation.listener(), response, failure);
if (exchangeComplete) if (result != null)
{
Result result = new Result(exchange.request(), response, failure);
notifier.notifyComplete(conversation.listener(), result); notifier.notifyComplete(conversation.listener(), result);
}
} }
@Override @Override

View File

@ -223,15 +223,14 @@ public class HttpSender
Request request = exchange.request(); Request request = exchange.request();
LOG.debug("Sent {}", request); LOG.debug("Sent {}", request);
boolean exchangeCompleted = exchange.requestComplete(true); Result result = exchange.requestComplete(null);
// It is important to notify *after* we reset because // It is important to notify *after* we reset because
// the notification may trigger another request/response // the notification may trigger another request/response
requestNotifier.notifySuccess(request); requestNotifier.notifySuccess(request);
if (exchangeCompleted) if (result != null)
{ {
HttpConversation conversation = exchange.conversation(); HttpConversation conversation = exchange.conversation();
Result result = new Result(request, exchange.response());
responseNotifier.notifyComplete(conversation.listener(), result); responseNotifier.notifyComplete(conversation.listener(), result);
} }
} }
@ -250,15 +249,20 @@ public class HttpSender
Request request = exchange.request(); Request request = exchange.request();
LOG.debug("Failed {} {}", request, failure); LOG.debug("Failed {} {}", request, failure);
boolean exchangeCompleted = exchange.requestComplete(false); Result result = exchange.requestComplete(failure);
if (!exchangeCompleted && !committed) if (result == null && !committed)
exchangeCompleted = exchange.responseComplete(false); result = exchange.responseComplete(null);
// If the exchange is not completed, we need to shutdown the output
// to signal to the server that we're done (otherwise it may be
// waiting for more data that will not arrive)
if (result == null)
connection.getEndPoint().shutdownOutput();
requestNotifier.notifyFailure(request, failure); requestNotifier.notifyFailure(request, failure);
if (exchangeCompleted) if (result != null)
{ {
HttpConversation conversation = exchange.conversation(); HttpConversation conversation = exchange.conversation();
Result result = new Result(request, failure, exchange.response());
responseNotifier.notifyComplete(conversation.listener(), result); responseNotifier.notifyComplete(conversation.listener(), result);
} }
} }

View File

@ -44,7 +44,7 @@ public class Result
this(request, requestFailure, response, null); this(request, requestFailure, response, null);
} }
private Result(Request request, Throwable requestFailure, Response response, Throwable responseFailure) public Result(Request request, Throwable requestFailure, Response response, Throwable responseFailure)
{ {
this.request = request; this.request = request;
this.requestFailure = requestFailure; this.requestFailure = requestFailure;

View File

@ -70,7 +70,7 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest
} }
}) })
.send() .send()
.get(5, TimeUnit.SECONDS); .get(10, TimeUnit.SECONDS);
long responseTime = System.nanoTime(); long responseTime = System.nanoTime();
Assert.assertEquals(200, response.status()); Assert.assertEquals(200, response.status());

View File

@ -39,10 +39,10 @@ import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.api.ContentProvider; import org.eclipse.jetty.client.api.ContentProvider;
import org.eclipse.jetty.client.api.ContentResponse; import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Destination; import org.eclipse.jetty.client.api.Destination;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result; import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.http.HttpCookie; import org.eclipse.jetty.http.HttpCookie;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler; import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.toolchain.test.MavenTestingUtils; import org.eclipse.jetty.toolchain.test.MavenTestingUtils;
import org.eclipse.jetty.toolchain.test.annotation.Slow; import org.eclipse.jetty.toolchain.test.annotation.Slow;
@ -117,7 +117,7 @@ public class HttpClientTest extends AbstractHttpClientServerTest
{ {
start(new EmptyServerHandler()); start(new EmptyServerHandler());
Response response = client.GET(scheme + "://localhost:" + connector.getLocalPort()).get(555, TimeUnit.SECONDS); Response response = client.GET(scheme + "://localhost:" + connector.getLocalPort()).get(5, TimeUnit.SECONDS);
Assert.assertNotNull(response); Assert.assertNotNull(response);
Assert.assertEquals(200, response.status()); Assert.assertEquals(200, response.status());
@ -130,7 +130,7 @@ public class HttpClientTest extends AbstractHttpClientServerTest
start(new AbstractHandler() start(new AbstractHandler()
{ {
@Override @Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{ {
response.getOutputStream().write(data); response.getOutputStream().write(data);
baseRequest.setHandled(true); baseRequest.setHandled(true);
@ -153,7 +153,7 @@ public class HttpClientTest extends AbstractHttpClientServerTest
start(new AbstractHandler() start(new AbstractHandler()
{ {
@Override @Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{ {
response.setCharacterEncoding("UTF-8"); response.setCharacterEncoding("UTF-8");
ServletOutputStream output = response.getOutputStream(); ServletOutputStream output = response.getOutputStream();
@ -185,7 +185,7 @@ public class HttpClientTest extends AbstractHttpClientServerTest
start(new AbstractHandler() start(new AbstractHandler()
{ {
@Override @Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{ {
response.setCharacterEncoding("UTF-8"); response.setCharacterEncoding("UTF-8");
ServletOutputStream output = response.getOutputStream(); ServletOutputStream output = response.getOutputStream();
@ -224,10 +224,10 @@ public class HttpClientTest extends AbstractHttpClientServerTest
final CountDownLatch successLatch = new CountDownLatch(2); final CountDownLatch successLatch = new CountDownLatch(2);
client.newRequest("localhost", connector.getLocalPort()) client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme) .scheme(scheme)
.listener(new org.eclipse.jetty.client.api.Request.Listener.Empty() .listener(new Request.Listener.Empty()
{ {
@Override @Override
public void onBegin(org.eclipse.jetty.client.api.Request request) public void onBegin(Request request)
{ {
try try
{ {
@ -251,10 +251,10 @@ public class HttpClientTest extends AbstractHttpClientServerTest
client.newRequest("localhost", connector.getLocalPort()) client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme) .scheme(scheme)
.listener(new org.eclipse.jetty.client.api.Request.Listener.Empty() .listener(new Request.Listener.Empty()
{ {
@Override @Override
public void onQueued(org.eclipse.jetty.client.api.Request request) public void onQueued(Request request)
{ {
latch.countDown(); latch.countDown();
} }
@ -285,10 +285,10 @@ public class HttpClientTest extends AbstractHttpClientServerTest
final CountDownLatch latch = new CountDownLatch(3); final CountDownLatch latch = new CountDownLatch(3);
client.newRequest("localhost", connector.getLocalPort()) client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme) .scheme(scheme)
.listener(new org.eclipse.jetty.client.api.Request.Listener.Empty() .listener(new Request.Listener.Empty()
{ {
@Override @Override
public void onBegin(org.eclipse.jetty.client.api.Request request) public void onBegin(Request request)
{ {
try try
{ {
@ -301,7 +301,7 @@ public class HttpClientTest extends AbstractHttpClientServerTest
} }
@Override @Override
public void onFailure(org.eclipse.jetty.client.api.Request request, Throwable failure) public void onFailure(Request request, Throwable failure)
{ {
latch.countDown(); latch.countDown();
} }
@ -354,10 +354,10 @@ public class HttpClientTest extends AbstractHttpClientServerTest
client.newRequest("localhost", connector.getLocalPort()) client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme) .scheme(scheme)
.file(file) .file(file)
.listener(new org.eclipse.jetty.client.api.Request.Listener.Empty() .listener(new Request.Listener.Empty()
{ {
@Override @Override
public void onSuccess(org.eclipse.jetty.client.api.Request request) public void onSuccess(Request request)
{ {
requestTime.set(System.nanoTime()); requestTime.set(System.nanoTime());
latch.countDown(); latch.countDown();
@ -395,11 +395,10 @@ public class HttpClientTest extends AbstractHttpClientServerTest
@Test @Test
public void test_ExchangeIsComplete_WhenRequestFailsMidway_WithResponse() throws Exception public void test_ExchangeIsComplete_WhenRequestFailsMidway_WithResponse() throws Exception
{ {
final int chunkSize = 16;
start(new AbstractHandler() start(new AbstractHandler()
{ {
@Override @Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{ {
// Echo back // Echo back
IO.copy(request.getInputStream(), response.getOutputStream()); IO.copy(request.getInputStream(), response.getOutputStream());
@ -465,10 +464,10 @@ public class HttpClientTest extends AbstractHttpClientServerTest
final int port = connector.getLocalPort(); final int port = connector.getLocalPort();
client.newRequest(host, port) client.newRequest(host, port)
.scheme(scheme) .scheme(scheme)
.listener(new org.eclipse.jetty.client.api.Request.Listener.Empty() .listener(new Request.Listener.Empty()
{ {
@Override @Override
public void onBegin(org.eclipse.jetty.client.api.Request request) public void onBegin(Request request)
{ {
HttpDestination destination = (HttpDestination)client.getDestination(scheme, host, port); HttpDestination destination = (HttpDestination)client.getDestination(scheme, host, port);
destination.getActiveConnections().peek().close(); destination.getActiveConnections().peek().close();

View File

@ -118,7 +118,7 @@ public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest
final BlockingQueue<Connection> activeConnections = destination.getActiveConnections(); final BlockingQueue<Connection> activeConnections = destination.getActiveConnections();
Assert.assertEquals(0, activeConnections.size()); Assert.assertEquals(0, activeConnections.size());
final CountDownLatch headersLatch = new CountDownLatch(1); final CountDownLatch beginLatch = new CountDownLatch(1);
final CountDownLatch failureLatch = new CountDownLatch(2); final CountDownLatch failureLatch = new CountDownLatch(2);
client.newRequest(host, port).scheme(scheme).listener(new Request.Listener.Empty() client.newRequest(host, port).scheme(scheme).listener(new Request.Listener.Empty()
{ {
@ -126,7 +126,7 @@ public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest
public void onBegin(Request request) public void onBegin(Request request)
{ {
activeConnections.peek().close(); activeConnections.peek().close();
headersLatch.countDown(); beginLatch.countDown();
} }
@Override @Override
@ -146,7 +146,7 @@ public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest
} }
}); });
Assert.assertTrue(headersLatch.await(5, TimeUnit.SECONDS)); Assert.assertTrue(beginLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(failureLatch.await(5, TimeUnit.SECONDS)); Assert.assertTrue(failureLatch.await(5, TimeUnit.SECONDS));
Assert.assertEquals(0, idleConnections.size()); Assert.assertEquals(0, idleConnections.size());

View File

@ -66,7 +66,7 @@ public class HttpReceiverTest
HttpExchange exchange = new HttpExchange(conversation, connection, null, listener); HttpExchange exchange = new HttpExchange(conversation, connection, null, listener);
conversation.exchanges().offer(exchange); conversation.exchanges().offer(exchange);
connection.setExchange(exchange); connection.setExchange(exchange);
exchange.requestComplete(true); exchange.requestComplete(null);
return exchange; return exchange;
} }

View File

@ -18,6 +18,7 @@
package org.eclipse.jetty.client; package org.eclipse.jetty.client;
import java.io.EOFException;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
@ -161,9 +162,11 @@ public class HttpRequestAbortTest extends AbstractHttpClientServerTest
} }
}); });
// Test can behave in 2 ways: // Test can behave in 3 ways:
// A) if the request is failed before the request arrived, then we get an ExecutionException // A) non-SSL, if the request is failed before the response arrived, then we get an ExecutionException
// B) if the request is failed after the request arrived, then we get a 500 // B) non-SSL, if the request is failed after the response arrived, then we get a 500
// C) SSL, the server tries to write the 500, but the connection is already closed, the client
// reads -1 with a pending exchange and fails the response with an EOFException
try try
{ {
ContentResponse response = client.newRequest("localhost", connector.getLocalPort()) ContentResponse response = client.newRequest("localhost", connector.getLocalPort())
@ -190,9 +193,22 @@ public class HttpRequestAbortTest extends AbstractHttpClientServerTest
} }
catch (ExecutionException x) catch (ExecutionException x)
{ {
HttpRequestException xx = (HttpRequestException)x.getCause(); Throwable cause = x.getCause();
Request request = xx.getRequest(); if (cause instanceof EOFException)
Assert.assertNotNull(request); {
// Server closed abruptly, behavior C
}
else if (cause instanceof HttpRequestException)
{
// Request failed, behavior A
HttpRequestException xx = (HttpRequestException)cause;
Request request = xx.getRequest();
Assert.assertNotNull(request);
}
else
{
throw x;
}
} }
} }
} }

View File

@ -141,7 +141,7 @@ public class HttpResponseAbortTest extends AbstractHttpClientServerTest
latch.countDown(); latch.countDown();
} }
}); });
Assert.assertTrue(latch.await(555, TimeUnit.SECONDS)); Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
} }
@Test(expected = CancellationException.class) @Test(expected = CancellationException.class)