442083 - Client resets stream, pending server data is failed, connection closed.

Introduced ResetException, and using it when failing frames of streams
that have been reset already.

HttpTransportOverHTTP2.abort(Throwable) checks for this exception and
does not close the connection.
This commit is contained in:
Simone Bordet 2014-08-19 18:40:01 +02:00
parent 8e62a50500
commit c15480644f
4 changed files with 159 additions and 7 deletions

View File

@ -32,11 +32,13 @@ import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.ErrorCodes;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.PushPromiseFrame;
import org.eclipse.jetty.http2.frames.ResetFrame;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlets.PushCacheFilter;
import org.eclipse.jetty.util.Callback;
@ -78,7 +80,7 @@ public class PushTest extends AbstractTest
final String primaryURI = "http://localhost:" + connector.getLocalPort() + servletPath + primaryResource;
HttpFields primaryFields = new HttpFields();
MetaData.Request primaryRequest = newRequest("GET", primaryResource, primaryFields);
final CountDownLatch secondaryResponseLatch = new CountDownLatch(1);
final CountDownLatch warmupLatch = new CountDownLatch(1);
session.newStream(new HeadersFrame(0, primaryRequest, null, true), new Promise.Adapter<Stream>(), new Stream.Listener.Adapter()
{
@Override
@ -96,13 +98,13 @@ public class PushTest extends AbstractTest
@Override
public void onData(Stream stream, DataFrame frame, Callback callback)
{
secondaryResponseLatch.countDown();
warmupLatch.countDown();
}
});
}
}
});
Assert.assertTrue(secondaryResponseLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(warmupLatch.await(5, TimeUnit.SECONDS));
// Request again the primary resource, we should get the secondary resource pushed.
primaryRequest = newRequest("GET", primaryResource, primaryFields);
@ -136,4 +138,107 @@ public class PushTest extends AbstractTest
Assert.assertTrue(pushLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(primaryResponseLatch.await(5, TimeUnit.SECONDS));
}
@Test
public void testPushIsReset() throws Exception
{
final String primaryResource = "/primary.html";
final String secondaryResource = "/secondary.png";
final byte[] secondaryData = "SECONDARY".getBytes("UTF-8");
startServer(new HttpServlet()
{
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException
{
String requestURI = req.getRequestURI();
ServletOutputStream output = resp.getOutputStream();
if (requestURI.endsWith(primaryResource))
output.print("<html><head></head><body>PRIMARY</body></html>");
else if (requestURI.endsWith(secondaryResource))
output.write(secondaryData);
}
});
final Session session = newClient(new Session.Listener.Adapter());
// Request for the primary and secondary resource to build the cache.
final String primaryURI = "http://localhost:" + connector.getLocalPort() + servletPath + primaryResource;
HttpFields primaryFields = new HttpFields();
MetaData.Request primaryRequest = newRequest("GET", primaryResource, primaryFields);
final CountDownLatch warmupLatch = new CountDownLatch(1);
session.newStream(new HeadersFrame(0, primaryRequest, null, true), new Promise.Adapter<Stream>(), new Stream.Listener.Adapter()
{
@Override
public void onData(Stream stream, DataFrame frame, Callback callback)
{
callback.succeeded();
if (frame.isEndStream())
{
// Request for the secondary resource.
HttpFields secondaryFields = new HttpFields();
secondaryFields.put(HttpHeader.REFERER, primaryURI);
MetaData.Request secondaryRequest = newRequest("GET", secondaryResource, secondaryFields);
session.newStream(new HeadersFrame(0, secondaryRequest, null, true), new Promise.Adapter<Stream>(), new Stream.Listener.Adapter()
{
@Override
public void onData(Stream stream, DataFrame frame, Callback callback)
{
warmupLatch.countDown();
}
});
}
}
});
Assert.assertTrue(warmupLatch.await(5, TimeUnit.SECONDS));
// Request again the primary resource, we should get the secondary resource pushed.
primaryRequest = newRequest("GET", primaryResource, primaryFields);
final CountDownLatch primaryResponseLatch = new CountDownLatch(1);
final CountDownLatch pushLatch = new CountDownLatch(1);
session.newStream(new HeadersFrame(0, primaryRequest, null, true), new Promise.Adapter<Stream>(), new Stream.Listener.Adapter()
{
@Override
public Stream.Listener onPush(Stream stream, PushPromiseFrame frame)
{
// Reset the stream as soon as we see the push.
ResetFrame resetFrame = new ResetFrame(stream.getId(), ErrorCodes.REFUSED_STREAM_ERROR);
stream.reset(resetFrame, Callback.Adapter.INSTANCE);
return new Adapter()
{
@Override
public void onData(Stream stream, DataFrame frame, Callback callback)
{
pushLatch.countDown();
}
};
}
@Override
public void onData(Stream stream, DataFrame frame, Callback callback)
{
callback.succeeded();
if (frame.isEndStream())
primaryResponseLatch.countDown();
}
});
// We should not receive pushed data that we reset.
Assert.assertFalse(pushLatch.await(1, TimeUnit.SECONDS));
Assert.assertTrue(primaryResponseLatch.await(5, TimeUnit.SECONDS));
// Make sure the session is sane by requesting the secondary resource.
HttpFields secondaryFields = new HttpFields();
secondaryFields.put(HttpHeader.REFERER, primaryURI);
MetaData.Request secondaryRequest = newRequest("GET", secondaryResource, secondaryFields);
final CountDownLatch secondaryResponseLatch = new CountDownLatch(1);
session.newStream(new HeadersFrame(0, secondaryRequest, null, true), new Promise.Adapter<Stream>(), new Stream.Listener.Adapter()
{
@Override
public void onData(Stream stream, DataFrame frame, Callback callback)
{
if (frame.isEndStream())
secondaryResponseLatch.countDown();
}
});
Assert.assertTrue(secondaryResponseLatch.await(5, TimeUnit.SECONDS));
}
}

View File

@ -18,7 +18,6 @@
package org.eclipse.jetty.http2;
import java.io.EOFException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayDeque;
@ -348,7 +347,7 @@ public class HTTP2Flusher extends IteratingCallback
public void reset()
{
failed(new EOFException("reset"));
failed(new ResetException());
}
@Override

View File

@ -0,0 +1,46 @@
//
// ========================================================================
// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.http2;
public class ResetException extends RuntimeException
{
public ResetException()
{
}
public ResetException(String message)
{
super(message);
}
public ResetException(String message, Throwable cause)
{
super(message, cause);
}
public ResetException(Throwable cause)
{
super(cause);
}
public ResetException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace)
{
super(message, cause, enableSuppression, writableStackTrace);
}
}

View File

@ -26,6 +26,7 @@ import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.IStream;
import org.eclipse.jetty.http2.ResetException;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
@ -162,11 +163,12 @@ public class HttpTransportOverHTTP2 implements HttpTransport
}
@Override
public void abort()
public void abort(Throwable failure)
{
if (LOG.isDebugEnabled())
LOG.debug("HTTP2 Response #{} aborted", stream.getId());
stream.getSession().disconnect();
if (!(failure instanceof ResetException))
stream.getSession().disconnect();
}
private class CommitCallback implements Callback