removed extension queue #2175
Signed-off-by: Greg Wilkins <gregw@webtide.com>
This commit is contained in:
parent
9bd9133399
commit
ce771070f6
|
@ -51,8 +51,6 @@ public class ExtensionStack implements IncomingFrames, OutgoingFrames, Dumpable
|
|||
{
|
||||
private static final Logger LOG = Log.getLogger(ExtensionStack.class);
|
||||
|
||||
private final Queue<FrameEntry> entries = new ArrayDeque<>();
|
||||
private final IteratingCallback flusher = new Flusher();
|
||||
private final WebSocketExtensionRegistry factory;
|
||||
private List<Extension> extensions;
|
||||
private IncomingFrames incoming;
|
||||
|
@ -198,11 +196,9 @@ public class ExtensionStack implements IncomingFrames, OutgoingFrames, Dumpable
|
|||
{
|
||||
if (outgoing == null)
|
||||
throw new IllegalStateException();
|
||||
FrameEntry entry = new FrameEntry(frame, callback, batch);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Queuing {}", entry);
|
||||
offerEntry(entry);
|
||||
flusher.iterate();
|
||||
LOG.debug("Extending out {} {} {}", frame, callback, batch);
|
||||
outgoing.sendFrame(frame, callback, batch);
|
||||
}
|
||||
|
||||
public void connect(IncomingFrames incoming, OutgoingFrames outgoing, WebSocketChannel webSocketChannel)
|
||||
|
@ -224,30 +220,6 @@ public class ExtensionStack implements IncomingFrames, OutgoingFrames, Dumpable
|
|||
extension.setWebSocketChannel(webSocketChannel);
|
||||
}
|
||||
|
||||
private void offerEntry(FrameEntry entry)
|
||||
{
|
||||
synchronized (this)
|
||||
{
|
||||
entries.offer(entry);
|
||||
}
|
||||
}
|
||||
|
||||
private FrameEntry pollEntry()
|
||||
{
|
||||
synchronized (this)
|
||||
{
|
||||
return entries.poll();
|
||||
}
|
||||
}
|
||||
|
||||
private int getQueueSize()
|
||||
{
|
||||
synchronized (this)
|
||||
{
|
||||
return entries.size();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String dump()
|
||||
{
|
||||
|
@ -264,9 +236,7 @@ public class ExtensionStack implements IncomingFrames, OutgoingFrames, Dumpable
|
|||
public String toString()
|
||||
{
|
||||
StringBuilder s = new StringBuilder();
|
||||
s.append("ExtensionStack[");
|
||||
s.append("queueSize=").append(getQueueSize());
|
||||
s.append(",extensions=");
|
||||
s.append("ExtensionStack[extensions=");
|
||||
if (extensions == null)
|
||||
{
|
||||
s.append("<null>");
|
||||
|
@ -298,94 +268,4 @@ public class ExtensionStack implements IncomingFrames, OutgoingFrames, Dumpable
|
|||
s.append("]");
|
||||
return s.toString();
|
||||
}
|
||||
|
||||
private class Flusher extends IteratingCallback implements Callback
|
||||
{
|
||||
private FrameEntry current;
|
||||
|
||||
@Override
|
||||
protected Action process() throws Exception
|
||||
{
|
||||
current = pollEntry();
|
||||
if (current == null)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Entering IDLE");
|
||||
return Action.IDLE;
|
||||
}
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Processing {}", current);
|
||||
outgoing.sendFrame(current.frame, this, current.batch);
|
||||
return Action.SCHEDULED;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void onCompleteSuccess()
|
||||
{
|
||||
// This IteratingCallback never completes.
|
||||
throw new IllegalStateException("This IteratingCallback should never complete.");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void onCompleteFailure(Throwable x)
|
||||
{
|
||||
// This IteratingCallback never fails.
|
||||
// The callback are those provided by WriteCallback (implemented
|
||||
// below) and even in case of writeFailed() we call succeeded().
|
||||
throw new IllegalStateException("This IteratingCallback should never fail.");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void succeeded()
|
||||
{
|
||||
// Notify first then call succeeded(), otherwise
|
||||
// write callbacks may be invoked out of order.
|
||||
notifyCallbackSuccess(current.callback);
|
||||
super.succeeded();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void failed(Throwable cause)
|
||||
{
|
||||
// Notify first, the call succeeded() to drain the queue.
|
||||
// We don't want to call failed(x) because that will put
|
||||
// this flusher into a final state that cannot be exited,
|
||||
// and the failure of a frame may not mean that the whole
|
||||
// connection is now invalid.
|
||||
notifyCallbackFailure(current.callback, cause);
|
||||
super.failed(cause);
|
||||
}
|
||||
|
||||
private void notifyCallbackSuccess(Callback callback)
|
||||
{
|
||||
try
|
||||
{
|
||||
if (callback != null)
|
||||
callback.succeeded();
|
||||
}
|
||||
catch (Throwable x)
|
||||
{
|
||||
LOG.debug("Exception while notifying success of callback " + callback, x);
|
||||
}
|
||||
}
|
||||
|
||||
private void notifyCallbackFailure(Callback callback, Throwable failure)
|
||||
{
|
||||
try
|
||||
{
|
||||
if (callback != null)
|
||||
callback.failed(failure);
|
||||
}
|
||||
catch (Throwable x)
|
||||
{
|
||||
LOG.debug("Exception while notifying failure of callback " + callback, x);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return "ExtensionStack$Flusher[" + (extensions == null?-1:extensions.size()) + "]";
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -456,66 +456,72 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
|
|||
@Override
|
||||
public void sendFrame(Frame frame, Callback callback, boolean batch)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("sendFrame({}, {}, {})", frame, callback, batch);
|
||||
|
||||
try
|
||||
synchronized(this)
|
||||
{
|
||||
assertValidOutgoing(frame);
|
||||
outgoingSequence.check(frame.getOpCode(), frame.isFin());
|
||||
}
|
||||
catch (Throwable ex)
|
||||
{
|
||||
callback.failed(ex);
|
||||
return;
|
||||
}
|
||||
|
||||
if (frame.getOpCode() == OpCode.CLOSE)
|
||||
{
|
||||
CloseStatus closeStatus = CloseStatus.getCloseStatus(frame);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("close({}, {}, {})", closeStatus, callback, batch);
|
||||
LOG.debug("sendFrame({}, {}, {})", frame, callback, batch);
|
||||
|
||||
if (state.onCloseOut(closeStatus))
|
||||
try
|
||||
{
|
||||
callback = new Callback.Nested(callback)
|
||||
assertValidOutgoing(frame);
|
||||
outgoingSequence.check(frame.getOpCode(), frame.isFin());
|
||||
}
|
||||
catch (Throwable ex)
|
||||
{
|
||||
callback.failed(ex);
|
||||
return;
|
||||
}
|
||||
|
||||
if (frame.getOpCode() == OpCode.CLOSE)
|
||||
{
|
||||
CloseStatus closeStatus = CloseStatus.getCloseStatus(frame);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("close({}, {}, {})", closeStatus, callback, batch);
|
||||
|
||||
if (state.onCloseOut(closeStatus))
|
||||
{
|
||||
@Override
|
||||
public void completed()
|
||||
callback = new Callback.Nested(callback)
|
||||
{
|
||||
try
|
||||
{
|
||||
handler.onClosed(state.getCloseStatus());
|
||||
}
|
||||
catch (Throwable e)
|
||||
@Override
|
||||
public void completed()
|
||||
{
|
||||
try
|
||||
{
|
||||
handler.onError(e);
|
||||
handler.onClosed(state.getCloseStatus());
|
||||
}
|
||||
catch (Throwable e2)
|
||||
catch (Throwable e)
|
||||
{
|
||||
e.addSuppressed(e2);
|
||||
LOG.warn(e);
|
||||
try
|
||||
{
|
||||
handler.onError(e);
|
||||
}
|
||||
catch (Throwable e2)
|
||||
{
|
||||
e.addSuppressed(e2);
|
||||
LOG.warn(e);
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
connection.close();
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
connection.close();
|
||||
}
|
||||
}
|
||||
};
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
negotiated.getExtensions().sendFrame(frame, callback, batch);
|
||||
negotiated.getExtensions().sendFrame(frame, callback, batch);
|
||||
}
|
||||
connection.sendFrameQueue();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void flush(Callback callback)
|
||||
{
|
||||
negotiated.getExtensions().sendFrame(FrameFlusher.FLUSH_FRAME, callback, false);
|
||||
synchronized(this)
|
||||
{
|
||||
negotiated.getExtensions().sendFrame(FrameFlusher.FLUSH_FRAME, callback, false);
|
||||
}
|
||||
connection.sendFrameQueue();
|
||||
}
|
||||
|
||||
|
@ -598,7 +604,8 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
|
|||
maxTextMessageSize = maxSize;
|
||||
}
|
||||
|
||||
private class IncomingAdaptor extends FrameSequence implements IncomingFrames
|
||||
private class
|
||||
IncomingAdaptor extends FrameSequence implements IncomingFrames
|
||||
{
|
||||
@Override
|
||||
public void onFrame(Frame frame, Callback callback)
|
||||
|
|
Loading…
Reference in New Issue