Issue #3170 - WebSocket Proxy PoC

Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
This commit is contained in:
Lachlan Roberts 2019-01-31 22:18:31 +11:00
parent 83b53f102c
commit c1e3e57f03
3 changed files with 329 additions and 0 deletions

View File

@ -0,0 +1,100 @@
package org.eclipse.jetty.websocket.core.proxy;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.util.BlockingArrayQueue;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.websocket.core.CloseStatus;
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.FrameHandler;
import org.eclipse.jetty.websocket.core.OpCode;
class BasicFrameHandler implements FrameHandler
{
protected String name;
protected CoreSession session;
protected CountDownLatch closed = new CountDownLatch(1);
protected BlockingQueue<Frame> receivedFrames = new BlockingArrayQueue<>();
public BasicFrameHandler(String name)
{
this.name = "[" + name + "]";
}
@Override
public void onOpen(CoreSession coreSession, Callback callback)
{
session = coreSession;
System.err.println(name + " onOpen(): " + session);
callback.succeeded();
}
@Override
public void onFrame(Frame frame, Callback callback)
{
System.err.println(name + " onFrame(): " + frame);
receivedFrames.offer(Frame.copy(frame));
callback.succeeded();
}
@Override
public void onError(Throwable cause, Callback callback)
{
System.err.println(name + " onError(): " + cause);
cause.printStackTrace();
callback.succeeded();
}
@Override
public void onClosed(CloseStatus closeStatus, Callback callback)
{
System.err.println(name + " onClosed(): " + closeStatus);
closed.countDown();
callback.succeeded();
}
public void sendText(String message)
{
Frame textFrame = new Frame(OpCode.TEXT, BufferUtil.toBuffer(message));
session.sendFrame(textFrame, Callback.NOOP, false);
}
public void close() throws InterruptedException
{
session.close(CloseStatus.NORMAL, "standard close", Callback.NOOP);
awaitClose();
}
public void awaitClose() throws InterruptedException
{
closed.await(5, TimeUnit.SECONDS);
}
public static class EchoHandler extends BasicFrameHandler
{
public EchoHandler(String name)
{
super(name);
}
@Override
public void onFrame(Frame frame, Callback callback)
{
System.err.println(name + " onFrame(): " + frame);
if (frame.isDataFrame())
session.sendFrame(new Frame(frame.getOpCode(), frame.getPayload()), callback, false);
else
callback.succeeded();
receivedFrames.offer(Frame.copy(frame));
}
}
}

View File

@ -0,0 +1,155 @@
package org.eclipse.jetty.websocket.core.proxy;
import java.io.IOException;
import java.net.URI;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.websocket.core.CloseStatus;
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.FrameHandler;
import org.eclipse.jetty.websocket.core.OpCode;
import org.eclipse.jetty.websocket.core.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.core.client.WebSocketCoreClient;
class ProxyFrameHandler implements FrameHandler
{
String name = "[PROXY_SERVER]";
URI serverUri;
WebSocketCoreClient client = new WebSocketCoreClient();
CoreSession clientSession;
volatile CoreSession serverSession;
AtomicReference<Callback> closeFrameCallback = new AtomicReference<>();
public ProxyFrameHandler()
{
try
{
serverUri = new URI("ws://localhost:8080/server");
client.start();
}
catch (Exception e)
{
e.printStackTrace();
throw new RuntimeException(e);
}
}
@Override
public void onOpen(CoreSession coreSession, Callback callback)
{
System.err.println(name + " onOpen: " + coreSession);
clientSession = coreSession;
try
{
ClientUpgradeRequest upgradeRequest = ClientUpgradeRequest.from(client, serverUri, new ProxyFrameHandlerClient());
client.connect(upgradeRequest).whenComplete((s,t)->{
if (t != null)
{
callback.failed(t);
}
else
{
serverSession = s;
callback.succeeded();
}
});
}
catch (IOException e)
{
e.printStackTrace();
clientSession.close(CloseStatus.SERVER_ERROR, "proxy failed to connect to server", Callback.NOOP);
}
}
@Override
public void onFrame(Frame frame, Callback callback)
{
System.err.println(name + " onFrame(): " + frame);
onFrame(serverSession, frame, callback);
}
private void onFrame(CoreSession session, Frame frame, Callback callback)
{
if (frame.getOpCode() == OpCode.CLOSE)
{
Callback closeCallback = Callback.NOOP;
// If we have already received a close frame then we can succeed both callbacks
if (!closeFrameCallback.compareAndSet(null, callback))
{
closeCallback = Callback.from(()->
{
closeFrameCallback.get().succeeded();
callback.succeeded();
}, (t)->
{
closeFrameCallback.get().failed(t);
callback.failed(t);
});
}
session.sendFrame(frame, closeCallback, false);
return;
}
else
{
session.sendFrame(Frame.copy(frame), callback, false);
}
}
@Override
public void onError(Throwable cause, Callback callback)
{
System.err.println(name + " onError(): " + cause);
cause.printStackTrace();
callback.succeeded();
}
@Override
public void onClosed(CloseStatus closeStatus, Callback callback)
{
System.err.println(name + " onClosed(): " + closeStatus);
callback.succeeded();
}
class ProxyFrameHandlerClient implements FrameHandler
{
String name = "[PROXY_CLIENT]";
@Override
public void onOpen(CoreSession coreSession, Callback callback)
{
serverSession = coreSession;
callback.succeeded();
}
@Override
public void onFrame(Frame frame, Callback callback)
{
System.err.println(name + " onFrame(): " + frame);
ProxyFrameHandler.this.onFrame(clientSession, frame, callback);
}
@Override
public void onError(Throwable cause, Callback callback)
{
System.err.println(name + " onError(): " + cause);
cause.printStackTrace();
callback.succeeded();
}
@Override
public void onClosed(CloseStatus closeStatus, Callback callback)
{
System.err.println(name + " onClosed(): " + closeStatus);
callback.succeeded();
}
}
}

View File

@ -0,0 +1,74 @@
package org.eclipse.jetty.websocket.core.proxy;
import java.net.URI;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.server.handler.HandlerList;
import org.eclipse.jetty.websocket.core.FrameHandler.CoreSession;
import org.eclipse.jetty.websocket.core.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.core.client.WebSocketCoreClient;
import org.eclipse.jetty.websocket.core.server.WebSocketNegotiator;
import org.eclipse.jetty.websocket.core.server.WebSocketUpgradeHandler;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
public class WebSocketProxyTest
{
Server _server;
WebSocketCoreClient _client;
@BeforeEach
public void start() throws Exception
{
_server = new Server();
ServerConnector connector = new ServerConnector(_server);
connector.setPort(8080);
_server.addConnector(connector);
HandlerList handlers = new HandlerList();
ContextHandler serverContext = new ContextHandler("/server");
WebSocketNegotiator negotiator = WebSocketNegotiator.from((negotiation) -> new BasicFrameHandler.EchoHandler("SERVER"));
WebSocketUpgradeHandler upgradeHandler = new WebSocketUpgradeHandler(negotiator);
serverContext.setHandler(upgradeHandler);
handlers.addHandler(serverContext);
ContextHandler proxyContext = new ContextHandler("/proxy");
negotiator = WebSocketNegotiator.from((negotiation) -> new ProxyFrameHandler());
upgradeHandler = new WebSocketUpgradeHandler(negotiator);
proxyContext.setHandler(upgradeHandler);
handlers.addHandler(proxyContext);
_server.setHandler(handlers);
_server.start();
_client = new WebSocketCoreClient();
_client.start();
}
@AfterEach
public void stop() throws Exception
{
_client.stop();
_server.stop();
}
@Test
public void testHello() throws Exception
{
BasicFrameHandler clientHandler = new BasicFrameHandler("CLIENT");
ClientUpgradeRequest upgradeRequest = ClientUpgradeRequest.from(_client, new URI("ws://localhost:8080/proxy"), clientHandler);
CompletableFuture<CoreSession> response = _client.connect(upgradeRequest);
response.get(5, TimeUnit.SECONDS);
clientHandler.sendText("hello world");
clientHandler.close();
}
}