Fixed removal of streams upon sending the response HEADERS frame.

This commit is contained in:
Simone Bordet 2015-02-06 22:48:43 +01:00
parent 7066f65e8c
commit d0f0aa7c9f
10 changed files with 207 additions and 28 deletions

View File

@ -48,7 +48,7 @@ public class AbstractTest
protected HTTP2Client client;
private Server server;
protected void startServer(HttpServlet servlet) throws Exception
protected void start(HttpServlet servlet) throws Exception
{
prepareServer(new HTTP2ServerConnectionFactory(new HttpConfiguration()));
@ -66,7 +66,7 @@ public class AbstractTest
{
}
protected void startServer(ServerSessionListener listener) throws Exception
protected void start(ServerSessionListener listener) throws Exception
{
prepareServer(new RawHTTP2ServerConnectionFactory(new HttpConfiguration(),listener));
prepareClient();

View File

@ -46,7 +46,7 @@ public class HTTP2Test extends AbstractTest
@Test
public void testRequestNoContentResponseNoContent() throws Exception
{
startServer(new EmptyHttpServlet());
start(new EmptyHttpServlet());
Session session = newClient(new Session.Listener.Adapter());
@ -79,7 +79,7 @@ public class HTTP2Test extends AbstractTest
public void testRequestNoContentResponseContent() throws Exception
{
final byte[] content = "Hello World!".getBytes(StandardCharsets.UTF_8);
startServer(new HttpServlet()
start(new HttpServlet()
{
@Override
protected void service(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException
@ -131,7 +131,7 @@ public class HTTP2Test extends AbstractTest
public void testMultipleRequests() throws Exception
{
final String downloadBytes = "X-Download";
startServer(new HttpServlet()
start(new HttpServlet()
{
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException

View File

@ -56,7 +56,7 @@ public class IdleTimeoutTest extends AbstractTest
@Test
public void testServerEnforcingIdleTimeout() throws Exception
{
startServer(new ServerSessionListener.Adapter()
start(new ServerSessionListener.Adapter()
{
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame requestFrame)
@ -100,7 +100,7 @@ public class IdleTimeoutTest extends AbstractTest
@Test
public void testServerEnforcingIdleTimeoutWithUnrespondedStream() throws Exception
{
startServer(new ServerSessionListener.Adapter()
start(new ServerSessionListener.Adapter()
{
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
@ -140,7 +140,7 @@ public class IdleTimeoutTest extends AbstractTest
@Test
public void testServerNotEnforcingIdleTimeoutWithinCallback() throws Exception
{
startServer(new ServerSessionListener.Adapter()
start(new ServerSessionListener.Adapter()
{
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
@ -203,7 +203,7 @@ public class IdleTimeoutTest extends AbstractTest
public void testClientEnforcingIdleTimeout() throws Exception
{
final CountDownLatch closeLatch = new CountDownLatch(1);
startServer(new ServerSessionListener.Adapter()
start(new ServerSessionListener.Adapter()
{
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
@ -244,7 +244,7 @@ public class IdleTimeoutTest extends AbstractTest
public void testClientEnforcingIdleTimeoutWithUnrespondedStream() throws Exception
{
final CountDownLatch closeLatch = new CountDownLatch(1);
startServer(new ServerSessionListener.Adapter()
start(new ServerSessionListener.Adapter()
{
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
@ -281,7 +281,7 @@ public class IdleTimeoutTest extends AbstractTest
public void testClientNotEnforcingIdleTimeoutWithinCallback() throws Exception
{
final CountDownLatch closeLatch = new CountDownLatch(1);
startServer(new ServerSessionListener.Adapter()
start(new ServerSessionListener.Adapter()
{
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
@ -339,7 +339,7 @@ public class IdleTimeoutTest extends AbstractTest
public void testClientEnforcingStreamIdleTimeout() throws Exception
{
final int idleTimeout = 1000;
startServer(new HttpServlet()
start(new HttpServlet()
{
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException
@ -398,7 +398,7 @@ public class IdleTimeoutTest extends AbstractTest
public void testServerEnforcingStreamIdleTimeout() throws Exception
{
final CountDownLatch timeoutLatch = new CountDownLatch(1);
startServer(new ServerSessionListener.Adapter()
start(new ServerSessionListener.Adapter()
{
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
@ -442,7 +442,7 @@ public class IdleTimeoutTest extends AbstractTest
public void testStreamIdleTimeoutIsNotEnforcedWhenReceiving() throws Exception
{
final CountDownLatch timeoutLatch = new CountDownLatch(1);
startServer(new ServerSessionListener.Adapter()
start(new ServerSessionListener.Adapter()
{
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
@ -496,7 +496,7 @@ public class IdleTimeoutTest extends AbstractTest
public void testStreamIdleTimeoutIsNotEnforcedWhenSending() throws Exception
{
final CountDownLatch resetLatch = new CountDownLatch(1);
startServer(new ServerSessionListener.Adapter()
start(new ServerSessionListener.Adapter()
{
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)

View File

@ -34,7 +34,7 @@ public class PingTest extends AbstractTest
@Test
public void testPing() throws Exception
{
startServer(new ServerSessionListener.Adapter());
start(new ServerSessionListener.Adapter());
final byte[] payload = new byte[8];
new Random().nextBytes(payload);

View File

@ -60,7 +60,7 @@ public class PushCacheFilterTest extends AbstractTest
final String primaryResource = "/primary.html";
final String secondaryResource = "/secondary.png";
final byte[] secondaryData = "SECONDARY".getBytes("UTF-8");
startServer(new HttpServlet()
start(new HttpServlet()
{
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException
@ -145,7 +145,7 @@ public class PushCacheFilterTest extends AbstractTest
final String primaryResource = "/primary.html";
final String secondaryResource = "/secondary.png";
final byte[] secondaryData = "SECONDARY".getBytes("UTF-8");
startServer(new HttpServlet()
start(new HttpServlet()
{
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException

View File

@ -0,0 +1,166 @@
//
// ========================================================================
// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.http2.client;
import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.HTTP2Stream;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FuturePromise;
import org.eclipse.jetty.util.Promise;
import org.junit.Assert;
import org.junit.Test;
public class StreamCloseTest extends AbstractTest
{
@Test
public void testRequestClosedRemotelyClosesStream() throws Exception
{
final CountDownLatch latch = new CountDownLatch(1);
start(new ServerSessionListener.Adapter()
{
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{
Assert.assertTrue(((HTTP2Stream)stream).isRemotelyClosed());
latch.countDown();
return null;
}
});
Session session = newClient(new Session.Listener.Adapter());
HeadersFrame frame = new HeadersFrame(0, newRequest("GET", new HttpFields()), null, true);
FuturePromise<Stream> promise = new FuturePromise<>();
session.newStream(frame, promise, null);
Stream stream = promise.get(5, TimeUnit.SECONDS);
Assert.assertTrue(((HTTP2Stream)stream).isLocallyClosed());
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
}
@Test
public void testRequestClosedResponseClosedClosesStream() throws Exception
{
final CountDownLatch latch = new CountDownLatch(2);
start(new ServerSessionListener.Adapter()
{
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{
MetaData.Response metaData = new MetaData.Response(HttpVersion.HTTP_2, 200, new HttpFields());
HeadersFrame response = new HeadersFrame(stream.getId(), metaData, null, true);
stream.headers(response, Callback.Adapter.INSTANCE);
Assert.assertTrue(stream.isClosed());
Assert.assertEquals(0, stream.getSession().getStreams().size());
latch.countDown();
return null;
}
});
Session session = newClient(new Session.Listener.Adapter());
HeadersFrame frame = new HeadersFrame(0, newRequest("GET", new HttpFields()), null, true);
session.newStream(frame, new Promise.Adapter<Stream>(), new Stream.Listener.Adapter()
{
@Override
public void onHeaders(Stream stream, HeadersFrame frame)
{
Assert.assertTrue(stream.isClosed());
latch.countDown();
}
});
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
}
@Test
public void testRequestDataClosedResponseDataClosedClosesStream() throws Exception
{
final CountDownLatch serverDataLatch = new CountDownLatch(1);
start(new ServerSessionListener.Adapter()
{
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{
MetaData.Response metaData = new MetaData.Response(HttpVersion.HTTP_2, 200, new HttpFields());
HeadersFrame response = new HeadersFrame(stream.getId(), metaData, null, false);
stream.headers(response, Callback.Adapter.INSTANCE);
return new Stream.Listener.Adapter()
{
@Override
public void onData(final Stream stream, DataFrame frame, final Callback callback)
{
Assert.assertTrue(((HTTP2Stream)stream).isRemotelyClosed());
stream.data(frame, new Callback.Adapter()
{
@Override
public void succeeded()
{
Assert.assertTrue(stream.isClosed());
Assert.assertEquals(0, stream.getSession().getStreams().size());
callback.succeeded();
serverDataLatch.countDown();
}
});
}
};
}
});
final CountDownLatch completeLatch = new CountDownLatch(1);
Session session = newClient(new Session.Listener.Adapter());
HeadersFrame frame = new HeadersFrame(0, newRequest("GET", new HttpFields()), null, false);
FuturePromise<Stream> promise = new FuturePromise<>();
session.newStream(frame, promise, new Stream.Listener.Adapter()
{
@Override
public void onData(Stream stream, DataFrame frame, Callback callback)
{
Assert.assertTrue(stream.isClosed());
completeLatch.countDown();
}
});
final Stream stream = promise.get(5, TimeUnit.SECONDS);
Assert.assertFalse(stream.isClosed());
Assert.assertFalse(((HTTP2Stream)stream).isLocallyClosed());
final CountDownLatch clientDataLatch = new CountDownLatch(1);
stream.data(new DataFrame(stream.getId(), ByteBuffer.wrap(new byte[512]), true), new Callback.Adapter()
{
@Override
public void succeeded()
{
Assert.assertTrue(((HTTP2Stream)stream).isLocallyClosed());
clientDataLatch.countDown();
}
});
Assert.assertTrue(clientDataLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(serverDataLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(completeLatch.await(5, TimeUnit.SECONDS));
Assert.assertEquals(0, stream.getSession().getStreams().size());
}
}

View File

@ -46,7 +46,7 @@ public class StreamCountTest extends AbstractTest
@Test
public void testServersAllowsOneStreamEnforcedByClient() throws Exception
{
startServer(new ServerSessionListener.Adapter()
start(new ServerSessionListener.Adapter()
{
@Override
public Map<Integer, Integer> onPreface(Session session)
@ -125,7 +125,7 @@ public class StreamCountTest extends AbstractTest
public void testServersAllowsOneStreamEnforcedByServer() throws Exception
{
final CountDownLatch resetLatch = new CountDownLatch(1);
startServer(new ServerSessionListener.Adapter()
start(new ServerSessionListener.Adapter()
{
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)

View File

@ -54,7 +54,7 @@ public class StreamResetTest extends AbstractTest
@Test
public void testStreamSendingResetIsRemoved() throws Exception
{
startServer(new ServerSessionListener.Adapter());
start(new ServerSessionListener.Adapter());
Session client = newClient(new Session.Listener.Adapter());
MetaData.Request request = newRequest("GET", new HttpFields());
@ -75,7 +75,7 @@ public class StreamResetTest extends AbstractTest
{
final AtomicReference<Stream> streamRef = new AtomicReference<>();
final CountDownLatch resetLatch = new CountDownLatch(1);
startServer(new ServerSessionListener.Adapter()
start(new ServerSessionListener.Adapter()
{
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
@ -118,7 +118,7 @@ public class StreamResetTest extends AbstractTest
{
final CountDownLatch serverResetLatch = new CountDownLatch(1);
final CountDownLatch serverDataLatch = new CountDownLatch(1);
startServer(new ServerSessionListener.Adapter()
start(new ServerSessionListener.Adapter()
{
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame requestFrame)
@ -208,7 +208,7 @@ public class StreamResetTest extends AbstractTest
{
final CountDownLatch resetLatch = new CountDownLatch(1);
final CountDownLatch dataLatch = new CountDownLatch(1);
startServer(new HttpServlet()
start(new HttpServlet()
{
@Override
protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
@ -267,7 +267,7 @@ public class StreamResetTest extends AbstractTest
{
final CountDownLatch resetLatch = new CountDownLatch(1);
final CountDownLatch dataLatch = new CountDownLatch(1);
startServer(new HttpServlet()
start(new HttpServlet()
{
@Override
protected void doGet(HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException

View File

@ -971,6 +971,14 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
{
switch (frame.getType())
{
case HEADERS:
{
HeadersFrame headersFrame = (HeadersFrame)frame;
stream.updateClose(headersFrame.isEndStream(), true);
if (stream.isClosed())
removeStream(stream, true);
break;
}
case RST_STREAM:
{
if (stream != null)
@ -979,8 +987,8 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
}
case SETTINGS:
{
SettingsFrame settings = (SettingsFrame)frame;
Integer initialWindow = settings.getSettings().get(SettingsFrame.INITIAL_WINDOW_SIZE);
SettingsFrame settingsFrame = (SettingsFrame)frame;
Integer initialWindow = settingsFrame.getSettings().get(SettingsFrame.INITIAL_WINDOW_SIZE);
if (initialWindow != null)
flowControl.updateInitialStreamWindow(HTTP2Session.this, initialWindow, true);
break;

View File

@ -133,11 +133,16 @@ public class HTTP2Stream extends IdleTimeout implements IStream
return closeState.get() == CloseState.CLOSED;
}
private boolean isRemotelyClosed()
public boolean isRemotelyClosed()
{
return closeState.get() == CloseState.REMOTELY_CLOSED;
}
public boolean isLocallyClosed()
{
return closeState.get() == CloseState.LOCALLY_CLOSED;
}
@Override
public boolean isOpen()
{