432145 - Pending request is not failed when HttpClient is stopped.

Fixed by fixing the code in close() to also abort pending exchanges.
Reviewed for HTTP, FastCGI and SPDY transports.
This commit is contained in:
Simone Bordet 2014-04-07 17:31:25 +02:00
parent 0e458c80f4
commit e3662a9b23
11 changed files with 206 additions and 32 deletions

View File

@ -129,6 +129,15 @@ public abstract class MultiplexHttpDestination<C extends Connection> extends Htt
return true;
}
@Override
public void close()
{
super.close();
C connection = this.connection;
if (connection != null)
connection.close();
}
@Override
public void close(Connection connection)
{

View File

@ -18,6 +18,7 @@
package org.eclipse.jetty.client.http;
import java.nio.channels.AsynchronousCloseException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
@ -85,13 +86,8 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements Connec
protected boolean onReadTimeout()
{
LOG.debug("{} idle timeout", this);
HttpExchange exchange = channel.getHttpExchange();
if (exchange != null)
return exchange.getRequest().abort(new TimeoutException());
getHttpDestination().close(this);
return true;
close(new TimeoutException());
return false;
}
@Override
@ -119,14 +115,23 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements Connec
@Override
public void close()
{
close(new AsynchronousCloseException());
}
protected void close(Throwable failure)
{
if (softClose())
{
// First close then abort, to be sure that the connection cannot be reused
// from an onFailure() handler or by blocking code waiting for completion.
getHttpDestination().close(this);
getEndPoint().shutdownOutput();
LOG.debug("{} oshut", this);
getEndPoint().close();
LOG.debug("{} closed", this);
abort(failure);
}
}
@ -135,6 +140,12 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements Connec
return closed.compareAndSet(false, true);
}
private boolean abort(Throwable failure)
{
HttpExchange exchange = channel.getHttpExchange();
return exchange != null && exchange.getRequest().abort(failure);
}
@Override
public String toString()
{

View File

@ -127,8 +127,7 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
// Shutting down the parser may invoke messageComplete() or earlyEOF()
parser.atEOF();
parser.parseNext(BufferUtil.EMPTY_BUFFER);
if (!responseFailure(new EOFException()))
getHttpConnection().close();
getHttpConnection().close(new EOFException());
}
@Override

View File

@ -1161,4 +1161,40 @@ public class HttpClientTest extends AbstractHttpClientServerTest
Assert.assertEquals(200, response.getStatus());
Assert.assertTrue(response.getHeaders().contains(HttpHeader.CONNECTION, HttpHeaderValue.KEEP_ALIVE.asString()));
}
@Test
public void testLongPollIsAbortedWhenClientIsStopped() throws Exception
{
final CountDownLatch latch = new CountDownLatch(1);
start(new AbstractHandler()
{
@Override
public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
request.startAsync();
latch.countDown();
}
});
final CountDownLatch completeLatch = new CountDownLatch(1);
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.send(new Response.CompleteListener()
{
@Override
public void onComplete(Result result)
{
if (result.isFailed())
completeLatch.countDown();
}
});
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
// Stop the client, the complete listener must be invoked.
client.stop();
Assert.assertTrue(completeLatch.await(5, TimeUnit.SECONDS));
}
}

View File

@ -132,7 +132,7 @@ public class HttpChannelOverFCGI extends HttpChannel
if (close)
connection.close();
else
connection.release();
connection.release(this);
}
protected void flush(Generator.Result... results)
@ -155,7 +155,7 @@ public class HttpChannelOverFCGI extends HttpChannel
protected void onIdleExpired(TimeoutException timeout)
{
LOG.debug("Idle timeout for request {}", request);
abort(timeout);
connection.abort(timeout);
}
@Override

View File

@ -20,6 +20,7 @@ package org.eclipse.jetty.fcgi.client.http;
import java.io.EOFException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousCloseException;
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@ -139,25 +140,19 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec
private void shutdown()
{
// First close then abort, to be sure that the
// connection cannot be reused from an onFailure()
// handler or by blocking code waiting for completion.
close();
for (HttpChannelOverFCGI channel : channels.values())
channel.abort(new EOFException());
close(new EOFException());
}
@Override
protected boolean onReadTimeout()
{
for (HttpChannelOverFCGI channel : channels.values())
channel.abort(new TimeoutException());
close();
close(new TimeoutException());
return false;
}
public void release()
protected void release(HttpChannelOverFCGI channel)
{
channels.remove(channel.getRequest());
if (destination instanceof PoolingHttpDestination)
{
@SuppressWarnings("unchecked")
@ -169,17 +164,37 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec
@Override
public void close()
{
close(new AsynchronousCloseException());
}
private void close(Throwable failure)
{
if (closed.compareAndSet(false, true))
{
// First close then abort, to be sure that the connection cannot be reused
// from an onFailure() handler or by blocking code waiting for completion.
getHttpDestination().close(this);
getEndPoint().shutdownOutput();
LOG.debug("{} oshut", this);
getEndPoint().close();
LOG.debug("{} closed", this);
abort(failure);
}
}
protected void abort(Throwable failure)
{
for (HttpChannelOverFCGI channel : channels.values())
{
HttpExchange exchange = channel.getHttpExchange();
if (exchange != null)
exchange.getRequest().abort(failure);
}
channels.clear();
}
private int acquireRequest()
{
synchronized (requests)
@ -304,7 +319,7 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec
@Override
public void onEnd(int request)
{
HttpChannelOverFCGI channel = channels.remove(request);
HttpChannelOverFCGI channel = channels.get(request);
if (channel != null)
{
channel.responseSuccess();

View File

@ -24,6 +24,7 @@ import java.net.URI;
import java.net.URLEncoder;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@ -37,6 +38,7 @@ import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.client.util.BytesContentProvider;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.server.handler.AbstractHandler;
@ -513,4 +515,40 @@ public class HttpClientTest extends AbstractHttpClientServerTest
Assert.assertEquals(200, response.getStatus());
Assert.assertEquals(length, response.getContent().length);
}
@Test
public void testLongPollIsAbortedWhenClientIsStopped() throws Exception
{
final CountDownLatch latch = new CountDownLatch(1);
start(new AbstractHandler()
{
@Override
public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
request.startAsync();
latch.countDown();
}
});
final CountDownLatch completeLatch = new CountDownLatch(1);
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.send(new Response.CompleteListener()
{
@Override
public void onComplete(Result result)
{
if (result.isFailed())
completeLatch.countDown();
}
});
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
// Stop the client, the complete listener must be invoked.
client.stop();
Assert.assertTrue(completeLatch.await(5, TimeUnit.SECONDS));
}
}

View File

@ -21,17 +21,20 @@ package org.eclipse.jetty.spdy.client.http;
import org.eclipse.jetty.client.HttpChannel;
import org.eclipse.jetty.client.HttpDestination;
import org.eclipse.jetty.client.HttpExchange;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.spdy.api.Session;
public class HttpChannelOverSPDY extends HttpChannel
{
private final HttpConnectionOverSPDY connection;
private final Session session;
private final HttpSenderOverSPDY sender;
private final HttpReceiverOverSPDY receiver;
public HttpChannelOverSPDY(HttpDestination destination, Session session)
public HttpChannelOverSPDY(HttpDestination destination, HttpConnectionOverSPDY connection, Session session)
{
super(destination);
this.connection = connection;
this.session = session;
this.sender = new HttpSenderOverSPDY(this);
this.receiver = new HttpReceiverOverSPDY(this);
@ -72,4 +75,11 @@ public class HttpChannelOverSPDY extends HttpChannel
sender.abort(cause);
return receiver.abort(cause);
}
@Override
public void exchangeTerminated(Result result)
{
super.exchangeTerminated(result);
connection.release(this);
}
}

View File

@ -18,6 +18,9 @@
package org.eclipse.jetty.spdy.client.http;
import java.nio.channels.AsynchronousCloseException;
import java.util.Set;
import org.eclipse.jetty.client.HttpChannel;
import org.eclipse.jetty.client.HttpConnection;
import org.eclipse.jetty.client.HttpDestination;
@ -25,9 +28,11 @@ import org.eclipse.jetty.client.HttpExchange;
import org.eclipse.jetty.spdy.api.GoAwayInfo;
import org.eclipse.jetty.spdy.api.Session;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.ConcurrentHashSet;
public class HttpConnectionOverSPDY extends HttpConnection
{
private final Set<HttpChannel> channels = new ConcurrentHashSet<>();
private final Session session;
public HttpConnectionOverSPDY(HttpDestination destination, Session session)
@ -41,14 +46,35 @@ public class HttpConnectionOverSPDY extends HttpConnection
{
normalizeRequest(exchange.getRequest());
// One connection maps to N channels, so for each exchange we create a new channel
HttpChannel channel = new HttpChannelOverSPDY(getHttpDestination(), session);
HttpChannel channel = new HttpChannelOverSPDY(getHttpDestination(), this, session);
channels.add(channel);
channel.associate(exchange);
channel.send();
}
protected void release(HttpChannel channel)
{
channels.remove(channel);
}
@Override
public void close()
{
// First close then abort, to be sure that the connection cannot be reused
// from an onFailure() handler or by blocking code waiting for completion.
getHttpDestination().close(this);
session.goAway(new GoAwayInfo(), new Callback.Adapter());
abort(new AsynchronousCloseException());
}
private void abort(Throwable failure)
{
for (HttpChannel channel : channels)
{
HttpExchange exchange = channel.getHttpExchange();
if (exchange != null)
exchange.getRequest().abort(failure);
}
channels.clear();
}
}

View File

@ -35,12 +35,4 @@ public class HttpDestinationOverSPDY extends MultiplexHttpDestination<HttpConnec
{
connection.send(exchange);
}
@Override
public void abort(Throwable cause)
{
// TODO: in case of connection failure, we need to abort also
// TODO: all pending exchanges, so we need to track them.
super.abort(cause);
}
}

View File

@ -23,6 +23,7 @@ import java.net.URI;
import java.net.URLEncoder;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@ -36,6 +37,7 @@ import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.client.util.BytesContentProvider;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.server.handler.AbstractHandler;
@ -426,4 +428,40 @@ public class HttpClientTest extends AbstractHttpClientServerTest
Assert.assertEquals(200, response.getStatus());
Assert.assertEquals(length, response.getContent().length);
}
@Test
public void testLongPollIsAbortedWhenClientIsStopped() throws Exception
{
final CountDownLatch latch = new CountDownLatch(1);
start(new AbstractHandler()
{
@Override
public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
request.startAsync();
latch.countDown();
}
});
final CountDownLatch completeLatch = new CountDownLatch(1);
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.send(new Response.CompleteListener()
{
@Override
public void onComplete(Result result)
{
if (result.isFailed())
completeLatch.countDown();
}
});
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
// Stop the client, the complete listener must be invoked.
client.stop();
Assert.assertTrue(completeLatch.await(5, TimeUnit.SECONDS));
}
}