Fixed SPDY Flusher.

Write were performed from within synchronized blocks, which was wrong.
Refactored the code to make these calls outside of synchronized blocks.
This commit is contained in:
Simone Bordet 2013-12-20 19:13:09 +01:00
parent 330e7f87d7
commit 7a51f602dd
2 changed files with 132 additions and 117 deletions

View File

@ -25,7 +25,6 @@ import java.util.List;
import java.util.Set;
import org.eclipse.jetty.spdy.StandardSession.FrameBytes;
import org.eclipse.jetty.spdy.api.SPDYException;
import org.eclipse.jetty.spdy.api.Stream;
import org.eclipse.jetty.spdy.api.StreamStatus;
import org.eclipse.jetty.util.ArrayQueue;
@ -36,20 +35,26 @@ import org.eclipse.jetty.util.log.Logger;
public class Flusher
{
private static final Logger LOG = Log.getLogger(Flusher.class);
private static final int MAX_GATHER = Integer.getInteger("org.eclipse.jetty.spdy.Flusher.MAX_GATHER",8);
private final FlusherCB flusherCB = new FlusherCB();
private final Controller controller;
private final IteratingCallback callback = new FlusherCallback();
private final Object lock = new Object();
private final ArrayQueue<StandardSession.FrameBytes> queue = new ArrayQueue<>(lock);
private final Controller controller;
private final int maxGather;
private Throwable failure;
public Flusher(Controller controller)
{
this.controller = controller;
this(controller, 8);
}
void removeFrameBytesFromQueue(Stream stream)
public Flusher(Controller controller, int maxGather)
{
this.controller = controller;
this.maxGather = maxGather;
}
public void removeFrameBytesFromQueue(Stream stream)
{
synchronized (lock)
{
@ -59,51 +64,16 @@ public class Flusher
}
}
void append(StandardSession.FrameBytes frameBytes)
public Throwable prepend(StandardSession.FrameBytes frameBytes)
{
Throwable failure;
synchronized (lock)
{
failure = this.failure;
if (failure == null)
{
// Control frames are added in order
if (frameBytes instanceof StandardSession.ControlFrameBytes)
queue.add(frameBytes);
else
{
// Otherwise scan from the back of the queue to insert by priority
int index = queue.size();
while (index > 0)
{
StandardSession.FrameBytes element = queue.getUnsafe(index - 1);
if (element.compareTo(frameBytes) >= 0)
break;
--index;
}
queue.add(index, frameBytes);
}
}
}
// If no failures make sure we are iterating
if (failure == null)
flush();
else
frameBytes.failed(new SPDYException(failure));
}
void prepend(StandardSession.FrameBytes frameBytes)
{
Throwable failure;
synchronized (lock)
{
failure = this.failure;
Throwable failure = this.failure;
if (failure == null)
{
// Scan from the front of the queue looking to skip higher priority messages
int index = 0;
int size=queue.size();
int size = queue.size();
while (index < size)
{
StandardSession.FrameBytes element = queue.getUnsafe(index);
@ -113,18 +83,49 @@ public class Flusher
}
queue.add(index, frameBytes);
}
return failure;
}
// If no failures make sure we are iterating
if (failure == null)
flush();
else
frameBytes.failed(new SPDYException(failure));
}
void flush()
public Throwable append(StandardSession.FrameBytes frameBytes)
{
flusherCB.iterate();
synchronized (lock)
{
Throwable failure = this.failure;
if (failure == null)
{
// Non DataFrameBytes are inserted last
queue.add(frameBytes);
}
return failure;
}
}
public Throwable append(StandardSession.DataFrameBytes frameBytes)
{
synchronized (lock)
{
Throwable failure = this.failure;
if (failure == null)
{
// DataFrameBytes are inserted by priority
int index = queue.size();
while (index > 0)
{
StandardSession.FrameBytes element = queue.getUnsafe(index - 1);
if (element.compareTo(frameBytes) >= 0)
break;
--index;
}
queue.add(index, frameBytes);
}
return failure;
}
}
public void flush()
{
callback.iterate();
}
public int getQueueSize()
@ -135,56 +136,55 @@ public class Flusher
}
}
private class FlusherCB extends IteratingCallback
private class FlusherCallback extends IteratingCallback
{
// TODO should active and succeeded be local?
private final List<StandardSession.FrameBytes> active = new ArrayList<>(MAX_GATHER);
private final List<StandardSession.FrameBytes> succeeded = new ArrayList<>(MAX_GATHER);
private final List<StandardSession.FrameBytes> active = new ArrayList<>(maxGather);
private final List<StandardSession.FrameBytes> succeeded = new ArrayList<>(maxGather);
private final Set<IStream> stalled = new HashSet<>();
@Override
protected Action process() throws Exception
{
synchronized (lock)
{
succeeded.clear();
// Scan queue for data to write from first non stalled stream.
int qs=queue.size();
for (int i = 0; i < qs && active.size()<MAX_GATHER;)
// Scan queue for data to write from first non stalled stream.
int index = 0; // The index of the first non-stalled frame.
int size = queue.size();
while (index < size)
{
StandardSession.FrameBytes frameBytes = queue.getUnsafe(i);
FrameBytes frameBytes = queue.getUnsafe(index);
IStream stream = frameBytes.getStream();
// Continue if this is stalled stream
if (stream!=null)
if (stream != null)
{
if (stalled.size()>0 && stalled.contains(stream))
// Is it a frame belonging to an already stalled stream ?
if (stalled.size() > 0 && stalled.contains(stream))
{
i++;
++index;
continue;
}
if (stream.getWindowSize()<=0)
// Has the stream consumed all its flow control window ?
if (stream.getWindowSize() <= 0)
{
stalled.add(stream);
i++;
++index;
continue;
}
}
// we will be writing this one, so take the frame off the queue
queue.remove(i);
qs--;
// Has the stream been reset and if this not a control frame?
if (stream != null && stream.isReset() && !(frameBytes instanceof StandardSession.ControlFrameBytes))
// We will be possibly writing this frame, so take the frame off the queue.
queue.remove(index);
--size;
// Has the stream been reset for this data frame ?
if (stream != null && stream.isReset() && frameBytes instanceof StandardSession.DataFrameBytes)
{
frameBytes.failed(new StreamException(frameBytes.getStream().getId(), StreamStatus.INVALID_STREAM,
"Stream: " + frameBytes.getStream() + " is reset!"));
frameBytes.failed(new StreamException(frameBytes.getStream().getId(),
StreamStatus.INVALID_STREAM, "Stream: " + frameBytes.getStream() + " is reset!"));
continue;
}
}
active.add(frameBytes);
}
stalled.clear();
@ -193,24 +193,24 @@ public class Flusher
LOG.debug("Flushing {} of {} frame(s) in queue", active.size(), queue.size());
}
if (active.size() == 0)
if (active.isEmpty())
return Action.IDLE;
// Get the bytes to write
ByteBuffer[] buffers = new ByteBuffer[active.size()];
for (int i=0;i<buffers.length;i++)
buffers[i]=active.get(i).getByteBuffer();
for (int i = 0; i < buffers.length; i++)
buffers[i] = active.get(i).getByteBuffer();
if (controller != null)
controller.write(flusherCB, buffers);
controller.write(this, buffers);
// TODO: optimization
// If the callback completely immediately, it means that
// the connection is not congested, and therefore we can
// write more data without blocking.
// Therefore we should check this condition and increase
// the write window, which means to things: autotune the
// MAX_GATHER parameter, and/or autotune the buffer returned
// the write window, which means two things: autotune the
// maxGather parameter, and/or autotune the buffer returned
// by FrameBytes.getByteBuffer() (see also comment there).
return Action.SCHEDULED;
@ -233,8 +233,10 @@ public class Flusher
succeeded.addAll(active);
active.clear();
}
for (FrameBytes frame: succeeded)
frame.succeeded(); // TODO should we try catch?
// Notify outside the synchronized block.
for (FrameBytes frame : succeeded)
frame.succeeded();
succeeded.clear();
super.succeeded();
}
@ -255,10 +257,10 @@ public class Flusher
failed.addAll(queue);
queue.clear();
}
for (StandardSession.FrameBytes fb : failed)
fb.failed(x); // TODO should we try catch?
// Notify outside the synchronized block.
for (StandardSession.FrameBytes frame : failed)
frame.failed(x);
super.failed(x);
}
}
}

View File

@ -470,8 +470,7 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable
@Override
public void onStreamException(StreamException x)
{
// TODO: rename to onFailure
notifyOnException(listener, x); //TODO: notify StreamFrameListener if exists?
notifyOnFailure(listener, x); // TODO: notify StreamFrameListener if exists?
rst(new RstInfo(x.getStreamId(), x.getStreamStatus()), new Callback.Adapter());
}
@ -479,7 +478,7 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable
public void onSessionException(SessionException x)
{
Throwable cause = x.getCause();
notifyOnException(listener, cause == null ? x : cause);
notifyOnFailure(listener, cause == null ? x : cause);
goAway(x.getSessionStatus(), 0, TimeUnit.SECONDS, new Callback.Adapter());
}
@ -721,7 +720,6 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable
{
if (goAwayReceived.compareAndSet(false, true))
{
//TODO: Find a better name for GoAwayResultInfo
GoAwayResultInfo goAwayResultInfo = new GoAwayResultInfo(frame.getLastStreamId(), SessionStatus.from(frame.getStatusCode()));
notifyOnGoAway(listener, goAwayResultInfo);
// SPDY does not require to send back a response to a GO_AWAY.
@ -773,7 +771,7 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable
controller.close(false);
}
private void notifyOnException(SessionFrameListener listener, Throwable x)
private void notifyOnFailure(SessionFrameListener listener, Throwable x)
{
try
{
@ -933,20 +931,24 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable
// Synchronization is necessary, since we may have concurrent replies
// and those needs to be generated and enqueued atomically in order
// to maintain a correct compression context
ControlFrameBytes frameBytes;
Throwable throwable;
synchronized (this)
{
ByteBuffer buffer = generator.control(frame);
LOG.debug("Queuing {} on {}", frame, stream);
ControlFrameBytes frameBytes = new ControlFrameBytes(stream, callback, frame, buffer);
frameBytes = new ControlFrameBytes(stream, callback, frame, buffer);
if (timeout > 0)
frameBytes.task = scheduler.schedule(frameBytes, timeout, unit);
// Special handling for PING frames, they must be sent as soon as possible
if (ControlFrameType.PING == frame.getType())
flusher.prepend(frameBytes);
throwable = flusher.prepend(frameBytes);
else
flusher.append(frameBytes);
throwable = flusher.append(frameBytes);
}
// Flush MUST be done outside synchronized blocks
flush(frameBytes, throwable);
}
catch (Exception x)
{
@ -968,36 +970,47 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable
DataFrameBytes frameBytes = new DataFrameBytes(stream, callback, dataInfo);
if (timeout > 0)
frameBytes.task = scheduler.schedule(frameBytes, timeout, unit);
flusher.append(frameBytes);
flush(frameBytes, flusher.append(frameBytes));
}
@Override
public void shutdown()
{
FrameBytes frameBytes = new CloseFrameBytes();
flusher.append(frameBytes);
CloseFrameBytes frameBytes = new CloseFrameBytes();
flush(frameBytes, flusher.append(frameBytes));
}
private void flush(FrameBytes frameBytes, Throwable throwable)
{
if (throwable != null)
frameBytes.failed(throwable);
else
flusher.flush();
}
private void complete(final Callback callback)
{
callback.succeeded();
try
{
if (callback != null)
callback.succeeded();
}
catch (Throwable x)
{
LOG.info("Exception while notifying callback " + callback, x);
}
}
private void notifyCallbackFailed(Callback callback, Throwable x)
private void notifyCallbackFailed(Callback callback, Throwable failure)
{
try
{
if (callback != null)
callback.failed(x);
callback.failed(failure);
}
catch (Exception xx)
catch (Throwable x)
{
LOG.info("Exception while notifying callback " + callback, xx);
}
catch (Error xx)
{
LOG.info("Exception while notifying callback " + callback, xx);
throw xx;
LOG.info("Exception while notifying callback " + callback, x);
}
}
@ -1100,7 +1113,7 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable
}
}
class ControlFrameBytes extends AbstractFrameBytes
protected class ControlFrameBytes extends AbstractFrameBytes
{
private final ControlFrame frame;
private final ByteBuffer buffer;
@ -1143,7 +1156,7 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable
}
}
private class DataFrameBytes extends AbstractFrameBytes
protected class DataFrameBytes extends AbstractFrameBytes
{
private final DataInfo dataInfo;
private int size;
@ -1203,7 +1216,7 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable
// We have written a frame out of this DataInfo, but there is more to write.
// We need to keep the correct ordering of frames, to avoid that another
// DataInfo for the same stream is written before this one is finished.
flusher.prepend(this);
flush(this, flusher.prepend(this));
}
else
{
@ -1221,7 +1234,7 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable
}
}
private class CloseFrameBytes extends AbstractFrameBytes
protected class CloseFrameBytes extends AbstractFrameBytes
{
private CloseFrameBytes()
{