externalized flusher iteration #2175

Signed-off-by: Greg Wilkins <gregw@webtide.com>
This commit is contained in:
Greg Wilkins 2019-01-10 16:05:29 +11:00
parent 4165c4507b
commit 9bd9133399
3 changed files with 28 additions and 61 deletions

View File

@ -49,8 +49,6 @@ public class FrameFlusher extends IteratingCallback
private final Deque<Entry> queue = new ArrayDeque<>();
private final List<Entry> entries;
private final List<ByteBuffer> buffers;
private boolean closed;
private Throwable terminated;
private ByteBuffer batchBuffer = null;
public FrameFlusher(ByteBufferPool bufferPool, Generator generator, EndPoint endPoint, int bufferSize, int maxGather)
@ -67,25 +65,14 @@ public class FrameFlusher extends IteratingCallback
public void enqueue(Frame frame, Callback callback, boolean batch)
{
Entry entry = new Entry(frame, callback, batch);
Throwable closed;
byte opCode = frame.getOpCode();
synchronized (this)
{
closed = terminated;
if (closed == null)
{
byte opCode = frame.getOpCode();
if (opCode == OpCode.PING || opCode == OpCode.PONG)
queue.offerFirst(entry);
else
queue.offerLast(entry);
}
if (opCode == OpCode.PING || opCode == OpCode.PONG)
queue.offerFirst(entry);
else
queue.offerLast(entry);
}
if (closed == null)
iterate();
else
notifyCallbackFailure(callback, closed);
}
@Override
@ -102,12 +89,6 @@ public class FrameFlusher extends IteratingCallback
if (succeedEntries() && batchBuffer != null)
BufferUtil.clear(batchBuffer);
if (closed)
return Action.SUCCEEDED;
if (terminated != null)
throw terminated;
while (!queue.isEmpty() && entries.size() <= maxGather)
{
Entry entry = queue.poll();
@ -220,10 +201,7 @@ public class FrameFlusher extends IteratingCallback
notifyCallbackSuccess(entry.callback);
entry.release();
if (entry.frame.getOpCode() == OpCode.CLOSE)
{
terminate(new ClosedChannelException(), true);
endPoint.shutdownOutput();
}
}
entries.clear();
return hadEntries;
@ -233,13 +211,8 @@ public class FrameFlusher extends IteratingCallback
public void onCompleteFailure(Throwable failure)
{
releaseAggregate();
Throwable closed;
synchronized (this)
{
closed = terminated;
if (closed == null)
terminated = failure;
entries.addAll(queue);
queue.clear();
}
@ -261,22 +234,6 @@ public class FrameFlusher extends IteratingCallback
}
}
public void terminate(Throwable cause, boolean close)
{
Throwable reason;
synchronized (this)
{
closed = close;
reason = terminated;
if (reason == null)
terminated = cause;
}
if (LOG.isDebugEnabled())
LOG.debug("{} {}", reason == null?"Terminating":"Terminated", this);
if (reason == null && !close)
iterate();
}
protected void notifyCallbackSuccess(Callback callback)
{
try
@ -312,12 +269,11 @@ public class FrameFlusher extends IteratingCallback
@Override
public String toString()
{
return String.format("%s@%x[queueSize=%d,aggregate=%s,terminated=%s]",
return String.format("%s@%x[queueSize=%d,aggregate=%s]",
getClass().getSimpleName(),
hashCode(),
getQueueSize(),
BufferUtil.toDetailString(batchBuffer),
terminated);
BufferUtil.toDetailString(batchBuffer));
}
private class Entry extends FrameEntry
@ -353,7 +309,7 @@ public class FrameFlusher extends IteratingCallback
@Override
public String toString()
{
return String.format("%s[%s,%s,%b,%s]", getClass().getSimpleName(), frame, callback, batch, terminated);
return String.format("%s{%s,%s,%b}", getClass().getSimpleName(), frame, callback, batch);
}
}
}

View File

@ -81,7 +81,7 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
this.behavior = behavior;
this.negotiated = negotiated;
this.demanding = handler.isDemanding();
negotiated.getExtensions().connect(new ExtendedIncoming(), new ExtendedOutgoing(), this);
negotiated.getExtensions().connect(new IncomingAdaptor(), new OutgoingAdaptor(), this);
}
/**
@ -509,12 +509,14 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
}
negotiated.getExtensions().sendFrame(frame, callback, batch);
connection.sendFrameQueue();
}
@Override
public void flush(Callback callback)
{
negotiated.getExtensions().sendFrame(FrameFlusher.FLUSH_FRAME, callback, false);
connection.sendFrameQueue();
}
@Override
@ -596,7 +598,7 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
maxTextMessageSize = maxSize;
}
private class ExtendedIncoming extends FrameSequence implements IncomingFrames
private class IncomingAdaptor extends FrameSequence implements IncomingFrames
{
@Override
public void onFrame(Frame frame, Callback callback)
@ -666,14 +668,14 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
}
}
private class ExtendedOutgoing implements OutgoingFrames
private class OutgoingAdaptor implements OutgoingFrames
{
@Override
public void sendFrame(Frame frame, Callback callback, boolean batch)
{
try
{
connection.sendFrame(frame, callback, batch);
connection.enqueueFrame(frame, callback, batch);
}
catch (ProtocolException e)
{

View File

@ -31,7 +31,6 @@ import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.core.Behavior;
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.MessageTooLargeException;
import org.eclipse.jetty.websocket.core.OutgoingFrames;
import org.eclipse.jetty.websocket.core.ProtocolException;
import org.eclipse.jetty.websocket.core.WebSocketTimeoutException;
@ -45,7 +44,7 @@ import java.util.concurrent.Executor;
/**
* Provides the implementation of {@link org.eclipse.jetty.io.Connection} that is suitable for WebSocket
*/
public class WebSocketConnection extends AbstractConnection implements Connection.UpgradeTo, Dumpable, OutgoingFrames, Runnable
public class WebSocketConnection extends AbstractConnection implements Connection.UpgradeTo, Dumpable, Runnable
{
private final Logger LOG = Log.getLogger(this.getClass());
@ -170,7 +169,6 @@ public class WebSocketConnection extends AbstractConnection implements Connectio
// TODO review all close paths
IOException e = new IOException("Closed");
flusher.terminate(e, true);
channel.onClosed(e);
super.onClose();
}
@ -577,8 +575,14 @@ public class WebSocketConnection extends AbstractConnection implements Connectio
setInitialBuffer(prefilled);
}
@Override
public void sendFrame(Frame frame, Callback callback, boolean batch)
/**
* Enqueue a Frame to be sent.
* @see #sendFrameQueue()
* @param frame The frame to queue
* @param callback The callback to call once the frame is sent
* @param batch True if batch mode is to be used
*/
void enqueueFrame(Frame frame, Callback callback, boolean batch)
{
if (channel.getBehavior() == Behavior.CLIENT)
{
@ -590,6 +594,11 @@ public class WebSocketConnection extends AbstractConnection implements Connectio
flusher.enqueue(frame, callback, batch);
}
void sendFrameQueue()
{
flusher.iterate();
}
private class Flusher extends FrameFlusher
{
private Flusher(int bufferSize, Generator generator, EndPoint endpoint)