diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/Callback.java b/jetty-util/src/main/java/org/eclipse/jetty/util/Callback.java
index 5d319ede257..b036a75abb8 100644
--- a/jetty-util/src/main/java/org/eclipse/jetty/util/Callback.java
+++ b/jetty-util/src/main/java/org/eclipse/jetty/util/Callback.java
@@ -243,6 +243,33 @@ public interface Callback extends Invocable
};
}
+ /**
+ * Create a callback which combines two other callbacks and will succeed or fail them both.
+ * @param callback1 The first callback
+ * @param callback2 The second callback
+ * @return a new callback.
+ */
+ static Callback from(Callback callback1, Callback callback2)
+ {
+ return new Callback()
+ {
+ @Override
+ public void succeeded()
+ {
+ callback1.succeeded();
+ callback2.succeeded();
+ }
+
+ @Override
+ public void failed(Throwable x)
+ {
+ callback1.failed(x);
+ callback2.failed(x);
+ }
+ };
+ }
+
+
class Completing implements Callback
{
@Override
diff --git a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/EchoFrameHandler.java b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/EchoFrameHandler.java
new file mode 100644
index 00000000000..af66bb01a67
--- /dev/null
+++ b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/EchoFrameHandler.java
@@ -0,0 +1,56 @@
+//
+// ========================================================================
+// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
+// ------------------------------------------------------------------------
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// and Apache License v2.0 which accompanies this distribution.
+//
+// The Eclipse Public License is available at
+// http://www.eclipse.org/legal/epl-v10.html
+//
+// The Apache License v2.0 is available at
+// http://www.opensource.org/licenses/apache2.0.php
+//
+// You may elect to redistribute this code under either of these licenses.
+// ========================================================================
+//
+
+package org.eclipse.jetty.websocket.core;
+
+import org.eclipse.jetty.util.Callback;
+
+public class EchoFrameHandler extends TestAsyncFrameHandler
+{
+ private boolean throwOnFrame;
+
+ public void throwOnFrame()
+ {
+ throwOnFrame = true;
+ }
+
+ public EchoFrameHandler(String name)
+ {
+ super(name);
+ }
+
+ @Override
+ public void onFrame(Frame frame, Callback callback)
+ {
+ LOG.info("[{}] onFrame {}", name, frame);
+ receivedFrames.offer(Frame.copy(frame));
+
+ if (throwOnFrame)
+ throw new RuntimeException("intentionally throwing in server onFrame()");
+
+ if (frame.isDataFrame())
+ {
+ LOG.info("[{}] echoDataFrame {}", name, frame);
+ coreSession.sendFrame(new Frame(frame.getOpCode(), frame.getPayload()), callback, false);
+ }
+ else
+ {
+ callback.succeeded();
+ }
+ }
+}
diff --git a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/TestAsyncFrameHandler.java b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/TestAsyncFrameHandler.java
new file mode 100644
index 00000000000..faac67ec21b
--- /dev/null
+++ b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/TestAsyncFrameHandler.java
@@ -0,0 +1,107 @@
+//
+// ========================================================================
+// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
+// ------------------------------------------------------------------------
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// and Apache License v2.0 which accompanies this distribution.
+//
+// The Eclipse Public License is available at
+// http://www.eclipse.org/legal/epl-v10.html
+//
+// The Apache License v2.0 is available at
+// http://www.opensource.org/licenses/apache2.0.php
+//
+// You may elect to redistribute this code under either of these licenses.
+// ========================================================================
+//
+
+package org.eclipse.jetty.websocket.core;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+
+import org.eclipse.jetty.util.BlockingArrayQueue;
+import org.eclipse.jetty.util.Callback;
+import org.eclipse.jetty.util.log.Log;
+import org.eclipse.jetty.util.log.Logger;
+
+public class TestAsyncFrameHandler implements FrameHandler
+{
+ protected static final Logger LOG = Log.getLogger(TestAsyncFrameHandler.class);
+ protected final String name;
+
+ public CoreSession coreSession;
+ public BlockingQueue receivedFrames = new BlockingArrayQueue<>();
+ public volatile Throwable error;
+ public CountDownLatch openLatch = new CountDownLatch(1);
+ public CountDownLatch errorLatch = new CountDownLatch(1);
+ public CountDownLatch closeLatch = new CountDownLatch(1);
+
+ public TestAsyncFrameHandler()
+ {
+ name = TestAsyncFrameHandler.class.getSimpleName();
+ }
+
+ public TestAsyncFrameHandler(String name)
+ {
+ this.name = name;
+ }
+
+ @Override
+ public void onOpen(CoreSession coreSession, Callback callback)
+ {
+ LOG.info("[{}] onOpen {}", name, coreSession);
+ this.coreSession = coreSession;
+ callback.succeeded();
+ openLatch.countDown();
+ }
+
+ @Override
+ public void onFrame(Frame frame, Callback callback)
+ {
+ LOG.info("[{}] onFrame {}", name, frame);
+ receivedFrames.offer(Frame.copy(frame));
+ callback.succeeded();
+ }
+
+ @Override
+ public void onClosed(CloseStatus closeStatus, Callback callback)
+ {
+ LOG.info("[{}] onClosed {}", name, closeStatus);
+ closeLatch.countDown();
+ callback.succeeded();
+ }
+
+ @Override
+ public void onError(Throwable cause, Callback callback)
+ {
+ LOG.info("[{}] onError {} ", name, cause == null?null:cause.toString());
+ error = cause;
+ errorLatch.countDown();
+ callback.succeeded();
+ }
+
+ public void sendText(String text)
+ {
+ LOG.info("[{}] sendText {} ", name, text);
+ Frame frame = new Frame(OpCode.TEXT, text);
+ coreSession.sendFrame(frame, Callback.NOOP, false);
+ }
+
+ public void sendFrame(Frame frame)
+ {
+ LOG.info("[{}] sendFrame {} ", name, frame);
+ coreSession.sendFrame(frame, Callback.NOOP, false);
+ }
+
+ public void close()
+ {
+ close(CloseStatus.NORMAL, null);
+ }
+
+ public void close(int closeStatus, String reason)
+ {
+ sendFrame(CloseStatus.toFrame(closeStatus, reason));
+ }
+}
diff --git a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/proxy/WebSocketProxy.java b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/proxy/WebSocketProxy.java
new file mode 100644
index 00000000000..11d16d8bc4e
--- /dev/null
+++ b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/proxy/WebSocketProxy.java
@@ -0,0 +1,653 @@
+//
+// ========================================================================
+// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
+// ------------------------------------------------------------------------
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// and Apache License v2.0 which accompanies this distribution.
+//
+// The Eclipse Public License is available at
+// http://www.eclipse.org/legal/epl-v10.html
+//
+// The Apache License v2.0 is available at
+// http://www.opensource.org/licenses/apache2.0.php
+//
+// You may elect to redistribute this code under either of these licenses.
+// ========================================================================
+//
+
+package org.eclipse.jetty.websocket.core.proxy;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+
+import org.eclipse.jetty.util.BlockingArrayQueue;
+import org.eclipse.jetty.util.Callback;
+import org.eclipse.jetty.util.log.Log;
+import org.eclipse.jetty.util.log.Logger;
+import org.eclipse.jetty.websocket.core.CloseStatus;
+import org.eclipse.jetty.websocket.core.Frame;
+import org.eclipse.jetty.websocket.core.FrameHandler;
+import org.eclipse.jetty.websocket.core.OpCode;
+import org.eclipse.jetty.websocket.core.client.WebSocketCoreClient;
+
+class WebSocketProxy
+{
+ protected static final Logger LOG = Log.getLogger(WebSocketProxy.class);
+
+ enum State
+ {
+ NOT_OPEN,
+ CONNECTING,
+ OPEN,
+ ISHUT,
+ OSHUT,
+ CLOSED,
+ FAILED
+ }
+
+ private final Object lock = new Object();
+ private WebSocketCoreClient client;
+ private URI serverUri;
+
+ public Client2Proxy client2Proxy = new Client2Proxy();
+ public Server2Proxy server2Proxy = new Server2Proxy();
+
+ public WebSocketProxy(WebSocketCoreClient client, URI serverUri)
+ {
+ this.client = client;
+ this.serverUri = serverUri;
+ }
+
+ class Client2Proxy implements FrameHandler
+ {
+ private CoreSession client;
+ private State state = State.NOT_OPEN;
+
+ private Callback closeCallback;
+ private Throwable error;
+
+ public BlockingQueue receivedFrames = new BlockingArrayQueue<>();
+ protected CountDownLatch closed = new CountDownLatch(1);
+
+ public State getState()
+ {
+ synchronized (this)
+ {
+ return state;
+ }
+ }
+
+ @Override
+ public void onOpen(CoreSession coreSession, Callback callback)
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("[{}] onOpen {}", toString(), coreSession);
+
+ Throwable failure = null;
+ synchronized (lock)
+ {
+ switch (state)
+ {
+ case NOT_OPEN:
+ state = State.CONNECTING;
+ client = coreSession;
+ break;
+
+ default:
+ failure = new IllegalStateException();
+ break;
+ }
+ }
+
+ if (failure != null)
+ callback.failed(failure);
+ else
+ server2Proxy.connect(Callback.from(() -> onOpenSuccess(callback), (t) -> onOpenFail(callback, t)));
+ }
+
+ private void onOpenSuccess(Callback callback)
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("[{}] onOpenSuccess", toString());
+
+ Throwable failure = null;
+ synchronized (lock)
+ {
+ switch (state)
+ {
+ case CONNECTING:
+ state = State.OPEN;
+ break;
+
+ case FAILED:
+ failure = error;
+ break;
+
+ default:
+ failure = new IllegalStateException();
+ break;
+ }
+ }
+
+ if (failure != null)
+ server2Proxy.fail(failure, callback);
+ else
+ callback.succeeded();
+ }
+
+ private void onOpenFail(Callback callback, Throwable t)
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("[{}] onOpenFail {}", toString(), t);
+
+ Throwable failure = t;
+ synchronized (lock)
+ {
+ switch (state)
+ {
+ case CONNECTING:
+ state = State.FAILED;
+ error = t;
+ break;
+
+ case FAILED:
+ failure = error;
+ failure.addSuppressed(t);
+ break;
+
+ default:
+ failure = new IllegalStateException();
+ break;
+ }
+ }
+
+ callback.failed(failure);
+ }
+
+ @Override
+ public void onFrame(Frame frame, Callback callback)
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("[{}] onFrame {}", toString(), frame);
+ receivedFrames.offer(Frame.copy(frame));
+
+ Callback sendCallback = callback;
+ Throwable failure = null;
+ synchronized (lock)
+ {
+ switch (state)
+ {
+ case OPEN:
+ if (frame.getOpCode() == OpCode.CLOSE)
+ {
+ state = State.ISHUT;
+ // the callback is saved until a close response comes in sendFrame from Server2Proxy
+ // if the callback was completed here then core would send its own close response
+ closeCallback = callback;
+ sendCallback = Callback.from(()->{}, callback::failed);
+ }
+ break;
+
+ case OSHUT:
+ if (frame.getOpCode() == OpCode.CLOSE)
+ state = State.CLOSED;
+ break;
+
+ case FAILED:
+ failure = error;
+ break;
+
+ default:
+ failure = new IllegalStateException();
+ break;
+ }
+ }
+
+ if (failure != null)
+ callback.failed(failure);
+ else
+ server2Proxy.send(frame, sendCallback);
+ }
+
+ @Override
+ public void onError(Throwable failure, Callback callback)
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("[{}] onError {}", toString(), failure);
+
+ boolean failServer2Proxy;
+ synchronized (lock)
+ {
+ switch (state)
+ {
+ case FAILED:
+ case CLOSED:
+ failServer2Proxy = false;
+ break;
+
+ default:
+ state = State.FAILED;
+ error = failure;
+ failServer2Proxy = true;
+ break;
+ }
+ }
+
+ if (failServer2Proxy)
+ server2Proxy.fail(failure,callback);
+ else
+ callback.failed(failure);
+ }
+
+ public void fail(Throwable failure, Callback callback)
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("[{}] fail {}", toString(), failure);
+
+ Callback sendCallback = null;
+ synchronized (lock)
+ {
+ switch (state)
+ {
+ case OPEN:
+ state = State.FAILED;
+ sendCallback = Callback.from(callback, failure);
+ break;
+
+ case ISHUT:
+ state = State.FAILED;
+ Callback doubleCallback = Callback.from(callback, closeCallback);
+ sendCallback = Callback.from(doubleCallback, failure);
+ break;
+
+ default:
+ state = State.FAILED;
+ break;
+ }
+ }
+
+ if (sendCallback != null)
+ client.close(CloseStatus.SHUTDOWN, failure.getMessage(), sendCallback);
+ else
+ callback.failed(failure);
+ }
+
+ @Override
+ public void onClosed(CloseStatus closeStatus, Callback callback)
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("[{}] onClosed {}", toString(), closeStatus);
+
+ closed.countDown();
+ callback.succeeded();
+ }
+
+ public void send(Frame frame, Callback callback)
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("[{}] send {}", toString(), frame);
+
+ Callback sendCallback = callback;
+ Throwable failure = null;
+ synchronized (lock)
+ {
+ switch (state)
+ {
+ case OPEN:
+ if (frame.getOpCode() == OpCode.CLOSE)
+ state = State.OSHUT;
+ break;
+
+ case ISHUT:
+ if (frame.getOpCode() == OpCode.CLOSE)
+ {
+ state = State.CLOSED;
+ sendCallback = Callback.from(callback, closeCallback);
+ }
+ break;
+
+ case FAILED:
+ failure = error;
+ break;
+
+ default:
+ failure = new IllegalStateException();
+ break;
+ }
+ }
+
+ if (failure != null)
+ callback.failed(failure);
+ else
+ client.sendFrame(frame, sendCallback, false);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "Client2Proxy:" + getState();
+ }
+ }
+
+ class Server2Proxy implements FrameHandler
+ {
+ private CoreSession server;
+ private State state = State.NOT_OPEN;
+
+ private Callback closeCallback;
+ private Throwable error;
+
+ public BlockingQueue receivedFrames = new BlockingArrayQueue<>();
+ protected CountDownLatch closed = new CountDownLatch(1);
+
+ public State getState()
+ {
+ synchronized (this)
+ {
+ return state;
+ }
+ }
+
+ public void connect(Callback callback)
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("[{}] connect", toString());
+
+ Throwable failure = null;
+ synchronized (lock)
+ {
+ switch (state)
+ {
+ case NOT_OPEN:
+ try
+ {
+ state = State.CONNECTING;
+ client.connect(this, serverUri).whenComplete((s,t)->
+ {
+ if (t != null)
+ onConnectFailure(t, callback);
+ else
+ onConnectSuccess(s, callback);
+ });
+ }
+ catch (IOException e)
+ {
+ state = State.FAILED;
+ error = e;
+ failure = e;
+ }
+ break;
+
+ case FAILED:
+ failure = error;
+ break;
+
+ default:
+ state = State.FAILED;
+ error = new IllegalStateException();
+ failure = error;
+ break;
+ }
+ }
+
+ if (failure != null)
+ callback.failed(failure);
+ }
+
+ private void onConnectSuccess(CoreSession coreSession, Callback callback)
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("[{}] onConnectSuccess {}", toString(), coreSession);
+
+ Throwable failure = null;
+ synchronized (lock)
+ {
+ switch (state)
+ {
+ case OPEN:
+ break;
+
+ case FAILED:
+ failure = error;
+ break;
+
+ default:
+ state = State.FAILED;
+ error = new IllegalStateException();
+ failure = error;
+ break;
+ }
+ }
+
+ if (failure != null)
+ coreSession.close(CloseStatus.SHUTDOWN, failure.getMessage(), Callback.from(callback, failure));
+ else
+ callback.succeeded();
+ }
+
+ private void onConnectFailure(Throwable t, Callback callback)
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("[{}] onConnectFailure {}", toString(), t);
+
+ Throwable failure = t;
+ synchronized (lock)
+ {
+ switch (state)
+ {
+ case CONNECTING:
+ state = State.FAILED;
+ error = t;
+ break;
+
+ case FAILED:
+ failure = error;
+ break;
+
+ default:
+ state = State.FAILED;
+ error = new IllegalStateException();
+ failure = error;
+ break;
+ }
+ }
+
+ callback.failed(failure);
+ }
+
+ @Override
+ public void onOpen(CoreSession coreSession, Callback callback)
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("[{}] onOpen {}", toString(), coreSession);
+
+ Throwable failure = null;
+ synchronized (lock)
+ {
+ switch (state)
+ {
+ case CONNECTING:
+ state = State.OPEN;
+ server = coreSession;
+ break;
+
+ case FAILED:
+ failure = error;
+ break;
+
+ default:
+ failure = new IllegalStateException();
+ break;
+ }
+ }
+
+ if (failure != null)
+ callback.failed(failure);
+ else
+ callback.succeeded();
+ }
+
+ @Override
+ public void onFrame(Frame frame, Callback callback)
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("[{}] onFrame {}", toString(), frame);
+ receivedFrames.offer(Frame.copy(frame));
+
+ Callback sendCallback = callback;
+ Throwable failure = null;
+ synchronized (lock)
+ {
+ switch (state)
+ {
+ case OPEN:
+ if (frame.getOpCode() == OpCode.CLOSE)
+ {
+ state = State.ISHUT;
+ closeCallback = callback;
+ sendCallback = Callback.from(()->{}, callback::failed);
+ }
+ break;
+
+ case OSHUT:
+ if (frame.getOpCode() == OpCode.CLOSE)
+ state = State.CLOSED;
+ break;
+
+ case FAILED:
+ failure = error;
+ break;
+
+ default:
+ failure = new IllegalStateException();
+ break;
+ }
+ }
+
+ if (failure != null)
+ callback.failed(failure);
+ else
+ client2Proxy.send(frame, sendCallback);
+
+ }
+
+ @Override
+ public void onError(Throwable failure, Callback callback)
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("[{}] onError {}", toString(), failure);
+
+ boolean failClient2Proxy = false;
+ synchronized (lock)
+ {
+ switch (state)
+ {
+ case FAILED:
+ case CLOSED:
+ break;
+
+ default:
+ state = State.FAILED;
+ error = failure;
+ failClient2Proxy = true;
+ break;
+ }
+ }
+
+ if (failClient2Proxy)
+ client2Proxy.fail(failure,callback);
+ else
+ callback.failed(failure);
+ }
+
+ @Override
+ public void onClosed(CloseStatus closeStatus, Callback callback)
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("[{}] onClosed {}", toString(), closeStatus);
+ closed.countDown();
+ callback.succeeded();
+ }
+
+ public void fail(Throwable failure, Callback callback)
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("[{}] fail {}", toString(), failure);
+
+ Callback sendCallback = null;
+ synchronized (lock)
+ {
+ switch (state)
+ {
+ case OPEN:
+ state = State.FAILED;
+ sendCallback = Callback.from(callback, failure);
+ break;
+
+ case ISHUT:
+ state = State.FAILED;
+ Callback doubleCallback = Callback.from(callback, closeCallback);
+ sendCallback = Callback.from(doubleCallback, failure);
+
+ default:
+ state = State.FAILED;
+ break;
+ }
+ }
+
+ if (sendCallback != null)
+ server.close(CloseStatus.SHUTDOWN, failure.getMessage(), sendCallback);
+ else
+ callback.failed(failure);
+ }
+
+ public void send(Frame frame, Callback callback)
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("[{}] send {}", toString(), frame);
+
+ Callback sendCallback = callback;
+ Throwable failure = null;
+ synchronized (lock)
+ {
+ switch (state)
+ {
+ case OPEN:
+ if (frame.getOpCode() == OpCode.CLOSE)
+ state = State.OSHUT;
+ break;
+
+ case ISHUT:
+ if (frame.getOpCode() == OpCode.CLOSE)
+ {
+ state = State.CLOSED;
+ sendCallback = Callback.from(callback, closeCallback);
+ }
+ break;
+
+ case FAILED:
+ failure = error;
+ break;
+
+ default:
+ failure = new IllegalStateException();
+ break;
+ }
+ }
+
+ if (failure != null)
+ callback.failed(failure);
+ else
+ server.sendFrame(frame, sendCallback, false);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "Server2Proxy:" + getState();
+ }
+ }
+}
\ No newline at end of file
diff --git a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/proxy/WebSocketProxyTest.java b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/proxy/WebSocketProxyTest.java
new file mode 100644
index 00000000000..46df9a2301a
--- /dev/null
+++ b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/proxy/WebSocketProxyTest.java
@@ -0,0 +1,388 @@
+//
+// ========================================================================
+// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
+// ------------------------------------------------------------------------
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// and Apache License v2.0 which accompanies this distribution.
+//
+// The Eclipse Public License is available at
+// http://www.eclipse.org/legal/epl-v10.html
+//
+// The Apache License v2.0 is available at
+// http://www.opensource.org/licenses/apache2.0.php
+//
+// You may elect to redistribute this code under either of these licenses.
+// ========================================================================
+//
+
+package org.eclipse.jetty.websocket.core.proxy;
+
+import java.io.IOException;
+import java.net.URI;
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.eclipse.jetty.http.HttpStatus;
+import org.eclipse.jetty.server.Request;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.ServerConnector;
+import org.eclipse.jetty.server.handler.AbstractHandler;
+import org.eclipse.jetty.server.handler.ContextHandler;
+import org.eclipse.jetty.server.handler.HandlerList;
+import org.eclipse.jetty.util.Callback;
+import org.eclipse.jetty.util.log.StacklessLogging;
+import org.eclipse.jetty.websocket.core.CloseStatus;
+import org.eclipse.jetty.websocket.core.EchoFrameHandler;
+import org.eclipse.jetty.websocket.core.Frame;
+import org.eclipse.jetty.websocket.core.FrameHandler;
+import org.eclipse.jetty.websocket.core.FrameHandler.CoreSession;
+import org.eclipse.jetty.websocket.core.OpCode;
+import org.eclipse.jetty.websocket.core.TestAsyncFrameHandler;
+import org.eclipse.jetty.websocket.core.client.ClientUpgradeRequest;
+import org.eclipse.jetty.websocket.core.client.WebSocketCoreClient;
+import org.eclipse.jetty.websocket.core.internal.WebSocketCoreSession;
+import org.eclipse.jetty.websocket.core.server.WebSocketNegotiator;
+import org.eclipse.jetty.websocket.core.server.WebSocketUpgradeHandler;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class WebSocketProxyTest
+{
+ private Server _server;
+ private WebSocketCoreClient _client;
+ private WebSocketProxy proxy;
+ private EchoFrameHandler serverFrameHandler;
+ private TestHandler testHandler;
+ FrameHandler.ConfigurationCustomizer defaultCustomizer;
+
+ private class TestHandler extends AbstractHandler
+ {
+ public void blockServerUpgradeRequests()
+ {
+ blockServerUpgradeRequests = true;
+ }
+
+ public boolean blockServerUpgradeRequests = false;
+
+ @Override
+ public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
+ {
+ if (baseRequest.getHeader("Upgrade") != null)
+ {
+ if (blockServerUpgradeRequests && target.startsWith("/server/"))
+ {
+ response.sendError(HttpStatus.INTERNAL_SERVER_ERROR_500);
+ baseRequest.setHandled(true);
+ }
+ }
+ }
+ }
+
+ @BeforeEach
+ public void start() throws Exception
+ {
+ _server = new Server();
+ ServerConnector connector = new ServerConnector(_server);
+ connector.setPort(8080);
+ _server.addConnector(connector);
+
+ HandlerList handlers = new HandlerList();
+ testHandler = new TestHandler();
+ handlers.addHandler(testHandler);
+
+ defaultCustomizer = new FrameHandler.ConfigurationCustomizer();
+ defaultCustomizer.setIdleTimeout(Duration.ofSeconds(3));
+
+ ContextHandler serverContext = new ContextHandler("/server");
+ serverFrameHandler = new EchoFrameHandler("SERVER");
+ WebSocketNegotiator negotiator = WebSocketNegotiator.from((negotiation) -> serverFrameHandler, defaultCustomizer);
+ WebSocketUpgradeHandler upgradeHandler = new WebSocketUpgradeHandler(negotiator);
+ serverContext.setHandler(upgradeHandler);
+ handlers.addHandler(serverContext);
+
+ _client = new WebSocketCoreClient();
+ _client.start();
+ URI uri = new URI("ws://localhost:8080/server/");
+
+ ContextHandler proxyContext = new ContextHandler("/proxy");
+ proxy = new WebSocketProxy(_client, uri);
+ negotiator = WebSocketNegotiator.from((negotiation) -> proxy.client2Proxy, defaultCustomizer);
+ upgradeHandler = new WebSocketUpgradeHandler(negotiator);
+ proxyContext.setHandler(upgradeHandler);
+ handlers.addHandler(proxyContext);
+
+ _server.setHandler(handlers);
+ _server.start();
+ }
+
+ @AfterEach
+ public void stop() throws Exception
+ {
+ _client.stop();
+ _server.stop();
+ }
+
+ public void awaitProxyClose(WebSocketProxy.Client2Proxy client2Proxy, WebSocketProxy.Server2Proxy server2Proxy) throws Exception
+ {
+ if (client2Proxy != null && !client2Proxy.closed.await(5, TimeUnit.SECONDS))
+ {
+ throw new TimeoutException("client2Proxy close timeout");
+ }
+
+ if (server2Proxy != null && !server2Proxy.closed.await(5, TimeUnit.SECONDS))
+ {
+ throw new TimeoutException("server2Proxy close timeout");
+ }
+ }
+
+ @Test
+ public void testEcho() throws Exception
+ {
+ TestAsyncFrameHandler clientFrameHandler = new TestAsyncFrameHandler("CLIENT");
+ WebSocketProxy.Client2Proxy proxyClientSide = proxy.client2Proxy;
+ WebSocketProxy.Server2Proxy proxyServerSide = proxy.server2Proxy;
+
+ ClientUpgradeRequest upgradeRequest = ClientUpgradeRequest.from(_client, new URI("ws://localhost:8080/proxy/"), clientFrameHandler);
+ upgradeRequest.setConfiguration(defaultCustomizer);
+ CompletableFuture response = _client.connect(upgradeRequest);
+
+ response.get(5, TimeUnit.SECONDS);
+ clientFrameHandler.sendText("hello world");
+ clientFrameHandler.close(CloseStatus.NORMAL, "standard close");
+ assertTrue(clientFrameHandler.closeLatch.await(5, TimeUnit.SECONDS));
+ assertTrue(serverFrameHandler.closeLatch.await(5, TimeUnit.SECONDS));
+ awaitProxyClose(proxyClientSide, proxyServerSide);
+
+ assertThat(proxyClientSide.getState(), is(WebSocketProxy.State.CLOSED));
+ assertThat(proxyServerSide.getState(), is(WebSocketProxy.State.CLOSED));
+
+ assertThat(proxyClientSide.receivedFrames.poll().getPayloadAsUTF8(), is("hello world"));
+ assertThat(serverFrameHandler.receivedFrames.poll().getPayloadAsUTF8(), is("hello world"));
+ assertThat(proxyServerSide.receivedFrames.poll().getPayloadAsUTF8(), is("hello world"));
+ assertThat(clientFrameHandler.receivedFrames.poll().getPayloadAsUTF8(), is("hello world"));
+
+ assertThat(CloseStatus.getCloseStatus(proxyClientSide.receivedFrames.poll()).getReason(), is("standard close"));
+ assertThat(CloseStatus.getCloseStatus(serverFrameHandler.receivedFrames.poll()).getReason(), is("standard close"));
+ assertThat(CloseStatus.getCloseStatus(proxyServerSide.receivedFrames.poll()).getReason(), is("standard close"));
+ assertThat(CloseStatus.getCloseStatus(clientFrameHandler.receivedFrames.poll()).getReason(), is("standard close"));
+
+ assertNull(proxyClientSide.receivedFrames.poll());
+ assertNull(serverFrameHandler.receivedFrames.poll());
+ assertNull(proxyServerSide.receivedFrames.poll());
+ assertNull(clientFrameHandler.receivedFrames.poll());
+ }
+
+ @Test
+ public void testFailServerUpgrade() throws Exception
+ {
+ testHandler.blockServerUpgradeRequests();
+ WebSocketProxy.Client2Proxy proxyClientSide = proxy.client2Proxy;
+ WebSocketProxy.Server2Proxy proxyServerSide = proxy.server2Proxy;
+
+ TestAsyncFrameHandler clientFrameHandler = new TestAsyncFrameHandler("CLIENT");
+ try (StacklessLogging stacklessLogging = new StacklessLogging(WebSocketCoreSession.class))
+ {
+ ClientUpgradeRequest upgradeRequest = ClientUpgradeRequest.from(_client, new URI("ws://localhost:8080/proxy/"), clientFrameHandler);
+ upgradeRequest.setConfiguration(defaultCustomizer);
+ CompletableFuture response = _client.connect(upgradeRequest);
+ response.get(5, TimeUnit.SECONDS);
+ clientFrameHandler.sendText("hello world");
+ clientFrameHandler.close();
+ assertTrue(clientFrameHandler.closeLatch.await(5, TimeUnit.SECONDS));
+ awaitProxyClose(proxyClientSide, null);
+ }
+
+ assertNull(proxyClientSide.receivedFrames.poll());
+ assertThat(proxyClientSide.getState(), is(WebSocketProxy.State.FAILED));
+
+ assertNull(proxyServerSide.receivedFrames.poll());
+ assertThat(proxyServerSide.getState(), is(WebSocketProxy.State.FAILED));
+
+ assertFalse(serverFrameHandler.openLatch.await(250, TimeUnit.MILLISECONDS));
+
+ CloseStatus closeStatus = CloseStatus.getCloseStatus(clientFrameHandler.receivedFrames.poll());
+ assertThat(closeStatus.getCode(), is(CloseStatus.SERVER_ERROR));
+ assertThat(closeStatus.getReason(), containsString("Failed to upgrade to websocket: Unexpected HTTP Response Status Code:"));
+ }
+
+
+ @Test
+ public void testClientError() throws Exception
+ {
+ TestAsyncFrameHandler clientFrameHandler = new TestAsyncFrameHandler("CLIENT")
+ {
+ @Override
+ public void onOpen(CoreSession coreSession, Callback callback)
+ {
+ throw new IllegalStateException("simulated client onOpen error");
+ }
+ };
+ WebSocketProxy.Client2Proxy proxyClientSide = proxy.client2Proxy;
+ WebSocketProxy.Server2Proxy proxyServerSide = proxy.server2Proxy;
+
+ try (StacklessLogging stacklessLogging = new StacklessLogging(WebSocketCoreSession.class))
+ {
+ ClientUpgradeRequest upgradeRequest = ClientUpgradeRequest.from(_client, new URI("ws://localhost:8080/proxy/"), clientFrameHandler);
+ upgradeRequest.setConfiguration(defaultCustomizer);
+ CompletableFuture response = _client.connect(upgradeRequest);
+ Exception e = assertThrows(ExecutionException.class, ()->response.get(5, TimeUnit.SECONDS));
+ assertThat(e.getMessage(), containsString("simulated client onOpen error"));
+ assertTrue(clientFrameHandler.closeLatch.await(5, TimeUnit.SECONDS));
+ assertTrue(serverFrameHandler.closeLatch.await(5, TimeUnit.SECONDS));
+ awaitProxyClose(proxyClientSide, proxyServerSide);
+ }
+
+ CloseStatus closeStatus = CloseStatus.getCloseStatus(proxyClientSide.receivedFrames.poll());
+ assertThat(closeStatus.getCode(), is(CloseStatus.SERVER_ERROR));
+ assertThat(closeStatus.getReason(), containsString("simulated client onOpen error"));
+ assertThat(proxyClientSide.getState(), is(WebSocketProxy.State.CLOSED));
+
+ closeStatus = CloseStatus.getCloseStatus(proxyServerSide.receivedFrames.poll());
+ assertThat(closeStatus.getCode(), is(CloseStatus.SERVER_ERROR));
+ assertThat(closeStatus.getReason(), containsString("simulated client onOpen error"));
+ assertThat(proxyServerSide.getState(), is(WebSocketProxy.State.CLOSED));
+
+ closeStatus = CloseStatus.getCloseStatus(serverFrameHandler.receivedFrames.poll());
+ assertThat(closeStatus.getCode(), is(CloseStatus.SERVER_ERROR));
+ assertThat(closeStatus.getReason(), containsString("simulated client onOpen error"));
+
+ assertNull(clientFrameHandler.receivedFrames.poll());
+ }
+
+ @Test
+ public void testServerError() throws Exception
+ {
+ serverFrameHandler.throwOnFrame();
+ WebSocketProxy.Client2Proxy proxyClientSide = proxy.client2Proxy;
+ WebSocketProxy.Server2Proxy proxyServerSide = proxy.server2Proxy;
+
+ TestAsyncFrameHandler clientFrameHandler = new TestAsyncFrameHandler("CLIENT");
+ ClientUpgradeRequest upgradeRequest = ClientUpgradeRequest.from(_client, new URI("ws://localhost:8080/proxy/"), clientFrameHandler);
+ upgradeRequest.setConfiguration(defaultCustomizer);
+ CompletableFuture response = _client.connect(upgradeRequest);
+
+ response.get(5, TimeUnit.SECONDS);
+ clientFrameHandler.sendText("hello world");
+ assertTrue(clientFrameHandler.closeLatch.await(5, TimeUnit.SECONDS));
+ assertTrue(serverFrameHandler.closeLatch.await(5, TimeUnit.SECONDS));
+ awaitProxyClose(proxyClientSide, proxyServerSide);
+
+ CloseStatus closeStatus;
+ Frame frame;
+
+ // Client2Proxy
+ frame = proxyClientSide.receivedFrames.poll();
+ assertThat(frame.getOpCode(), is(OpCode.TEXT));
+ assertThat(frame.getPayloadAsUTF8(), is("hello world"));
+
+ // Server
+ frame = serverFrameHandler.receivedFrames.poll();
+ assertThat(frame.getOpCode(), is(OpCode.TEXT));
+ assertThat(frame.getPayloadAsUTF8(), is("hello world"));
+ frame = serverFrameHandler.receivedFrames.poll();
+ assertNull(frame);
+
+ // Server2Proxy
+ frame = proxyServerSide.receivedFrames.poll();
+ closeStatus = CloseStatus.getCloseStatus(frame);
+ assertThat(closeStatus.getCode(), is(CloseStatus.SERVER_ERROR));
+ assertThat(closeStatus.getReason(), is("intentionally throwing in server onFrame()"));
+
+ // Client
+ frame = clientFrameHandler.receivedFrames.poll();
+ closeStatus = CloseStatus.getCloseStatus(frame);
+ assertThat(closeStatus.getCode(), is(CloseStatus.SERVER_ERROR));
+ assertThat(closeStatus.getReason(), is("intentionally throwing in server onFrame()"));
+
+ // Client2Proxy receiving close response from Client
+ frame = proxyClientSide.receivedFrames.poll();
+ closeStatus = CloseStatus.getCloseStatus(frame);
+ assertThat(closeStatus.getCode(), is(CloseStatus.SERVER_ERROR));
+ assertThat(closeStatus.getReason(), is("intentionally throwing in server onFrame()"));
+
+ // Check Proxy is in expected final state
+ assertNull(proxyClientSide.receivedFrames.poll());
+ assertNull(proxyServerSide.receivedFrames.poll());
+ assertThat(proxyClientSide.getState(), is(WebSocketProxy.State.CLOSED));
+ assertThat(proxyServerSide.getState(), is(WebSocketProxy.State.CLOSED));
+ }
+
+ @Test
+ public void testServerErrorClientNoResponse() throws Exception
+ {
+ serverFrameHandler.throwOnFrame();
+ WebSocketProxy.Client2Proxy proxyClientSide = proxy.client2Proxy;
+ WebSocketProxy.Server2Proxy proxyServerSide = proxy.server2Proxy;
+
+ TestAsyncFrameHandler clientFrameHandler = new TestAsyncFrameHandler("CLIENT")
+ {
+ @Override
+ public void onFrame(Frame frame, Callback callback)
+ {
+ LOG.info("[{}] onFrame {}", name, frame);
+ receivedFrames.offer(Frame.copy(frame));
+ }
+ };
+
+ ClientUpgradeRequest upgradeRequest = ClientUpgradeRequest.from(_client, new URI("ws://localhost:8080/proxy/"), clientFrameHandler);
+ upgradeRequest.setConfiguration(defaultCustomizer);
+ CompletableFuture response = _client.connect(upgradeRequest);
+ response.get(5, TimeUnit.SECONDS);
+ clientFrameHandler.sendText("hello world");
+ assertTrue(clientFrameHandler.closeLatch.await(5, TimeUnit.SECONDS));
+ assertTrue(serverFrameHandler.closeLatch.await(5, TimeUnit.SECONDS));
+ awaitProxyClose(proxyClientSide, proxyServerSide);
+
+ CloseStatus closeStatus;
+ Frame frame;
+
+ // Client2Proxy
+ frame = proxyClientSide.receivedFrames.poll();
+ assertThat(frame.getOpCode(), is(OpCode.TEXT));
+ assertThat(frame.getPayloadAsUTF8(), is("hello world"));
+
+ // Server
+ frame = serverFrameHandler.receivedFrames.poll();
+ assertThat(frame.getOpCode(), is(OpCode.TEXT));
+ assertThat(frame.getPayloadAsUTF8(), is("hello world"));
+ assertNull(serverFrameHandler.receivedFrames.poll());
+
+ // Server2Proxy
+ frame = proxyServerSide.receivedFrames.poll();
+ closeStatus = CloseStatus.getCloseStatus(frame);
+ assertThat(closeStatus.getCode(), is(CloseStatus.SERVER_ERROR));
+ assertThat(closeStatus.getReason(), is("intentionally throwing in server onFrame()"));
+
+ // Client
+ frame = clientFrameHandler.receivedFrames.poll();
+ closeStatus = CloseStatus.getCloseStatus(frame);
+ assertThat(closeStatus.getCode(), is(CloseStatus.SERVER_ERROR));
+ assertThat(closeStatus.getReason(), is("intentionally throwing in server onFrame()"));
+ assertNull(clientFrameHandler.receivedFrames.poll());
+
+ // Client2Proxy does NOT receive close response from the client and fails
+ assertNull(proxyClientSide.receivedFrames.poll());
+ assertThat(proxyClientSide.getState(), is(WebSocketProxy.State.FAILED));
+
+ // Server2Proxy is failed by the Client2Proxy
+ assertNull(proxyServerSide.receivedFrames.poll());
+ assertThat(proxyServerSide.getState(), is(WebSocketProxy.State.FAILED));
+ }
+}