Modified StandardStream to not depend on SynStreamFrame.

This commit is contained in:
Simone Bordet 2012-06-11 14:32:14 +02:00
parent aeb3a23482
commit 457fdc74e5
4 changed files with 52 additions and 48 deletions

View File

@ -476,7 +476,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
private IStream newStream(SynStreamFrame frame)
{
IStream associatedStream = streams.get(frame.getAssociatedStreamId());
IStream stream = new StandardStream(frame, this, associatedStream);
IStream stream = new StandardStream(frame.getStreamId(), frame.getPriority(), this, associatedStream);
flowControlStrategy.onNewStream(this, stream);
return stream;
}

View File

@ -37,7 +37,6 @@ import org.eclipse.jetty.spdy.api.SynInfo;
import org.eclipse.jetty.spdy.frames.ControlFrame;
import org.eclipse.jetty.spdy.frames.HeadersFrame;
import org.eclipse.jetty.spdy.frames.SynReplyFrame;
import org.eclipse.jetty.spdy.frames.SynStreamFrame;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@ -45,9 +44,10 @@ public class StandardStream implements IStream
{
private static final Logger logger = Log.getLogger(Stream.class);
private final Map<String, Object> attributes = new ConcurrentHashMap<>();
private final IStream associatedStream;
private final SynStreamFrame frame;
private final int id;
private final byte priority;
private final ISession session;
private final IStream associatedStream;
private final AtomicInteger windowSize = new AtomicInteger();
private final Set<Stream> pushedStreams = Collections.newSetFromMap(new ConcurrentHashMap<Stream, Boolean>());
private volatile StreamFrameListener listener;
@ -55,9 +55,10 @@ public class StandardStream implements IStream
private volatile CloseState closeState = CloseState.OPENED;
private volatile boolean reset = false;
public StandardStream(SynStreamFrame frame, ISession session, IStream associatedStream)
public StandardStream(int id, byte priority, ISession session, IStream associatedStream)
{
this.frame = frame;
this.id = id;
this.priority = priority;
this.session = session;
this.associatedStream = associatedStream;
}
@ -65,7 +66,7 @@ public class StandardStream implements IStream
@Override
public int getId()
{
return frame.getStreamId();
return id;
}
@Override
@ -95,7 +96,7 @@ public class StandardStream implements IStream
@Override
public byte getPriority()
{
return frame.getPriority();
return priority;
}
@Override
@ -150,7 +151,7 @@ public class StandardStream implements IStream
{
case OPENED:
{
closeState = local?CloseState.LOCALLY_CLOSED:CloseState.REMOTELY_CLOSED;
closeState = local ? CloseState.LOCALLY_CLOSED : CloseState.REMOTELY_CLOSED;
break;
}
case LOCALLY_CLOSED:
@ -191,16 +192,16 @@ public class StandardStream implements IStream
{
openState = OpenState.REPLY_RECV;
SynReplyFrame synReply = (SynReplyFrame)frame;
updateCloseState(synReply.isClose(),false);
ReplyInfo replyInfo = new ReplyInfo(synReply.getHeaders(),synReply.isClose());
updateCloseState(synReply.isClose(), false);
ReplyInfo replyInfo = new ReplyInfo(synReply.getHeaders(), synReply.isClose());
notifyOnReply(replyInfo);
break;
}
case HEADERS:
{
HeadersFrame headers = (HeadersFrame)frame;
updateCloseState(headers.isClose(),false);
HeadersInfo headersInfo = new HeadersInfo(headers.getHeaders(),headers.isClose(),headers.isResetCompression());
updateCloseState(headers.isClose(), false);
HeadersInfo headersInfo = new HeadersInfo(headers.getHeaders(), headers.isClose(), headers.isResetCompression());
notifyOnHeaders(headersInfo);
break;
}
@ -269,7 +270,7 @@ public class StandardStream implements IStream
{
if (listener != null)
{
logger.debug("Invoking headers callback with {} on listener {}", frame, listener);
logger.debug("Invoking headers callback with {} on listener {}", headersInfo, listener);
listener.onHeaders(this, headersInfo);
}
}
@ -320,11 +321,11 @@ public class StandardStream implements IStream
{
if (isClosed() || isReset())
{
handler.failed(this, new StreamException(getId(),StreamStatus.STREAM_ALREADY_CLOSED));
handler.failed(this, new StreamException(getId(), StreamStatus.STREAM_ALREADY_CLOSED));
return;
}
PushSynInfo pushSynInfo = new PushSynInfo(getId(),synInfo);
session.syn(pushSynInfo,null,timeout,unit,handler);
PushSynInfo pushSynInfo = new PushSynInfo(getId(), synInfo);
session.syn(pushSynInfo, null, timeout, unit, handler);
}
@Override
@ -341,9 +342,9 @@ public class StandardStream implements IStream
if (isUnidirectional())
throw new IllegalStateException("Protocol violation: cannot send SYN_REPLY frames in unidirectional streams");
openState = OpenState.REPLY_SENT;
updateCloseState(replyInfo.isClose(),true);
SynReplyFrame frame = new SynReplyFrame(session.getVersion(),replyInfo.getFlags(),getId(),replyInfo.getHeaders());
session.control(this,frame,timeout,unit,handler,null);
updateCloseState(replyInfo.isClose(), true);
SynReplyFrame frame = new SynReplyFrame(session.getVersion(), replyInfo.getFlags(), getId(), replyInfo.getHeaders());
session.control(this, frame, timeout, unit, handler, null);
}
@Override
@ -359,18 +360,18 @@ public class StandardStream implements IStream
{
if (!canSend())
{
session.rst(new RstInfo(getId(),StreamStatus.PROTOCOL_ERROR));
session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR));
throw new IllegalStateException("Protocol violation: cannot send a DATA frame before a SYN_REPLY frame");
}
if (isLocallyClosed())
{
session.rst(new RstInfo(getId(),StreamStatus.PROTOCOL_ERROR));
session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR));
throw new IllegalStateException("Protocol violation: cannot send a DATA frame on a closed stream");
}
// Cannot update the close state here, because the data that we send may
// be flow controlled, so we need the stream to update the window size.
session.data(this,dataInfo,timeout,unit,handler,null);
session.data(this, dataInfo, timeout, unit, handler, null);
}
@Override
@ -386,18 +387,18 @@ public class StandardStream implements IStream
{
if (!canSend())
{
session.rst(new RstInfo(getId(),StreamStatus.PROTOCOL_ERROR));
session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR));
throw new IllegalStateException("Protocol violation: cannot send a HEADERS frame before a SYN_REPLY frame");
}
if (isLocallyClosed())
{
session.rst(new RstInfo(getId(),StreamStatus.PROTOCOL_ERROR));
session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR));
throw new IllegalStateException("Protocol violation: cannot send a HEADERS frame on a closed stream");
}
updateCloseState(headersInfo.isClose(),true);
HeadersFrame frame = new HeadersFrame(session.getVersion(),headersInfo.getFlags(),getId(),headersInfo.getHeaders());
session.control(this,frame,timeout,unit,handler,null);
updateCloseState(headersInfo.isClose(), true);
HeadersFrame frame = new HeadersFrame(session.getVersion(), headersInfo.getFlags(), getId(), headersInfo.getHeaders());
session.control(this, frame, timeout, unit, handler, null);
}
@Override
@ -440,7 +441,7 @@ public class StandardStream implements IStream
@Override
public String toString()
{
return String.format("stream=%d v%d %s",getId(),session.getVersion(),closeState);
return String.format("stream=%d v%d %s", getId(), session.getVersion(), closeState);
}
private boolean canSend()

View File

@ -405,7 +405,7 @@ public class StandardSessionTest
final CountDownLatch failedCalledLatch = new CountDownLatch(2);
SynStreamFrame synStreamFrame = new SynStreamFrame(SPDY.V2, SynInfo.FLAG_CLOSE, 1, 0, (byte)0, (short)0, null);
IStream stream = new StandardStream(synStreamFrame, session, null);
IStream stream = new StandardStream(synStreamFrame.getStreamId(), synStreamFrame.getPriority(), session, null);
stream.updateWindowSize(8192);
Handler.Adapter<Void> handler = new Handler.Adapter<Void>()
{

View File

@ -46,15 +46,13 @@ import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/* ------------------------------------------------------------ */
/**
*/
@RunWith(MockitoJUnitRunner.class)
public class StandardStreamTest
{
@Mock private ISession session;
@Mock private SynStreamFrame synStreamFrame;
@Mock
private ISession session;
@Mock
private SynStreamFrame synStreamFrame;
/**
* Test method for {@link org.eclipse.jetty.spdy.StandardStream#syn(org.eclipse.jetty.spdy.api.SynInfo)}.
@ -63,17 +61,18 @@ public class StandardStreamTest
@Test
public void testSyn()
{
Stream stream = new StandardStream(synStreamFrame,session,null);
Stream stream = new StandardStream(synStreamFrame.getStreamId(), synStreamFrame.getPriority(), session, null);
Set<Stream> streams = new HashSet<>();
streams.add(stream);
when(synStreamFrame.isClose()).thenReturn(false);
SynInfo synInfo = new SynInfo(false);
when(session.getStreams()).thenReturn(streams);
stream.syn(synInfo);
verify(session).syn(argThat(new PushSynInfoMatcher(stream.getId(),synInfo)),any(StreamFrameListener.class),anyLong(),any(TimeUnit.class),any(Handler.class));
verify(session).syn(argThat(new PushSynInfoMatcher(stream.getId(), synInfo)), any(StreamFrameListener.class), anyLong(), any(TimeUnit.class), any(Handler.class));
}
private class PushSynInfoMatcher extends ArgumentMatcher<PushSynInfo>{
private class PushSynInfoMatcher extends ArgumentMatcher<PushSynInfo>
{
int associatedStreamId;
SynInfo synInfo;
@ -82,15 +81,18 @@ public class StandardStreamTest
this.associatedStreamId = associatedStreamId;
this.synInfo = synInfo;
}
@Override
public boolean matches(Object argument)
{
PushSynInfo pushSynInfo = (PushSynInfo)argument;
if(pushSynInfo.getAssociatedStreamId() != associatedStreamId){
if (pushSynInfo.getAssociatedStreamId() != associatedStreamId)
{
System.out.println("streamIds do not match!");
return false;
}
if(pushSynInfo.isClose() != synInfo.isClose()){
if (pushSynInfo.isClose() != synInfo.isClose())
{
System.out.println("isClose doesn't match");
return false;
}
@ -99,13 +101,14 @@ public class StandardStreamTest
}
@Test
public void testSynOnClosedStream(){
IStream stream = new StandardStream(synStreamFrame,session,null);
stream.updateCloseState(true,true);
stream.updateCloseState(true,false);
assertThat("stream expected to be closed",stream.isClosed(),is(true));
public void testSynOnClosedStream()
{
IStream stream = new StandardStream(synStreamFrame.getStreamId(), synStreamFrame.getPriority(), session, null);
stream.updateCloseState(true, true);
stream.updateCloseState(true, false);
assertThat("stream expected to be closed", stream.isClosed(), is(true));
final CountDownLatch failedLatch = new CountDownLatch(1);
stream.syn(new SynInfo(false),1,TimeUnit.SECONDS,new Handler.Adapter<Stream>()
stream.syn(new SynInfo(false), 1, TimeUnit.SECONDS, new Handler.Adapter<Stream>()
{
@Override
public void failed(Stream stream, Throwable x)
@ -121,7 +124,7 @@ public class StandardStreamTest
public void testSendDataOnHalfClosedStream() throws InterruptedException, ExecutionException, TimeoutException
{
SynStreamFrame synStreamFrame = new SynStreamFrame(SPDY.V2, SynInfo.FLAG_CLOSE, 1, 0, (byte)0, (short)0, null);
IStream stream = new StandardStream(synStreamFrame, session, null);
IStream stream = new StandardStream(synStreamFrame.getStreamId(), synStreamFrame.getPriority(), session, null);
stream.updateWindowSize(8192);
stream.updateCloseState(synStreamFrame.isClose(), true);
assertThat("stream is half closed", stream.isHalfClosed(), is(true));