Ignoring data frames after RST_STREAM.

This commit is contained in:
Simone Bordet 2012-03-07 14:04:27 +01:00
parent 8f37221b56
commit c5c4425a0b
2 changed files with 150 additions and 23 deletions

View File

@ -135,7 +135,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
}
@Override
public void syn(SynInfo synInfo, StreamFrameListener listener, long timeout, TimeUnit unit, final Handler<Stream> handler)
public void syn(SynInfo synInfo, StreamFrameListener listener, long timeout, TimeUnit unit, Handler<Stream> handler)
{
// Synchronization is necessary.
// SPEC v3, 2.3.1 requires that the stream creation be monotonically crescent
@ -154,7 +154,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
{
int streamId = streamIds.getAndAdd(2);
SynStreamFrame synStream = new SynStreamFrame(version, synInfo.getFlags(), streamId, 0, synInfo.getPriority(), synInfo.getHeaders());
final IStream stream = createStream(synStream, listener);
IStream stream = createStream(synStream, listener);
control(stream, synStream, timeout, unit, handler, stream);
}
}
@ -178,7 +178,11 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
}
else
{
RstStreamFrame frame = new RstStreamFrame(version, rstInfo.getStreamId(), rstInfo.getStreamStatus().getCode(version));
int streamId = rstInfo.getStreamId();
IStream stream = streams.get(streamId);
if (stream != null)
removeStream(stream);
RstStreamFrame frame = new RstStreamFrame(version, streamId, rstInfo.getStreamStatus().getCode(version));
control(null, frame, timeout, unit, handler, null);
}
}
@ -207,7 +211,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
}
@Override
public void ping(long timeout, TimeUnit unit, final Handler<PingInfo> handler)
public void ping(long timeout, TimeUnit unit, Handler<PingInfo> handler)
{
int pingId = pingIds.getAndAdd(2);
PingInfo pingInfo = new PingInfo(pingId);
@ -330,7 +334,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
}
@Override
public void onDataFrame(final DataFrame frame, final ByteBuffer data)
public void onDataFrame(DataFrame frame, ByteBuffer data)
{
notifyIdle(idleListener, false);
try
@ -344,7 +348,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
}
int streamId = frame.getStreamId();
final IStream stream = streams.get(streamId);
IStream stream = streams.get(streamId);
if (stream == null)
{
RstInfo rstInfo = new RstInfo(streamId, StreamStatus.INVALID_STREAM);
@ -391,9 +395,9 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
goAway(x.getSessionStatus());
}
private void onSyn(final SynStreamFrame frame)
private void onSyn(SynStreamFrame frame)
{
final IStream stream = newStream(frame);
IStream stream = newStream(frame);
logger.debug("Opening {}", stream);
int streamId = frame.getStreamId();
IStream existing = streams.putIfAbsent(streamId, stream);
@ -491,10 +495,10 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
}
}
private void onReply(final SynReplyFrame frame)
private void onReply(SynReplyFrame frame)
{
int streamId = frame.getStreamId();
final IStream stream = streams.get(streamId);
IStream stream = streams.get(streamId);
if (stream == null)
{
RstInfo rstInfo = new RstInfo(streamId, StreamStatus.INVALID_STREAM);
@ -514,11 +518,11 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
removeStream(stream);
}
private void onRst(final RstStreamFrame frame)
private void onRst(RstStreamFrame frame)
{
// TODO: implement logic to clean up unidirectional streams associated with this stream
final IStream stream = streams.get(frame.getStreamId());
IStream stream = streams.get(frame.getStreamId());
if (stream != null)
stream.process(frame);
@ -531,7 +535,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
removeStream(stream);
}
private void onSettings(final SettingsFrame frame)
private void onSettings(SettingsFrame frame)
{
Settings.Setting windowSizeSetting = frame.getSettings().get(Settings.ID.INITIAL_WINDOW_SIZE);
if (windowSizeSetting != null)
@ -545,7 +549,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
flush();
}
private void onPing(final PingFrame frame)
private void onPing(PingFrame frame)
{
int pingId = frame.getPingId();
if (pingId % 2 == pingIds.get() % 2)
@ -560,7 +564,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
}
}
private void onGoAway(final GoAwayFrame frame)
private void onGoAway(GoAwayFrame frame)
{
if (goAwayReceived.compareAndSet(false, true))
{
@ -574,10 +578,10 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
}
}
private void onHeaders(final HeadersFrame frame)
private void onHeaders(HeadersFrame frame)
{
int streamId = frame.getStreamId();
final IStream stream = streams.get(streamId);
IStream stream = streams.get(streamId);
if (stream == null)
{
RstInfo rstInfo = new RstInfo(streamId, StreamStatus.INVALID_STREAM);
@ -677,7 +681,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
}
}
private void notifyOnPing(SessionFrameListener listener, final PingInfo pingInfo)
private void notifyOnPing(SessionFrameListener listener, PingInfo pingInfo)
{
try
{
@ -710,7 +714,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
}
@Override
public <C> void control(IStream stream, ControlFrame frame, long timeout, TimeUnit unit, final Handler<C> handler, C context)
public <C> void control(IStream stream, ControlFrame frame, long timeout, TimeUnit unit, Handler<C> handler, C context)
{
try
{
@ -732,7 +736,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
flush();
}
catch (final Throwable x)
catch (Throwable x)
{
notifyHandlerFailed(handler, x);
}
@ -765,7 +769,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
flush();
}
private void execute(final Runnable task)
private void execute(Runnable task)
{
threadPool.execute(task);
}
@ -840,7 +844,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
throw new SPDYException(x);
}
protected void write(final ByteBuffer buffer, Handler<FrameBytes> handler, FrameBytes frameBytes)
protected void write(ByteBuffer buffer, Handler<FrameBytes> handler, FrameBytes frameBytes)
{
if (controller != null)
controller.write(buffer, handler, frameBytes);
@ -937,7 +941,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
StandardSession.this.complete(handler, context);
}
protected void fail(final Throwable x)
protected void fail(Throwable x)
{
notifyHandlerFailed(handler, x);
}

View File

@ -0,0 +1,123 @@
package org.eclipse.jetty.spdy;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.Handler;
import org.eclipse.jetty.spdy.api.RstInfo;
import org.eclipse.jetty.spdy.api.Session;
import org.eclipse.jetty.spdy.api.SessionFrameListener;
import org.eclipse.jetty.spdy.api.Stream;
import org.eclipse.jetty.spdy.api.StreamFrameListener;
import org.eclipse.jetty.spdy.api.StreamStatus;
import org.eclipse.jetty.spdy.api.StringDataInfo;
import org.eclipse.jetty.spdy.api.SynInfo;
import org.eclipse.jetty.spdy.api.server.ServerSessionFrameListener;
import org.junit.Assert;
import org.junit.Test;
public class ResetStreamTest extends AbstractTest
{
@Test
public void testResetStreamIsRemoved() throws Exception
{
Session session = startClient(startServer(new ServerSessionFrameListener.Adapter()), null);
Stream stream = session.syn(new SynInfo(false), null).get(5, TimeUnit.SECONDS);
session.rst(new RstInfo(stream.getId(), StreamStatus.CANCEL_STREAM)).get(5, TimeUnit.SECONDS);
Assert.assertEquals(0, session.getStreams().size());
}
@Test
public void testRefusedStreamIsRemoved() throws Exception
{
final AtomicReference<Session> serverSessionRef = new AtomicReference<>();
final CountDownLatch synLatch = new CountDownLatch(1);
final CountDownLatch rstLatch = new CountDownLatch(1);
Session clientSession = startClient(startServer(new ServerSessionFrameListener.Adapter()
{
@Override
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
{
Session serverSession = stream.getSession();
serverSessionRef.set(serverSession);
serverSession.rst(new RstInfo(stream.getId(), StreamStatus.REFUSED_STREAM));
synLatch.countDown();
return null;
}
}), new SessionFrameListener.Adapter()
{
@Override
public void onRst(Session session, RstInfo rstInfo)
{
rstLatch.countDown();
}
});
clientSession.syn(new SynInfo(false), null).get(5, TimeUnit.SECONDS);
Assert.assertTrue(synLatch.await(5, TimeUnit.SECONDS));
Session serverSession = serverSessionRef.get();
Assert.assertEquals(0, serverSession.getStreams().size());
Assert.assertTrue(rstLatch.await(5, TimeUnit.SECONDS));
Assert.assertEquals(0, clientSession.getStreams().size());
}
@Test
public void testRefusedStreamIgnoresData() throws Exception
{
final CountDownLatch synLatch = new CountDownLatch(1);
final CountDownLatch dataLatch = new CountDownLatch(1);
final CountDownLatch rstLatch = new CountDownLatch(1);
Session session = startClient(startServer(new ServerSessionFrameListener.Adapter()
{
@Override
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
{
try
{
// Refuse the stream, we must ignore data frames
Assert.assertTrue(synLatch.await(5, TimeUnit.SECONDS));
stream.getSession().rst(new RstInfo(stream.getId(), StreamStatus.REFUSED_STREAM));
return new StreamFrameListener.Adapter()
{
@Override
public void onData(Stream stream, DataInfo dataInfo)
{
dataLatch.countDown();
}
};
}
catch (InterruptedException x)
{
x.printStackTrace();
return null;
}
}
}), new SessionFrameListener.Adapter()
{
@Override
public void onRst(Session session, RstInfo rstInfo)
{
rstLatch.countDown();
}
});
Stream stream = session.syn(new SynInfo(false), null).get(5, TimeUnit.SECONDS);
stream.data(new StringDataInfo("data", true), 5, TimeUnit.SECONDS, new Handler.Adapter<Void>()
{
@Override
public void completed(Void context)
{
synLatch.countDown();
}
});
Assert.assertTrue(rstLatch.await(5, TimeUnit.SECONDS));
Assert.assertFalse(dataLatch.await(1, TimeUnit.SECONDS));
}
}