Issue #2175 cleanups after review

Re implemented a queue and flusher above extension stack to serialize
frame handling without holding a lock when calling callbacks

Signed-off-by: Greg Wilkins <gregw@webtide.com>
This commit is contained in:
Greg Wilkins 2019-01-23 09:06:27 +11:00
parent 3d7ee93112
commit d520ae5a11
1 changed files with 34 additions and 30 deletions

View File

@ -65,7 +65,6 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
private final FrameHandler handler;
private final Negotiated negotiated;
private final boolean demanding;
private final Queue<FrameEntry> queue = new ArrayDeque<>();
private final Flusher flusher = new Flusher();
private WebSocketConnection connection;
@ -474,7 +473,7 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
try
{
synchronized(this)
synchronized(flusher)
{
boolean closeConnection = channelState.onOutgoingFrame(frame);
@ -496,7 +495,7 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
}
}
queue.offer(new FrameEntry(frame, callback, batch));
flusher.queue.offer(new FrameEntry(frame, callback, batch));
}
flusher.iterate();
}
@ -521,9 +520,9 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
@Override
public void flush(Callback callback)
{
synchronized(this)
synchronized(flusher)
{
queue.offer(new FrameEntry(FrameFlusher.FLUSH_FRAME, callback, false));
flusher.queue.offer(new FrameEntry(FrameFlusher.FLUSH_FRAME, callback, false));
}
flusher.iterate();
}
@ -762,50 +761,55 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
private class Flusher extends IteratingCallback
{
private final Queue<FrameEntry> queue = new ArrayDeque<>();
FrameEntry entry;
@Override
protected Action process() throws Throwable
{
while(true)
synchronized (this)
{
synchronized (this)
{
entry = queue.poll();
}
if (entry==null)
return Action.IDLE;
negotiated.getExtensions().sendFrame(entry.frame, this, entry.batch);
return Action.SCHEDULED;
entry = queue.poll();
}
if (entry==null)
return Action.IDLE;
negotiated.getExtensions().sendFrame(entry.frame, this, entry.batch);
return Action.SCHEDULED;
}
@Override
public void succeeded()
{
FrameEntry succeeded;
synchronized (this)
{
succeeded = entry;
entry = null;
}
if (succeeded!=null)
succeeded.callback.succeeded();
entry.callback.succeeded();
super.succeeded();
}
@Override
public void failed(Throwable x)
protected void onCompleteFailure(Throwable cause)
{
FrameEntry failed;
entry.callback.failed(cause);
Queue<FrameEntry> entries;
synchronized (this)
{
failed = entry;
entry = null;
entries = new ArrayDeque<>(queue);
queue.clear();
}
entries.forEach(e-> failEntry(cause, e));
}
private void failEntry(Throwable cause, FrameEntry e)
{
try
{
e.callback.failed(cause);
}
catch(Throwable x)
{
if (cause != x)
cause.addSuppressed(x);
LOG.warn(cause);
}
if (failed!=null)
failed.callback.failed(x);
super.failed(x);
}
}