diff --git a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/proxy/BasicFrameHandler.java b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/proxy/BasicFrameHandler.java new file mode 100644 index 00000000000..130fce50eb0 --- /dev/null +++ b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/proxy/BasicFrameHandler.java @@ -0,0 +1,100 @@ +package org.eclipse.jetty.websocket.core.proxy; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.eclipse.jetty.util.BlockingArrayQueue; +import org.eclipse.jetty.util.BufferUtil; +import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.websocket.core.CloseStatus; +import org.eclipse.jetty.websocket.core.Frame; +import org.eclipse.jetty.websocket.core.FrameHandler; +import org.eclipse.jetty.websocket.core.OpCode; + +class BasicFrameHandler implements FrameHandler +{ + protected String name; + protected CoreSession session; + protected CountDownLatch closed = new CountDownLatch(1); + + protected BlockingQueue receivedFrames = new BlockingArrayQueue<>(); + + + public BasicFrameHandler(String name) + { + this.name = "[" + name + "]"; + } + + @Override + public void onOpen(CoreSession coreSession, Callback callback) + { + session = coreSession; + + System.err.println(name + " onOpen(): " + session); + callback.succeeded(); + } + + @Override + public void onFrame(Frame frame, Callback callback) + { + System.err.println(name + " onFrame(): " + frame); + receivedFrames.offer(Frame.copy(frame)); + callback.succeeded(); + } + + @Override + public void onError(Throwable cause, Callback callback) + { + System.err.println(name + " onError(): " + cause); + cause.printStackTrace(); + callback.succeeded(); + } + + @Override + public void onClosed(CloseStatus closeStatus, Callback callback) + { + System.err.println(name + " onClosed(): " + closeStatus); + closed.countDown(); + callback.succeeded(); + } + + public void sendText(String message) + { + Frame textFrame = new Frame(OpCode.TEXT, BufferUtil.toBuffer(message)); + session.sendFrame(textFrame, Callback.NOOP, false); + } + + public void close() throws InterruptedException + { + session.close(CloseStatus.NORMAL, "standard close", Callback.NOOP); + awaitClose(); + } + + public void awaitClose() throws InterruptedException + { + closed.await(5, TimeUnit.SECONDS); + } + + + public static class EchoHandler extends BasicFrameHandler + { + public EchoHandler(String name) + { + super(name); + } + + @Override + public void onFrame(Frame frame, Callback callback) + { + System.err.println(name + " onFrame(): " + frame); + + if (frame.isDataFrame()) + session.sendFrame(new Frame(frame.getOpCode(), frame.getPayload()), callback, false); + else + callback.succeeded(); + + receivedFrames.offer(Frame.copy(frame)); + } + } +} \ No newline at end of file diff --git a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/proxy/ProxyFrameHandler.java b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/proxy/ProxyFrameHandler.java new file mode 100644 index 00000000000..f601915e58c --- /dev/null +++ b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/proxy/ProxyFrameHandler.java @@ -0,0 +1,155 @@ +package org.eclipse.jetty.websocket.core.proxy; + +import java.io.IOException; +import java.net.URI; +import java.util.concurrent.atomic.AtomicReference; + +import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.websocket.core.CloseStatus; +import org.eclipse.jetty.websocket.core.Frame; +import org.eclipse.jetty.websocket.core.FrameHandler; +import org.eclipse.jetty.websocket.core.OpCode; +import org.eclipse.jetty.websocket.core.client.ClientUpgradeRequest; +import org.eclipse.jetty.websocket.core.client.WebSocketCoreClient; + +class ProxyFrameHandler implements FrameHandler +{ + String name = "[PROXY_SERVER]"; + + URI serverUri; + WebSocketCoreClient client = new WebSocketCoreClient(); + + CoreSession clientSession; + volatile CoreSession serverSession; + + + AtomicReference closeFrameCallback = new AtomicReference<>(); + + public ProxyFrameHandler() + { + try + { + serverUri = new URI("ws://localhost:8080/server"); + client.start(); + } + catch (Exception e) + { + e.printStackTrace(); + throw new RuntimeException(e); + } + } + + @Override + public void onOpen(CoreSession coreSession, Callback callback) + { + System.err.println(name + " onOpen: " + coreSession); + clientSession = coreSession; + + try + { + ClientUpgradeRequest upgradeRequest = ClientUpgradeRequest.from(client, serverUri, new ProxyFrameHandlerClient()); + client.connect(upgradeRequest).whenComplete((s,t)->{ + if (t != null) + { + callback.failed(t); + } + else + { + serverSession = s; + callback.succeeded(); + } + }); + } + catch (IOException e) + { + e.printStackTrace(); + clientSession.close(CloseStatus.SERVER_ERROR, "proxy failed to connect to server", Callback.NOOP); + } + } + + @Override + public void onFrame(Frame frame, Callback callback) + { + System.err.println(name + " onFrame(): " + frame); + onFrame(serverSession, frame, callback); + } + + private void onFrame(CoreSession session, Frame frame, Callback callback) + { + if (frame.getOpCode() == OpCode.CLOSE) + { + + Callback closeCallback = Callback.NOOP; + + // If we have already received a close frame then we can succeed both callbacks + if (!closeFrameCallback.compareAndSet(null, callback)) + { + closeCallback = Callback.from(()-> + { + closeFrameCallback.get().succeeded(); + callback.succeeded(); + }, (t)-> + { + closeFrameCallback.get().failed(t); + callback.failed(t); + }); + } + + session.sendFrame(frame, closeCallback, false); + return; + } + else + { + session.sendFrame(Frame.copy(frame), callback, false); + } + } + + @Override + public void onError(Throwable cause, Callback callback) + { + System.err.println(name + " onError(): " + cause); + cause.printStackTrace(); + callback.succeeded(); + } + + @Override + public void onClosed(CloseStatus closeStatus, Callback callback) + { + System.err.println(name + " onClosed(): " + closeStatus); + callback.succeeded(); + } + + class ProxyFrameHandlerClient implements FrameHandler + { + String name = "[PROXY_CLIENT]"; + + @Override + public void onOpen(CoreSession coreSession, Callback callback) + { + serverSession = coreSession; + callback.succeeded(); + } + + @Override + public void onFrame(Frame frame, Callback callback) + { + System.err.println(name + " onFrame(): " + frame); + ProxyFrameHandler.this.onFrame(clientSession, frame, callback); + } + + @Override + public void onError(Throwable cause, Callback callback) + { + System.err.println(name + " onError(): " + cause); + cause.printStackTrace(); + callback.succeeded(); + } + + @Override + public void onClosed(CloseStatus closeStatus, Callback callback) + { + System.err.println(name + " onClosed(): " + closeStatus); + callback.succeeded(); + } + } +} diff --git a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/proxy/WebSocketProxyTest.java b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/proxy/WebSocketProxyTest.java new file mode 100644 index 00000000000..d5f8960c200 --- /dev/null +++ b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/proxy/WebSocketProxyTest.java @@ -0,0 +1,74 @@ +package org.eclipse.jetty.websocket.core.proxy; + +import java.net.URI; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; +import org.eclipse.jetty.server.handler.ContextHandler; +import org.eclipse.jetty.server.handler.HandlerList; +import org.eclipse.jetty.websocket.core.FrameHandler.CoreSession; +import org.eclipse.jetty.websocket.core.client.ClientUpgradeRequest; +import org.eclipse.jetty.websocket.core.client.WebSocketCoreClient; +import org.eclipse.jetty.websocket.core.server.WebSocketNegotiator; +import org.eclipse.jetty.websocket.core.server.WebSocketUpgradeHandler; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +public class WebSocketProxyTest +{ + Server _server; + WebSocketCoreClient _client; + + + @BeforeEach + public void start() throws Exception + { + _server = new Server(); + ServerConnector connector = new ServerConnector(_server); + connector.setPort(8080); + _server.addConnector(connector); + + HandlerList handlers = new HandlerList(); + + ContextHandler serverContext = new ContextHandler("/server"); + WebSocketNegotiator negotiator = WebSocketNegotiator.from((negotiation) -> new BasicFrameHandler.EchoHandler("SERVER")); + WebSocketUpgradeHandler upgradeHandler = new WebSocketUpgradeHandler(negotiator); + serverContext.setHandler(upgradeHandler); + handlers.addHandler(serverContext); + + ContextHandler proxyContext = new ContextHandler("/proxy"); + negotiator = WebSocketNegotiator.from((negotiation) -> new ProxyFrameHandler()); + upgradeHandler = new WebSocketUpgradeHandler(negotiator); + proxyContext.setHandler(upgradeHandler); + handlers.addHandler(proxyContext); + + _server.setHandler(handlers); + _server.start(); + + _client = new WebSocketCoreClient(); + _client.start(); + } + + @AfterEach + public void stop() throws Exception + { + _client.stop(); + _server.stop(); + } + + + @Test + public void testHello() throws Exception + { + BasicFrameHandler clientHandler = new BasicFrameHandler("CLIENT"); + ClientUpgradeRequest upgradeRequest = ClientUpgradeRequest.from(_client, new URI("ws://localhost:8080/proxy"), clientHandler); + + CompletableFuture response = _client.connect(upgradeRequest); + response.get(5, TimeUnit.SECONDS); + clientHandler.sendText("hello world"); + clientHandler.close(); + } +}