Merge remote-tracking branch 'eclipse/jetty-10.0.x-3167-2175-websocket-close' into jetty-10.0.x-3167-2175-websocket-close

Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
This commit is contained in:
Lachlan Roberts 2019-01-10 21:25:15 +11:00
commit 1e4ac07511
3 changed files with 69 additions and 181 deletions

View File

@ -45,7 +45,7 @@ import org.eclipse.jetty.websocket.core.internal.Generator;
public class NetworkFuzzer extends Fuzzer.Adapter implements Fuzzer, AutoCloseable
{
private final LocalServer server;
private final RawWebSocketClient rawClient;
private final WebSocketCoreClient client;
private final RawUpgradeRequest upgradeRequest;
private final UnitGenerator generator;
private final FrameCapture frameCapture;
@ -65,9 +65,9 @@ public class NetworkFuzzer extends Fuzzer.Adapter implements Fuzzer, AutoCloseab
{
super();
this.server = server;
this.rawClient = new RawWebSocketClient();
this.client = new WebSocketCoreClient();
CompletableFuture<FrameCapture> futureOnCapture = new CompletableFuture<>();
this.upgradeRequest = new RawUpgradeRequest(rawClient, wsURI, futureOnCapture);
this.upgradeRequest = new RawUpgradeRequest(client, wsURI, futureOnCapture);
if (requestHeaders != null)
{
HttpFields fields = this.upgradeRequest.getHeaders();
@ -77,10 +77,10 @@ public class NetworkFuzzer extends Fuzzer.Adapter implements Fuzzer, AutoCloseab
fields.put(name, value);
});
}
this.rawClient.start();
this.client.start();
this.generator = new UnitGenerator(Behavior.CLIENT);
CompletableFuture<FrameHandler.CoreSession> futureHandler = this.rawClient.connect(upgradeRequest);
CompletableFuture<FrameHandler.CoreSession> futureHandler = this.client.connect(upgradeRequest);
CompletableFuture<FrameCapture> futureCapture = futureHandler.thenCombine(futureOnCapture, (channel, capture) -> capture);
this.frameCapture = futureCapture.get(10, TimeUnit.SECONDS);
}
@ -102,7 +102,7 @@ public class NetworkFuzzer extends Fuzzer.Adapter implements Fuzzer, AutoCloseab
@Override
public void close() throws Exception
{
this.rawClient.stop();
this.client.stop();
}
@Override
@ -156,7 +156,7 @@ public class NetworkFuzzer extends Fuzzer.Adapter implements Fuzzer, AutoCloseab
{
try (SharedBlockingCallback.Blocker blocker = sharedBlockingCallback.acquire())
{
frameCapture.channel.sendFrame(f, blocker, false);
frameCapture.session.sendFrame(f, blocker, false);
}
}
}
@ -168,7 +168,7 @@ public class NetworkFuzzer extends Fuzzer.Adapter implements Fuzzer, AutoCloseab
{
try (SharedBlockingCallback.Blocker blocker = sharedBlockingCallback.acquire())
{
frameCapture.channel.sendFrame(f, blocker, false);
frameCapture.session.sendFrame(f, blocker, false);
}
}
}
@ -184,10 +184,6 @@ public class NetworkFuzzer extends Fuzzer.Adapter implements Fuzzer, AutoCloseab
}
}
public static class RawWebSocketClient extends WebSocketCoreClient
{
}
public static class RawUpgradeRequest extends UpgradeRequest
{
private final CompletableFuture<FrameCapture> futureCapture;
@ -226,7 +222,7 @@ public class NetworkFuzzer extends Fuzzer.Adapter implements Fuzzer, AutoCloseab
private final BlockingQueue<Frame> receivedFrames = new LinkedBlockingQueue<>();
private final EndPoint endPoint;
private final SharedBlockingCallback blockingCallback = new SharedBlockingCallback();
private CoreSession channel;
private CoreSession session;
public FrameCapture(EndPoint endPoint)
{
@ -247,20 +243,26 @@ public class NetworkFuzzer extends Fuzzer.Adapter implements Fuzzer, AutoCloseab
public void onFrame(Frame frame, Callback callback)
{
receivedFrames.offer(Frame.copy(frame));
callback.succeeded();
synchronized(this)
{
callback.succeeded();
}
}
@Override
public void onOpen(CoreSession coreSession) throws Exception
{
this.channel = coreSession;
this.session = coreSession;
}
public void writeRaw(ByteBuffer buffer) throws IOException
{
try (SharedBlockingCallback.Blocker blocker = blockingCallback.acquire())
synchronized (this)
{
this.endPoint.write(blocker, buffer);
try (SharedBlockingCallback.Blocker blocker = blockingCallback.acquire())
{
this.endPoint.write(blocker, buffer);
}
}
}
}

View File

@ -51,8 +51,6 @@ public class ExtensionStack implements IncomingFrames, OutgoingFrames, Dumpable
{
private static final Logger LOG = Log.getLogger(ExtensionStack.class);
private final Queue<FrameEntry> entries = new ArrayDeque<>();
private final IteratingCallback flusher = new Flusher();
private final WebSocketExtensionRegistry factory;
private List<Extension> extensions;
private IncomingFrames incoming;
@ -198,11 +196,9 @@ public class ExtensionStack implements IncomingFrames, OutgoingFrames, Dumpable
{
if (outgoing == null)
throw new IllegalStateException();
FrameEntry entry = new FrameEntry(frame, callback, batch);
if (LOG.isDebugEnabled())
LOG.debug("Queuing {}", entry);
offerEntry(entry);
flusher.iterate();
LOG.debug("Extending out {} {} {}", frame, callback, batch);
outgoing.sendFrame(frame, callback, batch);
}
public void connect(IncomingFrames incoming, OutgoingFrames outgoing, WebSocketChannel webSocketChannel)
@ -224,30 +220,6 @@ public class ExtensionStack implements IncomingFrames, OutgoingFrames, Dumpable
extension.setWebSocketChannel(webSocketChannel);
}
private void offerEntry(FrameEntry entry)
{
synchronized (this)
{
entries.offer(entry);
}
}
private FrameEntry pollEntry()
{
synchronized (this)
{
return entries.poll();
}
}
private int getQueueSize()
{
synchronized (this)
{
return entries.size();
}
}
@Override
public String dump()
{
@ -264,9 +236,7 @@ public class ExtensionStack implements IncomingFrames, OutgoingFrames, Dumpable
public String toString()
{
StringBuilder s = new StringBuilder();
s.append("ExtensionStack[");
s.append("queueSize=").append(getQueueSize());
s.append(",extensions=");
s.append("ExtensionStack[extensions=");
if (extensions == null)
{
s.append("<null>");
@ -298,94 +268,4 @@ public class ExtensionStack implements IncomingFrames, OutgoingFrames, Dumpable
s.append("]");
return s.toString();
}
private class Flusher extends IteratingCallback implements Callback
{
private FrameEntry current;
@Override
protected Action process() throws Exception
{
current = pollEntry();
if (current == null)
{
if (LOG.isDebugEnabled())
LOG.debug("Entering IDLE");
return Action.IDLE;
}
if (LOG.isDebugEnabled())
LOG.debug("Processing {}", current);
outgoing.sendFrame(current.frame, this, current.batch);
return Action.SCHEDULED;
}
@Override
protected void onCompleteSuccess()
{
// This IteratingCallback never completes.
throw new IllegalStateException("This IteratingCallback should never complete.");
}
@Override
protected void onCompleteFailure(Throwable x)
{
// This IteratingCallback never fails.
// The callback are those provided by WriteCallback (implemented
// below) and even in case of writeFailed() we call succeeded().
throw new IllegalStateException("This IteratingCallback should never fail.");
}
@Override
public void succeeded()
{
// Notify first then call succeeded(), otherwise
// write callbacks may be invoked out of order.
notifyCallbackSuccess(current.callback);
super.succeeded();
}
@Override
public void failed(Throwable cause)
{
// Notify first, the call succeeded() to drain the queue.
// We don't want to call failed(x) because that will put
// this flusher into a final state that cannot be exited,
// and the failure of a frame may not mean that the whole
// connection is now invalid.
notifyCallbackFailure(current.callback, cause);
super.succeeded();
}
private void notifyCallbackSuccess(Callback callback)
{
try
{
if (callback != null)
callback.succeeded();
}
catch (Throwable x)
{
LOG.debug("Exception while notifying success of callback " + callback, x);
}
}
private void notifyCallbackFailure(Callback callback, Throwable failure)
{
try
{
if (callback != null)
callback.failed(failure);
}
catch (Throwable x)
{
LOG.debug("Exception while notifying failure of callback " + callback, x);
}
}
@Override
public String toString()
{
return "ExtensionStack$Flusher[" + (extensions == null?-1:extensions.size()) + "]";
}
}
}

View File

@ -454,67 +454,73 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
@Override
public void sendFrame(Frame frame, Callback callback, boolean batch)
{
if (LOG.isDebugEnabled())
LOG.debug("sendFrame({}, {}, {})", frame, callback, batch);
synchronized(this)
{
if (LOG.isDebugEnabled())
LOG.debug("sendFrame({}, {}, {})", frame, callback, batch);
boolean closed;
try
{
assertValidOutgoing(frame);
closed = channelState.checkOutgoing(frame);
}
catch (Throwable ex)
{
callback.failed(ex);
return;
}
if (frame.getOpCode() == OpCode.CLOSE)
{
CloseStatus closeStatus = CloseStatus.getCloseStatus(frame);
if (LOG.isDebugEnabled())
LOG.debug("close({}, {}, {})", closeStatus, callback, batch);
if (closed)
try
{
callback = new Callback.Nested(callback)
assertValidOutgoing(frame);
closed = channelState.checkOutgoing(frame);
}
catch (Throwable ex)
{
callback.failed(ex);
return;
}
if (frame.getOpCode() == OpCode.CLOSE)
{
CloseStatus closeStatus = CloseStatus.getCloseStatus(frame);
if (LOG.isDebugEnabled())
LOG.debug("close({}, {}, {})", closeStatus, callback, batch);
if (closed)
{
@Override
public void completed()
callback = new Callback.Nested(callback)
{
try
{
handler.onClosed(channelState.getCloseStatus());
}
catch (Throwable e)
@Override
public void completed()
{
try
{
handler.onError(e);
handler.onClosed(channelState.getCloseStatus());
}
catch (Throwable e2)
catch (Throwable e)
{
e.addSuppressed(e2);
LOG.warn(e);
try
{
handler.onError(e);
}
catch (Throwable e2)
{
e.addSuppressed(e2);
LOG.warn(e);
}
}
finally
{
connection.close();
}
}
finally
{
connection.close();
}
}
};
};
}
}
}
negotiated.getExtensions().sendFrame(frame, callback, batch);
negotiated.getExtensions().sendFrame(frame, callback, batch);
}
connection.sendFrameQueue();
}
@Override
public void flush(Callback callback)
{
negotiated.getExtensions().sendFrame(FrameFlusher.FLUSH_FRAME, callback, false);
synchronized(this)
{
negotiated.getExtensions().sendFrame(FrameFlusher.FLUSH_FRAME, callback, false);
}
connection.sendFrameQueue();
}