Issue #3170 - WebSocketProxy cleanup

removing old versions of the proxy frame handler
adding licence headers

Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
This commit is contained in:
Lachlan Roberts 2019-02-19 09:28:36 +11:00
parent 4aa52c2f43
commit 3e33b35d82
5 changed files with 54 additions and 348 deletions

View File

@ -1,3 +1,21 @@
//
// ========================================================================
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.core.proxy;
import java.util.concurrent.BlockingQueue;

View File

@ -1,244 +0,0 @@
package org.eclipse.jetty.websocket.core.proxy;
import java.io.IOException;
import java.net.URI;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.util.BlockingArrayQueue;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.websocket.core.CloseStatus;
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.FrameHandler;
import org.eclipse.jetty.websocket.core.OpCode;
import org.eclipse.jetty.websocket.core.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.core.client.WebSocketCoreClient;
class ProxyFrameHandler implements FrameHandler
{
private String name = "[ClientToProxy]";
private URI serverUri;
private WebSocketCoreClient client;
private CoreSession clientSession;
private AtomicReference<CoreSession> serverSession = new AtomicReference<>();
private AtomicReference<Callback> closeFrameCallback = new AtomicReference<>();
private static CoreSession EMPTY_SESSION = new CoreSession.Empty();
protected BlockingQueue<Frame> receivedFrames = new BlockingArrayQueue<>();
public ProxyFrameHandler(WebSocketCoreClient client, URI serverUri)
{
this.client = client;
this.serverUri = serverUri;
}
@Override
public void onOpen(CoreSession coreSession, Callback callback)
{
System.err.println(name + " onOpen: " + coreSession);
clientSession = coreSession;
try
{
ClientUpgradeRequest upgradeRequest = ClientUpgradeRequest.from(client, serverUri, new ServerToProxyFrameHandler());
client.connect(upgradeRequest).whenComplete((s, t) ->
{
if (t != null)
{
// If an onError callback was waiting to be completed in serverToProxyFH onOpen, then we must fail it.
while (true)
{
CoreSession session = serverSession.get();
if (session == null)
{
if (serverSession.compareAndSet(null, EMPTY_SESSION))
break;
}
else if (session == EMPTY_SESSION)
{
break;
}
else
{
if (serverSession.compareAndSet(session, EMPTY_SESSION))
{
if (session instanceof FailedCoreSession)
{
FailedCoreSession failedSession = (FailedCoreSession)session;
failedSession.failed(t);
t.addSuppressed(failedSession.getThrowable());
}
break;
}
}
}
callback.failed(t);
}
else
{
callback.succeeded();
}
});
}
catch (IOException e)
{
callback.failed(e);
}
}
@Override
public void onFrame(Frame frame, Callback callback)
{
System.err.println(name + " onFrame(): " + frame);
receivedFrames.offer(Frame.copy(frame));
onFrame(serverSession.get(), frame, callback);
}
private void onFrame(CoreSession session, Frame frame, Callback callback)
{
if (frame.getOpCode() == OpCode.CLOSE)
{
if (closeFrameCallback.compareAndSet(null, callback))
halfClose(session, frame, callback);
else
fullClose(session, frame, callback);
}
else
{
System.err.println(name + " forwardFrame(): " + frame);
session.sendFrame(frame, callback, false);
}
}
private void halfClose(CoreSession session, Frame frame , Callback callback)
{
Callback closeCallback = Callback.from(() -> {}, callback::failed);
System.err.println(name + " halfClose()");
session.sendFrame(frame, closeCallback, false);
}
private void fullClose(CoreSession session, Frame frame , Callback callback)
{
Callback closeCallback = Callback.from(closeFrameCallback.get(), callback);
System.err.println(name + " fullClose()");
session.sendFrame(frame, closeCallback, false);
}
@Override
public void onError(Throwable cause, Callback callback)
{
System.err.println(name + " onError(): " + cause);
cause.printStackTrace();
while (true)
{
CoreSession session = serverSession.get();
if (session == EMPTY_SESSION)
{
callback.failed(cause);
break;
}
else if (session == null)
{
if (serverSession.compareAndSet(null, new FailedCoreSession(cause, callback)))
break;
}
else
{
if (serverSession.compareAndSet(session, EMPTY_SESSION))
{
serverSession.get().close(CloseStatus.SHUTDOWN, cause.getMessage(), callback);
break;
}
}
}
}
@Override
public void onClosed(CloseStatus closeStatus, Callback callback)
{
System.err.println(name + " onClosed(): " + closeStatus);
callback.succeeded();
}
class ServerToProxyFrameHandler implements FrameHandler
{
String name = "[ServerToProxy]";
@Override
public void onOpen(CoreSession coreSession, Callback callback)
{
if (!serverSession.compareAndSet(null, coreSession))
{
FailedCoreSession session = (FailedCoreSession)serverSession.get();
session.failed();
callback.failed(session.getThrowable());
return;
}
callback.succeeded();
}
@Override
public void onFrame(Frame frame, Callback callback)
{
System.err.println(name + " onFrame(): " + frame);
receivedFrames.offer(Frame.copy(frame));
ProxyFrameHandler.this.onFrame(clientSession, frame, callback);
}
@Override
public void onError(Throwable cause, Callback callback)
{
System.err.println(name + " onError(): " + cause);
cause.printStackTrace();
clientSession.close(CloseStatus.SERVER_ERROR, cause.getMessage(), callback);
}
@Override
public void onClosed(CloseStatus closeStatus, Callback callback)
{
System.err.println(name + " onClosed(): " + closeStatus);
callback.succeeded();
}
}
static class FailedCoreSession extends CoreSession.Empty
{
private Throwable throwable;
private Callback callback;
public FailedCoreSession(Throwable throwable, Callback callback)
{
this.throwable = throwable;
this.callback = callback;
}
public Throwable getThrowable()
{
return throwable;
}
public Callback getCallback()
{
return callback;
}
public void failed(Throwable t)
{
throwable.addSuppressed(t);
failed();
}
public void failed()
{
callback.failed(throwable);
}
}
}

View File

@ -1,104 +0,0 @@
package org.eclipse.jetty.websocket.core.proxy;
import java.net.URI;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.server.handler.HandlerList;
import org.eclipse.jetty.websocket.core.CloseStatus;
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.FrameHandler.CoreSession;
import org.eclipse.jetty.websocket.core.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.core.client.WebSocketCoreClient;
import org.eclipse.jetty.websocket.core.server.WebSocketNegotiator;
import org.eclipse.jetty.websocket.core.server.WebSocketUpgradeHandler;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertNull;
public class ProxyFrameHandlerTest
{
Server _server;
WebSocketCoreClient _client;
ProxyFrameHandler proxyFrameHandler;
BasicFrameHandler.ServerEchoHandler serverFrameHandler;
@BeforeEach
public void start() throws Exception
{
_server = new Server();
ServerConnector connector = new ServerConnector(_server);
connector.setPort(8080);
_server.addConnector(connector);
HandlerList handlers = new HandlerList();
ContextHandler serverContext = new ContextHandler("/server");
serverFrameHandler = new BasicFrameHandler.ServerEchoHandler("SERVER");
WebSocketNegotiator negotiator = WebSocketNegotiator.from((negotiation) -> serverFrameHandler);
WebSocketUpgradeHandler upgradeHandler = new WebSocketUpgradeHandler(negotiator);
serverContext.setHandler(upgradeHandler);
handlers.addHandler(serverContext);
_client = new WebSocketCoreClient();
_client.start();
URI uri = new URI("ws://localhost:8080/server");
ContextHandler proxyContext = new ContextHandler("/proxy");
proxyFrameHandler = new ProxyFrameHandler(_client, uri);
negotiator = WebSocketNegotiator.from((negotiation) -> proxyFrameHandler);
upgradeHandler = new WebSocketUpgradeHandler(negotiator);
proxyContext.setHandler(upgradeHandler);
handlers.addHandler(proxyContext);
_server.setHandler(handlers);
_server.start();
}
@AfterEach
public void stop() throws Exception
{
_client.stop();
_server.stop();
}
@Test
public void testHello() throws Exception
{
BasicFrameHandler clientHandler = new BasicFrameHandler("CLIENT");
ClientUpgradeRequest upgradeRequest = ClientUpgradeRequest.from(_client, new URI("ws://localhost:8080/proxy"), clientHandler);
CompletableFuture<CoreSession> response = _client.connect(upgradeRequest);
response.get(5, TimeUnit.SECONDS);
clientHandler.sendText("hello world");
clientHandler.close("standard close");
Frame frame;
// Verify the the text frame was received
assertThat(proxyFrameHandler.receivedFrames.poll().getPayloadAsUTF8(), is("hello world"));
assertThat(serverFrameHandler.receivedFrames.poll().getPayloadAsUTF8(), is("hello world"));
assertThat(proxyFrameHandler.receivedFrames.poll().getPayloadAsUTF8(), is("hello world"));
assertThat(clientHandler.receivedFrames.poll().getPayloadAsUTF8(), is("hello world"));
// Verify the right close frame was received
assertThat(CloseStatus.getCloseStatus(proxyFrameHandler.receivedFrames.poll()).getReason(), is("standard close"));
assertThat(CloseStatus.getCloseStatus(serverFrameHandler.receivedFrames.poll()).getReason(), is("standard close"));
assertThat(CloseStatus.getCloseStatus(proxyFrameHandler.receivedFrames.poll()).getReason(), is("standard close"));
assertThat(CloseStatus.getCloseStatus(clientHandler.receivedFrames.poll()).getReason(), is("standard close"));
// Verify no other frames were received
assertNull(proxyFrameHandler.receivedFrames.poll(250, TimeUnit.MILLISECONDS));
assertNull(serverFrameHandler.receivedFrames.poll(250, TimeUnit.MILLISECONDS));
assertNull(proxyFrameHandler.receivedFrames.poll(250, TimeUnit.MILLISECONDS));
assertNull(clientHandler.receivedFrames.poll(250, TimeUnit.MILLISECONDS));
}
}

View File

@ -1,3 +1,21 @@
//
// ========================================================================
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.core.proxy;
import java.io.IOException;

View File

@ -1,3 +1,21 @@
//
// ========================================================================
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.core.proxy;
import java.io.IOException;