375509 - Stalled stream stalls other streams or session control frames.

Additional fixes to the implementation of flush(). Also implemented frame priority.
This commit is contained in:
Simone Bordet 2012-03-30 18:47:47 +02:00
parent 2a44fa5c22
commit b0a3d031aa
1 changed files with 72 additions and 42 deletions

View File

@ -18,7 +18,6 @@ package org.eclipse.jetty.spdy;
import java.nio.ByteBuffer;
import java.nio.channels.InterruptedByTimeoutException;
import java.util.Deque;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
@ -79,7 +78,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
private final List<Listener> listeners = new CopyOnWriteArrayList<>();
private final ConcurrentMap<Integer, IStream> streams = new ConcurrentHashMap<>();
private final Deque<FrameBytes> queue = new LinkedList<>();
private final LinkedList<FrameBytes> queue = new LinkedList<>();
private final ByteBufferPool bufferPool;
private final Executor threadPool;
private final ScheduledExecutorService scheduler;
@ -732,10 +731,10 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
{
ByteBuffer buffer = generator.control(frame);
logger.debug("Queuing {} on {}", frame, stream);
ControlFrameBytes<C> frameBytes = new ControlFrameBytes<>(handler, context, frame, buffer);
ControlFrameBytes<C> frameBytes = new ControlFrameBytes<>(stream, handler, context, frame, buffer);
if (timeout > 0)
frameBytes.task = scheduler.schedule(frameBytes, timeout, unit);
enqueueLast(frameBytes);
append(frameBytes);
}
flush();
@ -766,10 +765,10 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
public <C> void data(IStream stream, DataInfo dataInfo, long timeout, TimeUnit unit, Handler<C> handler, C context)
{
logger.debug("Queuing {} on {}", dataInfo, stream);
DataFrameBytes<C> frameBytes = new DataFrameBytes<>(handler, context, stream, dataInfo);
DataFrameBytes<C> frameBytes = new DataFrameBytes<>(stream, handler, context, dataInfo);
if (timeout > 0)
frameBytes.task = scheduler.schedule(frameBytes, timeout, unit);
enqueueLast(frameBytes);
append(frameBytes);
flush();
}
@ -781,56 +780,68 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
@Override
public void flush()
{
FrameBytes frameBytes;
ByteBuffer buffer;
FrameBytes frameBytes = null;
ByteBuffer buffer = null;
synchronized (queue)
{
if (flushing)
if (flushing || queue.isEmpty())
return;
frameBytes = queue.poll();
if (frameBytes == null)
return;
FrameBytes stalled = null;
while (true)
Set<IStream> stalledStreams = null;
for (int i = 0; i < queue.size(); ++i)
{
frameBytes = queue.get(i);
if (stalledStreams != null && stalledStreams.contains(frameBytes.getStream()))
continue;
buffer = frameBytes.getByteBuffer();
if (buffer != null)
{
queue.remove(i);
break;
// We are stalled: enqueue as last so other frames can be flushed
enqueueLast(frameBytes);
if (stalled == null)
stalled = frameBytes;
else if (stalled == frameBytes)
return;
logger.debug("Flush stalled for {}, {} frame(s) in queue", frameBytes, queue.size());
frameBytes = queue.poll();
}
if (stalledStreams == null)
stalledStreams = new HashSet<>();
stalledStreams.add(frameBytes.getStream());
logger.debug("Flush stalled for {}, {} frame(s) in queue", frameBytes, queue.size());
}
if (buffer == null)
return;
flushing = true;
logger.debug("Flushing {}, {} frame(s) in queue", frameBytes, queue.size());
}
logger.debug("Writing {} frame bytes of {}", buffer.remaining(), frameBytes);
write(buffer, this, frameBytes);
}
private void enqueueLast(FrameBytes frameBytes)
private void append(FrameBytes frameBytes)
{
// TODO: handle priority; e.g. use queues to prioritize the buffers ?
synchronized (queue)
{
queue.offerLast(frameBytes);
}
enqueue(frameBytes, false);
}
private void enqueueFirst(FrameBytes frameBytes)
private void prepend(FrameBytes frameBytes)
{
enqueue(frameBytes, true);
}
private void enqueue(FrameBytes frameBytes, boolean prepend)
{
synchronized (queue)
{
queue.offerFirst(frameBytes);
int index = 0;
while (index < queue.size())
{
FrameBytes element = queue.get(index);
int comparison = element.compareTo(frameBytes);
if (comparison > 0 || prepend && comparison == 0)
break;
++index;
}
queue.add(index, frameBytes);
}
}
@ -854,8 +865,11 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
protected void write(ByteBuffer buffer, Handler<FrameBytes> handler, FrameBytes frameBytes)
{
if (controller != null)
{
logger.debug("Writing {} frame bytes of {}", buffer.remaining(), frameBytes);
controller.write(buffer, handler, frameBytes);
}
}
private <C> void complete(final Handler<C> handler, final C context)
{
@ -920,8 +934,10 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
}
}
public interface FrameBytes
public interface FrameBytes extends Comparable<FrameBytes>
{
public IStream getStream();
public abstract ByteBuffer getByteBuffer();
public abstract void complete();
@ -929,16 +945,30 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
private abstract class AbstractFrameBytes<C> implements FrameBytes, Runnable
{
private final IStream stream;
private final Handler<C> handler;
private final C context;
protected volatile ScheduledFuture<?> task;
protected AbstractFrameBytes(Handler<C> handler, C context)
protected AbstractFrameBytes(IStream stream, Handler<C> handler, C context)
{
this.stream = stream;
this.handler = handler;
this.context = context;
}
@Override
public IStream getStream()
{
return stream;
}
@Override
public int compareTo(FrameBytes that)
{
return getStream().getPriority() - that.getStream().getPriority();
}
@Override
public void complete()
{
@ -966,9 +996,9 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
private final ControlFrame frame;
private final ByteBuffer buffer;
private ControlFrameBytes(Handler<C> handler, C context, ControlFrame frame, ByteBuffer buffer)
private ControlFrameBytes(IStream stream, Handler<C> handler, C context, ControlFrame frame, ByteBuffer buffer)
{
super(handler, context);
super(stream, handler, context);
this.frame = frame;
this.buffer = buffer;
}
@ -1003,15 +1033,13 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
private class DataFrameBytes<C> extends AbstractFrameBytes<C>
{
private final IStream stream;
private final DataInfo dataInfo;
private int size;
private volatile ByteBuffer buffer;
private DataFrameBytes(Handler<C> handler, C context, IStream stream, DataInfo dataInfo)
private DataFrameBytes(IStream stream, Handler<C> handler, C context, DataInfo dataInfo)
{
super(handler, context);
this.stream = stream;
super(stream, handler, context);
this.dataInfo = dataInfo;
}
@ -1020,6 +1048,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
{
try
{
IStream stream = getStream();
int windowSize = stream.getWindowSize();
if (windowSize <= 0)
return null;
@ -1042,6 +1071,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
public void complete()
{
bufferPool.release(buffer);
IStream stream = getStream();
stream.updateWindowSize(-size);
if (dataInfo.available() > 0)
@ -1049,7 +1079,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
// We have written a frame out of this DataInfo, but there is more to write.
// We need to keep the correct ordering of frames, to avoid that another
// DataInfo for the same stream is written before this one is finished.
enqueueFirst(this);
prepend(this);
}
else
{
@ -1063,7 +1093,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
@Override
public String toString()
{
return String.format("DATA bytes @%x available=%d consumed=%d on %s", dataInfo.hashCode(), dataInfo.available(), dataInfo.consumed(), stream);
return String.format("DATA bytes @%x available=%d consumed=%d on %s", dataInfo.hashCode(), dataInfo.available(), dataInfo.consumed(), getStream());
}
}
}