diff --git a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardSession.java b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardSession.java index a85103ba354..42f5ba0ef44 100644 --- a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardSession.java +++ b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardSession.java @@ -18,7 +18,6 @@ package org.eclipse.jetty.spdy; import java.nio.ByteBuffer; import java.nio.channels.InterruptedByTimeoutException; -import java.util.Deque; import java.util.HashSet; import java.util.LinkedList; import java.util.List; @@ -79,7 +78,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler listeners = new CopyOnWriteArrayList<>(); private final ConcurrentMap streams = new ConcurrentHashMap<>(); - private final Deque queue = new LinkedList<>(); + private final LinkedList queue = new LinkedList<>(); private final ByteBufferPool bufferPool; private final Executor threadPool; private final ScheduledExecutorService scheduler; @@ -732,10 +731,10 @@ public class StandardSession implements ISession, Parser.Listener, Handler frameBytes = new ControlFrameBytes<>(handler, context, frame, buffer); + ControlFrameBytes frameBytes = new ControlFrameBytes<>(stream, handler, context, frame, buffer); if (timeout > 0) frameBytes.task = scheduler.schedule(frameBytes, timeout, unit); - enqueueLast(frameBytes); + append(frameBytes); } flush(); @@ -766,10 +765,10 @@ public class StandardSession implements ISession, Parser.Listener, Handler void data(IStream stream, DataInfo dataInfo, long timeout, TimeUnit unit, Handler handler, C context) { logger.debug("Queuing {} on {}", dataInfo, stream); - DataFrameBytes frameBytes = new DataFrameBytes<>(handler, context, stream, dataInfo); + DataFrameBytes frameBytes = new DataFrameBytes<>(stream, handler, context, dataInfo); if (timeout > 0) frameBytes.task = scheduler.schedule(frameBytes, timeout, unit); - enqueueLast(frameBytes); + append(frameBytes); flush(); } @@ -781,56 +780,68 @@ public class StandardSession implements ISession, Parser.Listener, Handler stalledStreams = null; + for (int i = 0; i < queue.size(); ++i) { + frameBytes = queue.get(i); + + if (stalledStreams != null && stalledStreams.contains(frameBytes.getStream())) + continue; + buffer = frameBytes.getByteBuffer(); if (buffer != null) + { + queue.remove(i); break; + } + + if (stalledStreams == null) + stalledStreams = new HashSet<>(); + stalledStreams.add(frameBytes.getStream()); - // We are stalled: enqueue as last so other frames can be flushed - enqueueLast(frameBytes); - if (stalled == null) - stalled = frameBytes; - else if (stalled == frameBytes) - return; logger.debug("Flush stalled for {}, {} frame(s) in queue", frameBytes, queue.size()); - frameBytes = queue.poll(); } + if (buffer == null) + return; + flushing = true; logger.debug("Flushing {}, {} frame(s) in queue", frameBytes, queue.size()); } - - logger.debug("Writing {} frame bytes of {}", buffer.remaining(), frameBytes); write(buffer, this, frameBytes); } - private void enqueueLast(FrameBytes frameBytes) + private void append(FrameBytes frameBytes) { - // TODO: handle priority; e.g. use queues to prioritize the buffers ? - synchronized (queue) - { - queue.offerLast(frameBytes); - } + enqueue(frameBytes, false); } - private void enqueueFirst(FrameBytes frameBytes) + private void prepend(FrameBytes frameBytes) + { + enqueue(frameBytes, true); + } + + private void enqueue(FrameBytes frameBytes, boolean prepend) { synchronized (queue) { - queue.offerFirst(frameBytes); + int index = 0; + while (index < queue.size()) + { + FrameBytes element = queue.get(index); + int comparison = element.compareTo(frameBytes); + if (comparison > 0 || prepend && comparison == 0) + break; + ++index; + } + queue.add(index, frameBytes); } } @@ -854,7 +865,10 @@ public class StandardSession implements ISession, Parser.Listener, Handler handler, FrameBytes frameBytes) { if (controller != null) + { + logger.debug("Writing {} frame bytes of {}", buffer.remaining(), frameBytes); controller.write(buffer, handler, frameBytes); + } } private void complete(final Handler handler, final C context) @@ -920,8 +934,10 @@ public class StandardSession implements ISession, Parser.Listener, Handler { + public IStream getStream(); + public abstract ByteBuffer getByteBuffer(); public abstract void complete(); @@ -929,16 +945,30 @@ public class StandardSession implements ISession, Parser.Listener, Handler implements FrameBytes, Runnable { + private final IStream stream; private final Handler handler; private final C context; protected volatile ScheduledFuture task; - protected AbstractFrameBytes(Handler handler, C context) + protected AbstractFrameBytes(IStream stream, Handler handler, C context) { + this.stream = stream; this.handler = handler; this.context = context; } + @Override + public IStream getStream() + { + return stream; + } + + @Override + public int compareTo(FrameBytes that) + { + return getStream().getPriority() - that.getStream().getPriority(); + } + @Override public void complete() { @@ -966,9 +996,9 @@ public class StandardSession implements ISession, Parser.Listener, Handler handler, C context, ControlFrame frame, ByteBuffer buffer) + private ControlFrameBytes(IStream stream, Handler handler, C context, ControlFrame frame, ByteBuffer buffer) { - super(handler, context); + super(stream, handler, context); this.frame = frame; this.buffer = buffer; } @@ -1003,15 +1033,13 @@ public class StandardSession implements ISession, Parser.Listener, Handler extends AbstractFrameBytes { - private final IStream stream; private final DataInfo dataInfo; private int size; private volatile ByteBuffer buffer; - private DataFrameBytes(Handler handler, C context, IStream stream, DataInfo dataInfo) + private DataFrameBytes(IStream stream, Handler handler, C context, DataInfo dataInfo) { - super(handler, context); - this.stream = stream; + super(stream, handler, context); this.dataInfo = dataInfo; } @@ -1020,6 +1048,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler 0) @@ -1049,7 +1079,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler