Implemented atomic update of the last good stream.

This commit is contained in:
Simone Bordet 2012-02-15 19:17:22 +01:00
parent 176230c344
commit 77cdee95ce
1 changed files with 10 additions and 6 deletions

View File

@ -73,7 +73,7 @@ public class StandardSession implements ISession, Parser.Listener, ISession.Cont
private final Generator generator;
private final AtomicBoolean goAwaySent = new AtomicBoolean();
private final AtomicBoolean goAwayReceived = new AtomicBoolean();
private volatile int lastStreamId;
private final AtomicInteger lastStreamId = new AtomicInteger();
private boolean flushing;
public StandardSession(short version, Controller<FrameBytes> controller, int initialStreamId, SessionFrameListener listener, Generator generator)
@ -275,7 +275,7 @@ public class StandardSession implements ISession, Parser.Listener, ISession.Cont
{
try
{
GoAwayFrame frame = new GoAwayFrame(version, lastStreamId, SessionStatus.OK.getCode());
GoAwayFrame frame = new GoAwayFrame(version, lastStreamId.get(), SessionStatus.OK.getCode());
control(null, frame, handler);
flush();
return;
@ -678,10 +678,14 @@ public class StandardSession implements ISession, Parser.Listener, ISession.Cont
int streamId = stream.getId();
if (stream.isClosed() && streamId % 2 != streamIds.get() % 2)
{
// TODO: perhaps we need a non-blocking updateMax()
// to avoid that concurrent updates overwrites
// the lastStreamId with lower values
lastStreamId = streamId;
// Non-blocking atomic update
int oldValue = lastStreamId.get();
while (streamId > oldValue)
{
if (lastStreamId.compareAndSet(oldValue, streamId))
break;
oldValue = lastStreamId.get();
}
}
}