375509 - Stalled stream stalls other streams or session control frames.

Now using a "death pill" instead of a boolean in order to avoid race conditions where
DataInfos were read from the queue (but the boolean not updated yet), and viceversa.
This commit is contained in:
Simone Bordet 2012-04-02 10:09:55 +02:00
parent 00b31b6577
commit bb429a7f18
1 changed files with 35 additions and 21 deletions

View File

@ -57,6 +57,7 @@ public class ServerHTTPSPDYAsyncConnection extends AbstractHttpConnection implem
{
private static final Logger logger = LoggerFactory.getLogger(ServerHTTPSPDYAsyncConnection.class);
private static final ByteBuffer ZERO_BYTES = ByteBuffer.allocate(0);
private static final DataInfo END_OF_CONTENT = new ByteBufferDataInfo(ZERO_BYTES, true);
private final Queue<Runnable> tasks = new LinkedList<>();
private final BlockingQueue<DataInfo> dataInfos = new LinkedBlockingQueue<>();
@ -65,7 +66,6 @@ public class ServerHTTPSPDYAsyncConnection extends AbstractHttpConnection implem
private Headers headers; // No need for volatile, guarded by state
private DataInfo dataInfo; // No need for volatile, guarded by state
private NIOBuffer buffer; // No need for volatile, guarded by state
private boolean complete; // No need for volatile, guarded by state
private volatile State state = State.INITIAL;
private boolean dispatched; // Guarded by synchronization on tasks
@ -160,7 +160,7 @@ public class ServerHTTPSPDYAsyncConnection extends AbstractHttpConnection implem
logger.debug("HTTP > {} {} {}", new Object[]{m, u, v});
startRequest(new ByteArrayBuffer(m), new ByteArrayBuffer(u), new ByteArrayBuffer(v));
state = State.HEADERS;
updateState(State.HEADERS);
handle();
break;
}
@ -261,6 +261,12 @@ public class ServerHTTPSPDYAsyncConnection extends AbstractHttpConnection implem
{
}
private void updateState(State newState)
{
logger.debug("State update {} -> {}", state, newState);
state = newState;
}
public void beginRequest(final Headers headers)
{
this.headers = headers.isEmpty() ? null : headers;
@ -270,7 +276,7 @@ public class ServerHTTPSPDYAsyncConnection extends AbstractHttpConnection implem
public void run()
{
if (!headers.isEmpty())
state = State.REQUEST;
updateState(State.REQUEST);
handle();
}
});
@ -284,7 +290,7 @@ public class ServerHTTPSPDYAsyncConnection extends AbstractHttpConnection implem
@Override
public void run()
{
state = state == State.INITIAL ? State.REQUEST : State.HEADERS;
updateState(state == State.INITIAL ? State.REQUEST : State.HEADERS);
handle();
}
});
@ -292,7 +298,10 @@ public class ServerHTTPSPDYAsyncConnection extends AbstractHttpConnection implem
public void content(final DataInfo dataInfo, boolean endRequest)
{
dataInfos.offer(new ByteBufferDataInfo(dataInfo.asByteBuffer(false), dataInfo.isClose(), dataInfo.isCompress())
// We need to copy the dataInfo since we do not know when its bytes
// will be consumed. When the copy is consumed, we consume also the
// original, so the implementation can send a window update.
ByteBufferDataInfo copyDataInfo = new ByteBufferDataInfo(dataInfo.asByteBuffer(false), dataInfo.isClose(), dataInfo.isCompress())
{
@Override
public void consume(int delta)
@ -300,8 +309,11 @@ public class ServerHTTPSPDYAsyncConnection extends AbstractHttpConnection implem
super.consume(delta);
dataInfo.consume(delta);
}
});
complete = endRequest;
};
logger.debug("Queuing last={} content {}", endRequest, copyDataInfo);
dataInfos.offer(copyDataInfo);
if (endRequest)
dataInfos.offer(END_OF_CONTENT);
post(new Runnable()
{
@Override
@ -310,10 +322,10 @@ public class ServerHTTPSPDYAsyncConnection extends AbstractHttpConnection implem
logger.debug("HTTP > {} bytes of content", dataInfo.length());
if (state == State.HEADERS)
{
state = State.HEADERS_COMPLETE;
updateState(State.HEADERS_COMPLETE);
handle();
}
state = State.CONTENT;
updateState(State.CONTENT);
handle();
}
});
@ -327,10 +339,10 @@ public class ServerHTTPSPDYAsyncConnection extends AbstractHttpConnection implem
{
if (state == State.HEADERS)
{
state = State.HEADERS_COMPLETE;
updateState(State.HEADERS_COMPLETE);
handle();
}
state = State.FINAL;
updateState(State.FINAL);
handle();
}
});
@ -343,10 +355,10 @@ public class ServerHTTPSPDYAsyncConnection extends AbstractHttpConnection implem
@Override
public void run()
{
State currentState = state;
state = State.ASYNC;
State oldState = state;
updateState(State.ASYNC);
handle();
state = currentState;
updateState(oldState);
}
});
}
@ -370,12 +382,10 @@ public class ServerHTTPSPDYAsyncConnection extends AbstractHttpConnection implem
else
{
// The application has consumed the buffer, so consume also the DataInfo
if (dataInfo.consumed() == 0)
dataInfo.consume(dataInfo.length());
dataInfo.consume(dataInfo.length());
logger.debug("Consumed {} content bytes, queue size {}", dataInfo.consumed(), dataInfos.size());
dataInfo = null;
buffer = null;
if (complete && dataInfos.isEmpty())
return null;
// Loop to get content bytes from DataInfos
}
}
@ -388,9 +398,13 @@ public class ServerHTTPSPDYAsyncConnection extends AbstractHttpConnection implem
logger.debug("Waited {} ms for content bytes", elapsed);
if (dataInfo != null)
{
// Only consume if it's the last DataInfo
boolean consume = complete && dataInfos.isEmpty();
ByteBuffer byteBuffer = dataInfo.asByteBuffer(consume);
if (dataInfo == END_OF_CONTENT)
{
logger.debug("End of content bytes, queue size {}", dataInfos.size());
return null;
}
ByteBuffer byteBuffer = dataInfo.asByteBuffer(false);
buffer = byteBuffer.isDirect() ? new DirectNIOBuffer(byteBuffer, false) : new IndirectNIOBuffer(byteBuffer, false);
// Loop to return the buffer
}