487354 - Aborted request or response does not send RST_STREAM frame.

Fixed by sending a RST_STREAM frame when the channel is aborted.
This commit is contained in:
Simone Bordet 2016-02-05 16:53:52 +01:00
parent 460c778ca1
commit 98cd85287c
5 changed files with 159 additions and 20 deletions

View File

@ -24,8 +24,11 @@ import org.eclipse.jetty.client.HttpExchange;
import org.eclipse.jetty.client.HttpReceiver;
import org.eclipse.jetty.client.HttpSender;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.frames.ResetFrame;
import org.eclipse.jetty.util.Callback;
public class HttpChannelOverHTTP2 extends HttpChannel
{
@ -33,6 +36,7 @@ public class HttpChannelOverHTTP2 extends HttpChannel
private final Session session;
private final HttpSenderOverHTTP2 sender;
private final HttpReceiverOverHTTP2 receiver;
private Stream stream;
public HttpChannelOverHTTP2(HttpDestination destination, HttpConnectionOverHTTP2 connection, Session session)
{
@ -65,6 +69,16 @@ public class HttpChannelOverHTTP2 extends HttpChannel
return receiver;
}
public Stream getStream()
{
return stream;
}
public void setStream(Stream stream)
{
this.stream = stream;
}
@Override
public void send()
{
@ -79,6 +93,19 @@ public class HttpChannelOverHTTP2 extends HttpChannel
connection.release(this);
}
@Override
public boolean abort(HttpExchange exchange, Throwable requestFailure, Throwable responseFailure)
{
boolean aborted = super.abort(exchange, requestFailure, responseFailure);
if (aborted)
{
Stream stream = getStream();
if (stream != null)
stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.NOOP);
}
return aborted;
}
@Override
public void exchangeTerminated(HttpExchange exchange, Result result)
{

View File

@ -33,8 +33,6 @@ import org.eclipse.jetty.util.Promise;
public class HttpSenderOverHTTP2 extends HttpSender
{
private Stream stream;
public HttpSenderOverHTTP2(HttpChannelOverHTTP2 channel)
{
super(channel);
@ -59,7 +57,7 @@ public class HttpSenderOverHTTP2 extends HttpSender
@Override
public void succeeded(Stream stream)
{
HttpSenderOverHTTP2.this.stream = stream;
getHttpChannel().setStream(stream);
stream.setIdleTimeout(request.getIdleTimeout());
if (content.hasContent() && !expects100Continue(request))
@ -95,15 +93,9 @@ public class HttpSenderOverHTTP2 extends HttpSender
}
else
{
Stream stream = getHttpChannel().getStream();
DataFrame frame = new DataFrame(stream.getId(), content.getByteBuffer(), content.isLast());
stream.data(frame, callback);
}
}
@Override
protected void reset()
{
super.reset();
stream = null;
}
}

View File

@ -19,8 +19,11 @@
package org.eclipse.jetty.http2.client.http;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
import org.eclipse.jetty.http2.client.HTTP2Client;
import org.eclipse.jetty.http2.server.HTTP2ServerConnectionFactory;
import org.eclipse.jetty.http2.server.RawHTTP2ServerConnectionFactory;
import org.eclipse.jetty.server.ConnectionFactory;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.Server;
@ -38,25 +41,38 @@ public class AbstractTest
protected ServerConnector connector;
protected HttpClient client;
protected void start(int maxConcurrentStreams, Handler handler) throws Exception
protected void start(ServerSessionListener listener) throws Exception
{
prepareServer(new RawHTTP2ServerConnectionFactory(new HttpConfiguration(), listener));
server.start();
prepareClient();
client.start();
}
protected void start(Handler handler) throws Exception
{
prepareServer(new HTTP2ServerConnectionFactory(new HttpConfiguration()));
server.setHandler(handler);
server.start();
prepareClient();
client.start();
}
protected void prepareServer(ConnectionFactory connectionFactory)
{
QueuedThreadPool serverExecutor = new QueuedThreadPool();
serverExecutor.setName("server");
server = new Server(serverExecutor);
HTTP2ServerConnectionFactory http2 = new HTTP2ServerConnectionFactory(new HttpConfiguration());
http2.setMaxConcurrentStreams(maxConcurrentStreams);
connector = new ServerConnector(server, 1, 1, http2);
connector = new ServerConnector(server, 1, 1, connectionFactory);
server.addConnector(connector);
}
server.setHandler(handler);
server.start();
protected void prepareClient() throws Exception
{
client = new HttpClient(new HttpClientTransportOverHTTP2(new HTTP2Client()), null);
QueuedThreadPool clientExecutor = new QueuedThreadPool();
clientExecutor.setName("client");
client.setExecutor(clientExecutor);
client.start();
}
@After

View File

@ -18,19 +18,32 @@
package org.eclipse.jetty.http2.client.http;
import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
import org.eclipse.jetty.http2.client.HTTP2Client;
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.ResetFrame;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
public class HttpClientTransportOverHTTP2Test
public class HttpClientTransportOverHTTP2Test extends AbstractTest
{
@Test
public void testPropertiesAreForwarded() throws Exception
@ -56,6 +69,83 @@ public class HttpClientTransportOverHTTP2Test
Assert.assertTrue(http2Client.isStopped());
}
@Test
public void testRequestAbortSendsResetFrame() throws Exception
{
CountDownLatch resetLatch = new CountDownLatch(1);
start(new ServerSessionListener.Adapter()
{
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{
return new Stream.Listener.Adapter()
{
@Override
public void onReset(Stream stream, ResetFrame frame)
{
resetLatch.countDown();
}
};
}
});
try
{
client.newRequest("localhost", connector.getLocalPort())
.onRequestCommit(request -> request.abort(new Exception("explicitly_aborted_by_test")))
.send();
Assert.fail();
}
catch (ExecutionException x)
{
Assert.assertTrue(resetLatch.await(5, TimeUnit.SECONDS));
}
}
@Test
public void testResponseAbortSendsResetFrame() throws Exception
{
CountDownLatch resetLatch = new CountDownLatch(1);
start(new ServerSessionListener.Adapter()
{
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{
MetaData.Response metaData = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields());
stream.headers(new HeadersFrame(stream.getId(), metaData, null, false), new Callback()
{
@Override
public void succeeded()
{
ByteBuffer data = ByteBuffer.allocate(1024);
stream.data(new DataFrame(stream.getId(), data, false), NOOP);
}
});
return new Stream.Listener.Adapter()
{
@Override
public void onReset(Stream stream, ResetFrame frame)
{
resetLatch.countDown();
}
};
}
});
try
{
client.newRequest("localhost", connector.getLocalPort())
.onResponseContent((response, buffer) -> response.abort(new Exception("explicitly_aborted_by_test")))
.send();
Assert.fail();
}
catch (ExecutionException x)
{
Assert.assertTrue(resetLatch.await(5, TimeUnit.SECONDS));
}
}
@Ignore
@Test
public void testExternalServer() throws Exception

View File

@ -28,6 +28,9 @@ import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http2.server.HTTP2ServerConnectionFactory;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.junit.Assert;
@ -35,6 +38,17 @@ import org.junit.Test;
public class MaxConcurrentStreamsTest extends AbstractTest
{
private void start(int maxConcurrentStreams, Handler handler) throws Exception
{
HTTP2ServerConnectionFactory http2 = new HTTP2ServerConnectionFactory(new HttpConfiguration());
http2.setMaxConcurrentStreams(maxConcurrentStreams);
prepareServer(http2);
server.setHandler(handler);
server.start();
prepareClient();
client.start();
}
@Test
public void testOneConcurrentStream() throws Exception
{