From ace5e7bbe28d550753bd5c860d29a078feb2d203 Mon Sep 17 00:00:00 2001 From: Lachlan Roberts Date: Wed, 13 Feb 2019 11:34:52 +1100 Subject: [PATCH] Issue #3170 - Redesign on the WebSocketProxy - Redesigned the proxy into a new class called WebSocketProxy containing a Client2Proxy FrameHandler and a Server2Proxy FrameHandler - WebSocketProxy uses synchronized blocks with an enum state instead of the previous compare and sets - Created a new test similar to ProxyFrameHandlerTest to test the new WebSocketProxy Signed-off-by: Lachlan Roberts --- .../websocket/core/proxy/WebSocketProxy.java | 529 ++++++++++++++++++ .../core/proxy/WebSocketProxyTest.java | 107 ++++ 2 files changed, 636 insertions(+) create mode 100644 jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/proxy/WebSocketProxy.java create mode 100644 jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/proxy/WebSocketProxyTest.java diff --git a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/proxy/WebSocketProxy.java b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/proxy/WebSocketProxy.java new file mode 100644 index 00000000000..32190b4299b --- /dev/null +++ b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/proxy/WebSocketProxy.java @@ -0,0 +1,529 @@ +package org.eclipse.jetty.websocket.core.proxy; + +import java.io.IOException; +import java.net.URI; +import java.util.concurrent.BlockingQueue; + +import org.eclipse.jetty.util.BlockingArrayQueue; +import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.websocket.core.CloseStatus; +import org.eclipse.jetty.websocket.core.Frame; +import org.eclipse.jetty.websocket.core.FrameHandler; +import org.eclipse.jetty.websocket.core.OpCode; +import org.eclipse.jetty.websocket.core.client.WebSocketCoreClient; + +class WebSocketProxy +{ + enum State + { + NOT_OPEN, + CONNECTING, + OPEN, + ICLOSED, + OCLOSED, + CLOSED, + FAILED + } + + private final Object lock = new Object(); + + WebSocketCoreClient client; + private URI serverUri; + + public Client2Proxy client2Proxy = new Client2Proxy(); + public Server2Proxy server2Proxy = new Server2Proxy(); + + public WebSocketProxy(WebSocketCoreClient client, URI serverUri) + { + this.client = client; + this.serverUri = serverUri; + } + + class Client2Proxy implements FrameHandler + { + private CoreSession client; + private State state = State.NOT_OPEN; + + private Callback closeCallback; + private Throwable error; + + public BlockingQueue receivedFrames = new BlockingArrayQueue<>(); + + @Override + public void onOpen(CoreSession session, Callback callback) + { + System.err.println("[Client2Proxy] onOpen: " + session); + + synchronized (lock) + { + switch (state) + { + case NOT_OPEN: + state = State.CONNECTING; + client = session; + + Callback wrappedOnOpenCallback = new Callback() + { + @Override + public void succeeded() + { + synchronized (lock) + { + switch (state) + { + case CONNECTING: + state = State.OPEN; + callback.succeeded(); + break; + + case FAILED: + server2Proxy.fail(error, callback); + break; + + default: + callback.failed(new IllegalStateException()); + } + } + } + + @Override + public void failed(Throwable x) + { + synchronized (lock) + { + switch (state) + { + case CONNECTING: + state = State.FAILED; + error = x; + callback.failed(x); + break; + + case FAILED: + callback.failed(x); + break; + + default: + callback.failed(new IllegalStateException()); + } + } + } + }; + + server2Proxy.connect(wrappedOnOpenCallback); + break; + + default: + throw new IllegalStateException(); + } + } + } + + @Override + public void onFrame(Frame frame, Callback callback) + { + System.err.println("[Client2Proxy] onFrame(): " + frame); + receivedFrames.offer(Frame.copy(frame)); + + synchronized (lock) + { + switch (state) + { + case OPEN: + if (frame.getOpCode() == OpCode.CLOSE) + { + state = State.ICLOSED; + closeCallback = callback; + server2Proxy.send(frame, Callback.from(()->{}, callback::failed)); + } + else + { + server2Proxy.send(frame, callback); + } + break; + + case OCLOSED: + if (frame.getOpCode() == OpCode.CLOSE) + state = State.CLOSED; + + server2Proxy.send(frame, callback); + break; + + case FAILED: + callback.failed(error); + + default: + callback.failed(new IllegalStateException()); + } + } + } + + @Override + public void onError(Throwable failure, Callback callback) + { + System.err.println("[Client2Proxy] onError(): " + failure); + failure.printStackTrace(); + + synchronized (lock) + { + switch (state) + { + case FAILED: + case CLOSED: + callback.failed(failure); + break; + + default: + state = State.FAILED; + error = failure; + server2Proxy.fail(failure,callback); + break; + } + } + + } + + public void fail(Throwable failure, Callback callback) + { + System.err.println("[Client2Proxy] fail(): " + failure); + + synchronized (lock) + { + switch (state) + { + case NOT_OPEN: + state = State.FAILED; + callback.failed(failure); + break; + + case CONNECTING: + state = State.FAILED; + callback.failed(failure); + break; + + case OPEN: + state = State.FAILED; + client.close(CloseStatus.SHUTDOWN, failure.getMessage(), Callback.from(callback, failure)); + break; + + case ICLOSED: + state = State.FAILED; + Callback doubleCallback = Callback.from(callback, closeCallback); + client.close(CloseStatus.SHUTDOWN, failure.getMessage(), Callback.from(doubleCallback, failure)); + + case FAILED: + case CLOSED: + case OCLOSED: + state = State.FAILED; + callback.failed(failure); + break; + + default: + throw new IllegalStateException(); + } + } + } + + @Override + public void onClosed(CloseStatus closeStatus, Callback callback) + { + System.err.println("[Client2Proxy] onClosed(): " + closeStatus); + + callback.succeeded(); + } + + + public void send(Frame frame, Callback callback) + { + System.err.println("[Client2Proxy] onClosed(): " + frame); + + synchronized (lock) + { + switch (state) + { + case OPEN: + if (frame.getOpCode() == OpCode.CLOSE) + state = State.OCLOSED; + + client.sendFrame(frame, callback, false); + break; + + case ICLOSED: + if (frame.getOpCode() == OpCode.CLOSE) + { + state = State.CLOSED; + client.sendFrame(frame, Callback.from(callback, closeCallback), false); + } + else + { + client.sendFrame(frame, callback, false); + } + break; + + case FAILED: + callback.failed(error); + break; + + default: + callback.failed(new IllegalStateException()); + } + } + } + } + + class Server2Proxy implements FrameHandler + { + private CoreSession server; + private State state = State.NOT_OPEN; + + private Callback closeCallback; + private Throwable error; + + public BlockingQueue receivedFrames = new BlockingArrayQueue<>(); + + public void connect(Callback callback) + { + System.err.println("[Server2Proxy] connect()"); + + synchronized (lock) + { + switch (state) + { + case NOT_OPEN: + try + { + state = State.CONNECTING; + client.connect(this, serverUri).whenComplete((s,t)->{ + if (t != null) + { + synchronized (lock) + { + switch (state) + { + case CONNECTING: + state = State.FAILED; + callback.failed(t); + break; + + case FAILED: + callback.failed(t); + break; + + default: + callback.failed(new IllegalStateException()); + } + } + } + else + { + synchronized (lock) + { + switch (state) + { + case CONNECTING: + state = State.OPEN; + callback.succeeded(); + break; + + case FAILED: + s.close(CloseStatus.SHUTDOWN, error.getMessage(), Callback.from(callback, error)); + break; + + default: + callback.failed(new IllegalStateException()); + } + } + } + }); + } + catch (IOException e) + { + state = State.FAILED; + callback.failed(e); + } + break; + + case FAILED: + callback.failed(error); + break; + + default: + throw new IllegalStateException(); + } + } + } + + @Override + public void onOpen(CoreSession session, Callback callback) + { + System.err.println("[Server2Proxy] onOpen(): " + session); + + synchronized (lock) + { + switch (state) + { + case CONNECTING: + server = session; + callback.succeeded(); + break; + + case FAILED: + callback.failed(error); + break; + + default: + callback.failed(new IllegalStateException()); + } + } + } + + @Override + public void onFrame(Frame frame, Callback callback) + { + System.err.println("[Server2Proxy] onFrame(): " + frame); + receivedFrames.offer(Frame.copy(frame)); + + synchronized (lock) + { + switch (state) + { + case OPEN: + if (frame.getOpCode() == OpCode.CLOSE) + { + state = State.ICLOSED; + closeCallback = callback; + client2Proxy.send(frame, Callback.from(()->{}, callback::failed)); + } + else + { + client2Proxy.send(frame, callback); + } + break; + + case OCLOSED: + if (frame.getOpCode() == OpCode.CLOSE) + state = State.CLOSED; + + client2Proxy.send(frame, callback); + break; + + case FAILED: + callback.failed(error); + + default: + callback.failed(new IllegalStateException()); + } + } + } + + @Override + public void onError(Throwable failure, Callback callback) + { + System.err.println("[Server2Proxy] onError(): " + failure); + failure.printStackTrace(); + + synchronized (lock) + { + switch (state) + { + case FAILED: + case CLOSED: + callback.failed(failure); + break; + + default: + state = State.FAILED; + error = failure; + client2Proxy.fail(failure,callback); + break; + } + } + } + + @Override + public void onClosed(CloseStatus closeStatus, Callback callback) + { + System.err.println("[Server2Proxy] onClosed(): " + closeStatus); + + callback.succeeded(); + } + + public void fail(Throwable failure, Callback callback) + { + System.err.println("[Server2Proxy] fail(): " + failure); + + synchronized (lock) + { + switch (state) + { + case NOT_OPEN: + state = State.FAILED; + callback.failed(failure); + break; + + case CONNECTING: + state = State.FAILED; + callback.failed(failure); + break; + + case OPEN: + state = State.FAILED; + server.close(CloseStatus.SHUTDOWN, failure.getMessage(), Callback.from(callback, failure)); + break; + + case ICLOSED: + state = State.FAILED; + Callback doubleCallback = Callback.from(callback, closeCallback); + server.close(CloseStatus.SHUTDOWN, failure.getMessage(), Callback.from(doubleCallback, failure)); + + case FAILED: + case CLOSED: + case OCLOSED: + state = State.FAILED; + callback.failed(failure); + break; + + default: + throw new IllegalStateException(); + } + } + } + + public void send(Frame frame, Callback callback) + { + System.err.println("[Server2Proxy] send(): " + frame); + + synchronized (lock) + { + switch (state) + { + case OPEN: + if (frame.getOpCode() == OpCode.CLOSE) + state = State.OCLOSED; + + server.sendFrame(frame, callback, false); + break; + + case ICLOSED: + if (frame.getOpCode() == OpCode.CLOSE) + { + state = State.CLOSED; + server.sendFrame(frame, Callback.from(callback, closeCallback), false); + } + else + { + server.sendFrame(frame, callback, false); + } + break; + + case FAILED: + callback.failed(error); + break; + + default: + callback.failed(new IllegalStateException()); + } + } + } + } +} \ No newline at end of file diff --git a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/proxy/WebSocketProxyTest.java b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/proxy/WebSocketProxyTest.java new file mode 100644 index 00000000000..b8b0c445a1f --- /dev/null +++ b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/proxy/WebSocketProxyTest.java @@ -0,0 +1,107 @@ +package org.eclipse.jetty.websocket.core.proxy; + +import java.net.URI; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; +import org.eclipse.jetty.server.handler.ContextHandler; +import org.eclipse.jetty.server.handler.HandlerList; +import org.eclipse.jetty.websocket.core.CloseStatus; +import org.eclipse.jetty.websocket.core.Frame; +import org.eclipse.jetty.websocket.core.FrameHandler.CoreSession; +import org.eclipse.jetty.websocket.core.client.ClientUpgradeRequest; +import org.eclipse.jetty.websocket.core.client.WebSocketCoreClient; +import org.eclipse.jetty.websocket.core.server.WebSocketNegotiator; +import org.eclipse.jetty.websocket.core.server.WebSocketUpgradeHandler; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.junit.jupiter.api.Assertions.assertNull; + +public class WebSocketProxyTest +{ + Server _server; + WebSocketCoreClient _client; + + WebSocketProxy proxy; + BasicFrameHandler.ServerEchoHandler serverFrameHandler; + + @BeforeEach + public void start() throws Exception + { + _server = new Server(); + ServerConnector connector = new ServerConnector(_server); + connector.setPort(8080); + _server.addConnector(connector); + + HandlerList handlers = new HandlerList(); + + ContextHandler serverContext = new ContextHandler("/server"); + serverFrameHandler = new BasicFrameHandler.ServerEchoHandler("SERVER"); + WebSocketNegotiator negotiator = WebSocketNegotiator.from((negotiation) -> serverFrameHandler); + WebSocketUpgradeHandler upgradeHandler = new WebSocketUpgradeHandler(negotiator); + serverContext.setHandler(upgradeHandler); + handlers.addHandler(serverContext); + + _client = new WebSocketCoreClient(); + _client.start(); + URI uri = new URI("ws://localhost:8080/server"); + + ContextHandler proxyContext = new ContextHandler("/proxy"); + proxy = new WebSocketProxy(_client, uri); + negotiator = WebSocketNegotiator.from((negotiation) -> proxy.client2Proxy); + upgradeHandler = new WebSocketUpgradeHandler(negotiator); + proxyContext.setHandler(upgradeHandler); + handlers.addHandler(proxyContext); + + _server.setHandler(handlers); + _server.start(); + } + + @AfterEach + public void stop() throws Exception + { + _client.stop(); + _server.stop(); + } + + @Test + public void testHello() throws Exception + { + BasicFrameHandler clientHandler = new BasicFrameHandler("CLIENT"); + ClientUpgradeRequest upgradeRequest = ClientUpgradeRequest.from(_client, new URI("ws://localhost:8080/proxy"), clientHandler); + + CompletableFuture response = _client.connect(upgradeRequest); + response.get(5, TimeUnit.SECONDS); + clientHandler.sendText("hello world"); + clientHandler.close("standard close"); + + Frame frame; + + // Verify the the text frame was received + WebSocketProxy.Client2Proxy proxyClientSide = proxy.client2Proxy; + WebSocketProxy.Server2Proxy proxyServerSide = proxy.server2Proxy; + + assertThat(proxyClientSide.receivedFrames.poll().getPayloadAsUTF8(), is("hello world")); + assertThat(serverFrameHandler.receivedFrames.poll().getPayloadAsUTF8(), is("hello world")); + assertThat(proxyServerSide.receivedFrames.poll().getPayloadAsUTF8(), is("hello world")); + assertThat(clientHandler.receivedFrames.poll().getPayloadAsUTF8(), is("hello world")); + + // Verify the right close frame was received + assertThat(CloseStatus.getCloseStatus(proxyClientSide.receivedFrames.poll()).getReason(), is("standard close")); + assertThat(CloseStatus.getCloseStatus(serverFrameHandler.receivedFrames.poll()).getReason(), is("standard close")); + assertThat(CloseStatus.getCloseStatus(proxyServerSide.receivedFrames.poll()).getReason(), is("standard close")); + assertThat(CloseStatus.getCloseStatus(clientHandler.receivedFrames.poll()).getReason(), is("standard close")); + + // Verify no other frames were received + assertNull(proxyClientSide.receivedFrames.poll(250, TimeUnit.MILLISECONDS)); + assertNull(serverFrameHandler.receivedFrames.poll(250, TimeUnit.MILLISECONDS)); + assertNull(proxyServerSide.receivedFrames.poll(250, TimeUnit.MILLISECONDS)); + assertNull(clientHandler.receivedFrames.poll(250, TimeUnit.MILLISECONDS)); + } +}