Improve WebSocketProxy, and write tests for it
Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
This commit is contained in:
parent
d7598f3ff2
commit
b89adb8dae
|
@ -135,6 +135,24 @@ public class BufferUtil
|
||||||
return buf;
|
return buf;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Deep copy of a buffer
|
||||||
|
*
|
||||||
|
* @param buffer The buffer to copy
|
||||||
|
* @return A copy of the buffer
|
||||||
|
*/
|
||||||
|
public static ByteBuffer copy(ByteBuffer buffer)
|
||||||
|
{
|
||||||
|
if (buffer == null)
|
||||||
|
return null;
|
||||||
|
int p = buffer.position();
|
||||||
|
ByteBuffer clone = buffer.isDirect() ? ByteBuffer.allocateDirect(buffer.remaining()) : ByteBuffer.allocate(buffer.remaining());
|
||||||
|
clone.put(buffer);
|
||||||
|
clone.flip();
|
||||||
|
buffer.position(p);
|
||||||
|
return clone;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Clear the buffer to be empty in flush mode.
|
* Clear the buffer to be empty in flush mode.
|
||||||
* The position and limit are set to 0;
|
* The position and limit are set to 0;
|
||||||
|
|
|
@ -1,28 +1,57 @@
|
||||||
|
//
|
||||||
|
// ========================================================================
|
||||||
|
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
|
||||||
|
// ------------------------------------------------------------------------
|
||||||
|
// All rights reserved. This program and the accompanying materials
|
||||||
|
// are made available under the terms of the Eclipse Public License v1.0
|
||||||
|
// and Apache License v2.0 which accompanies this distribution.
|
||||||
|
//
|
||||||
|
// The Eclipse Public License is available at
|
||||||
|
// http://www.eclipse.org/legal/epl-v10.html
|
||||||
|
//
|
||||||
|
// The Apache License v2.0 is available at
|
||||||
|
// http://www.opensource.org/licenses/apache2.0.php
|
||||||
|
//
|
||||||
|
// You may elect to redistribute this code under either of these licenses.
|
||||||
|
// ========================================================================
|
||||||
|
//
|
||||||
|
|
||||||
package org.eclipse.jetty.websocket.tests.proxy;
|
package org.eclipse.jetty.websocket.tests.proxy;
|
||||||
|
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
|
import org.eclipse.jetty.util.BufferUtil;
|
||||||
import org.eclipse.jetty.util.FutureCallback;
|
import org.eclipse.jetty.util.FutureCallback;
|
||||||
import org.eclipse.jetty.util.component.LifeCycle;
|
import org.eclipse.jetty.util.StringUtil;
|
||||||
|
import org.eclipse.jetty.util.log.Log;
|
||||||
|
import org.eclipse.jetty.util.log.Logger;
|
||||||
import org.eclipse.jetty.websocket.api.Session;
|
import org.eclipse.jetty.websocket.api.Session;
|
||||||
|
import org.eclipse.jetty.websocket.api.StatusCode;
|
||||||
import org.eclipse.jetty.websocket.api.WebSocketConnectionListener;
|
import org.eclipse.jetty.websocket.api.WebSocketConnectionListener;
|
||||||
import org.eclipse.jetty.websocket.api.WebSocketException;
|
import org.eclipse.jetty.websocket.api.WebSocketException;
|
||||||
import org.eclipse.jetty.websocket.api.WebSocketPartialListener;
|
import org.eclipse.jetty.websocket.api.WebSocketPartialListener;
|
||||||
import org.eclipse.jetty.websocket.api.WebSocketPingPongListener;
|
import org.eclipse.jetty.websocket.api.WebSocketPingPongListener;
|
||||||
|
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
|
||||||
import org.eclipse.jetty.websocket.client.WebSocketClient;
|
import org.eclipse.jetty.websocket.client.WebSocketClient;
|
||||||
|
|
||||||
public class WebSocketProxy
|
public class WebSocketProxy
|
||||||
{
|
{
|
||||||
private final WebSocketClient client = new WebSocketClient();
|
private static final Logger LOG = Log.getLogger(WebSocketProxy.class);
|
||||||
private final URI serverUri = URI.create("ws://echo.websocket.org");
|
|
||||||
|
private final WebSocketClient client;
|
||||||
|
private final URI serverUri;
|
||||||
private final ClientToProxy clientToProxy = new ClientToProxy();
|
private final ClientToProxy clientToProxy = new ClientToProxy();
|
||||||
private final ProxyToServer proxyToServer = new ProxyToServer();
|
private final ProxyToServer proxyToServer = new ProxyToServer();
|
||||||
|
|
||||||
public WebSocketProxy()
|
public WebSocketProxy(WebSocketClient webSocketClient, URI serverUri)
|
||||||
{
|
{
|
||||||
LifeCycle.start(client);
|
this.client = webSocketClient;
|
||||||
|
this.serverUri = serverUri;
|
||||||
}
|
}
|
||||||
|
|
||||||
public WebSocketConnectionListener getWebSocketConnectionListener()
|
public WebSocketConnectionListener getWebSocketConnectionListener()
|
||||||
|
@ -30,33 +59,101 @@ public class WebSocketProxy
|
||||||
return clientToProxy;
|
return clientToProxy;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean awaitClose(long timeout)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
if (!clientToProxy.closeLatch.await(timeout, TimeUnit.MILLISECONDS))
|
||||||
|
return false;
|
||||||
|
if (proxyToServer.getSession() == null)
|
||||||
|
return true;
|
||||||
|
return proxyToServer.closeLatch.await(timeout, TimeUnit.MILLISECONDS);
|
||||||
|
}
|
||||||
|
catch (Exception e)
|
||||||
|
{
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* We use this to wait until we receive a pong from other websocket connection before sending back the response pong.
|
||||||
|
* This is problematic because the protocol allows unsolicited PongMessages. Ideally it would be best if we could
|
||||||
|
* disable the automatic pong response through something like the {@link org.eclipse.jetty.websocket.api.WebSocketPolicy}.
|
||||||
|
*/
|
||||||
|
private static class PongWait
|
||||||
|
{
|
||||||
|
private final FutureCallback COMPLETED = new FutureCallback(true);
|
||||||
|
private final AtomicReference<FutureCallback> reference = new AtomicReference<>();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return gives back a Future which is completed when this is notified that a pong has been received.
|
||||||
|
*/
|
||||||
|
public FutureCallback waitForPong()
|
||||||
|
{
|
||||||
|
FutureCallback futureCallback = new FutureCallback();
|
||||||
|
if (!reference.compareAndSet(null, futureCallback))
|
||||||
|
throw new IllegalStateException();
|
||||||
|
return futureCallback;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return true if the pong will be automatically forwarded, otherwise it must be sent manually.
|
||||||
|
*/
|
||||||
|
public boolean receivedPong()
|
||||||
|
{
|
||||||
|
FutureCallback futureCallback = reference.getAndSet(null);
|
||||||
|
if (futureCallback != null)
|
||||||
|
{
|
||||||
|
futureCallback.succeeded();
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void cancel()
|
||||||
|
{
|
||||||
|
FutureCallback futureCallback = reference.getAndSet(COMPLETED);
|
||||||
|
if (futureCallback != null)
|
||||||
|
futureCallback.cancel(true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public class ClientToProxy implements WebSocketPartialListener, WebSocketPingPongListener
|
public class ClientToProxy implements WebSocketPartialListener, WebSocketPingPongListener
|
||||||
{
|
{
|
||||||
private Session session;
|
private Session session;
|
||||||
private FutureCallback pongWait;
|
private final CountDownLatch closeLatch = new CountDownLatch(1);
|
||||||
|
private final PongWait pongWait = new PongWait();
|
||||||
|
|
||||||
public Session getSession()
|
public Session getSession()
|
||||||
{
|
{
|
||||||
return session;
|
return session;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void receivedPong()
|
public boolean receivedPong()
|
||||||
{
|
{
|
||||||
if (pongWait != null)
|
return pongWait.receivedPong();
|
||||||
{
|
|
||||||
pongWait.succeeded();
|
|
||||||
pongWait = null;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void fail(Throwable failure)
|
||||||
|
{
|
||||||
|
session.close(StatusCode.SERVER_ERROR, failure.getMessage());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onWebSocketConnect(Session session)
|
public void onWebSocketConnect(Session session)
|
||||||
{
|
{
|
||||||
|
if (LOG.isDebugEnabled())
|
||||||
|
LOG.debug("{} onWebSocketConnect({})", getClass().getSimpleName(), session);
|
||||||
|
|
||||||
Future<Session> connect = null;
|
Future<Session> connect = null;
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
this.session = session;
|
this.session = session;
|
||||||
connect = client.connect(proxyToServer, serverUri);
|
ClientUpgradeRequest upgradeRequest = new ClientUpgradeRequest();
|
||||||
|
upgradeRequest.setSubProtocols(session.getUpgradeRequest().getSubProtocols());
|
||||||
|
upgradeRequest.setExtensions(session.getUpgradeRequest().getExtensions());
|
||||||
|
connect = client.connect(proxyToServer, serverUri, upgradeRequest);
|
||||||
connect.get();
|
connect.get();
|
||||||
}
|
}
|
||||||
catch (Exception e)
|
catch (Exception e)
|
||||||
|
@ -70,6 +167,9 @@ public class WebSocketProxy
|
||||||
@Override
|
@Override
|
||||||
public void onWebSocketPartialBinary(ByteBuffer payload, boolean fin)
|
public void onWebSocketPartialBinary(ByteBuffer payload, boolean fin)
|
||||||
{
|
{
|
||||||
|
if (LOG.isDebugEnabled())
|
||||||
|
LOG.debug("{} onWebSocketPartialBinary({}, {})", getClass().getSimpleName(), BufferUtil.toDetailString(payload), fin);
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
proxyToServer.getSession().getRemote().sendPartialBytes(payload, fin);
|
proxyToServer.getSession().getRemote().sendPartialBytes(payload, fin);
|
||||||
|
@ -83,6 +183,9 @@ public class WebSocketProxy
|
||||||
@Override
|
@Override
|
||||||
public void onWebSocketPartialText(String payload, boolean fin)
|
public void onWebSocketPartialText(String payload, boolean fin)
|
||||||
{
|
{
|
||||||
|
if (LOG.isDebugEnabled())
|
||||||
|
LOG.debug("{} onWebSocketPartialText({}, {})", getClass().getSimpleName(), StringUtil.truncate(payload, 100), fin);
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
proxyToServer.getSession().getRemote().sendPartialString(payload, fin);
|
proxyToServer.getSession().getRemote().sendPartialString(payload, fin);
|
||||||
|
@ -96,12 +199,15 @@ public class WebSocketProxy
|
||||||
@Override
|
@Override
|
||||||
public void onWebSocketPing(ByteBuffer payload)
|
public void onWebSocketPing(ByteBuffer payload)
|
||||||
{
|
{
|
||||||
|
if (LOG.isDebugEnabled())
|
||||||
|
LOG.debug("{} onWebSocketPing({})", getClass().getSimpleName(), BufferUtil.toDetailString(payload));
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
proxyToServer.getSession().getRemote().sendPing(payload);
|
// Block until we get pong response back from server. An automatic pong will be sent after this method.
|
||||||
// Block until we get pong response back from server.
|
FutureCallback futureCallback = pongWait.waitForPong();
|
||||||
// An automatic pong will occur from the implementation after we exit from here.
|
proxyToServer.getSession().getRemote().sendPing(BufferUtil.copy(payload));
|
||||||
pongWait.get();
|
futureCallback.get();
|
||||||
}
|
}
|
||||||
catch (Exception e)
|
catch (Exception e)
|
||||||
{
|
{
|
||||||
|
@ -112,10 +218,16 @@ public class WebSocketProxy
|
||||||
@Override
|
@Override
|
||||||
public void onWebSocketPong(ByteBuffer payload)
|
public void onWebSocketPong(ByteBuffer payload)
|
||||||
{
|
{
|
||||||
|
if (LOG.isDebugEnabled())
|
||||||
|
LOG.debug("{} onWebSocketPong({})", getClass().getSimpleName(), BufferUtil.toDetailString(payload));
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
// Notify the other side we have received a Pong.
|
// We do not forward on the pong message unless it was an unsolicited pong.
|
||||||
proxyToServer.receivedPong();
|
// Instead we notify the other side we have received pong which will then unblock in the
|
||||||
|
// thread in onPing() which will trigger the automatic pong response from the implementation.
|
||||||
|
if (!proxyToServer.receivedPong())
|
||||||
|
proxyToServer.session.getRemote().sendPong(BufferUtil.copy(payload));
|
||||||
}
|
}
|
||||||
catch (Exception e)
|
catch (Exception e)
|
||||||
{
|
{
|
||||||
|
@ -126,64 +238,65 @@ public class WebSocketProxy
|
||||||
@Override
|
@Override
|
||||||
public void onWebSocketError(Throwable cause)
|
public void onWebSocketError(Throwable cause)
|
||||||
{
|
{
|
||||||
cause.printStackTrace();
|
if (LOG.isDebugEnabled())
|
||||||
|
LOG.debug("{} onWebSocketError()", getClass().getSimpleName(), cause);
|
||||||
|
|
||||||
try
|
proxyToServer.fail(cause);
|
||||||
{
|
pongWait.cancel();
|
||||||
// TODO: need to fail ProxyToServer as well.
|
|
||||||
if (pongWait != null)
|
|
||||||
pongWait.cancel(true);
|
|
||||||
}
|
|
||||||
catch (Exception e)
|
|
||||||
{
|
|
||||||
throw new WebSocketException(e);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onWebSocketClose(int statusCode, String reason)
|
public void onWebSocketClose(int statusCode, String reason)
|
||||||
{
|
{
|
||||||
try
|
if (LOG.isDebugEnabled())
|
||||||
{
|
LOG.debug("{} onWebSocketClose({} {})", getClass().getSimpleName(), statusCode, reason);
|
||||||
|
|
||||||
Session session = proxyToServer.getSession();
|
Session session = proxyToServer.getSession();
|
||||||
if (session != null)
|
if (session != null)
|
||||||
session.close(statusCode, reason);
|
session.close(statusCode, reason);
|
||||||
}
|
pongWait.cancel();
|
||||||
catch (Exception e)
|
closeLatch.countDown();
|
||||||
{
|
|
||||||
throw new WebSocketException(e);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public class ProxyToServer implements WebSocketPartialListener, WebSocketPingPongListener
|
public class ProxyToServer implements WebSocketPartialListener, WebSocketPingPongListener
|
||||||
{
|
{
|
||||||
private Session session;
|
private Session session;
|
||||||
private FutureCallback pongWait;
|
private final CountDownLatch closeLatch = new CountDownLatch(1);
|
||||||
|
private final PongWait pongWait = new PongWait();
|
||||||
|
|
||||||
public Session getSession()
|
public Session getSession()
|
||||||
{
|
{
|
||||||
return session;
|
return session;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void receivedPong()
|
public boolean receivedPong()
|
||||||
{
|
{
|
||||||
if (pongWait != null)
|
return pongWait.receivedPong();
|
||||||
{
|
|
||||||
pongWait.succeeded();
|
|
||||||
pongWait = null;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void fail(Throwable failure)
|
||||||
|
{
|
||||||
|
// Only ProxyToServer can be failed before it is opened (if ClientToProxy fails before the connect completes).
|
||||||
|
if (session != null)
|
||||||
|
session.close(StatusCode.SERVER_ERROR, failure.getMessage());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onWebSocketConnect(Session session)
|
public void onWebSocketConnect(Session session)
|
||||||
{
|
{
|
||||||
|
if (LOG.isDebugEnabled())
|
||||||
|
LOG.debug("{} onWebSocketConnect({})", getClass().getSimpleName(), session);
|
||||||
|
|
||||||
this.session = session;
|
this.session = session;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onWebSocketPartialBinary(ByteBuffer payload, boolean fin)
|
public void onWebSocketPartialBinary(ByteBuffer payload, boolean fin)
|
||||||
{
|
{
|
||||||
|
if (LOG.isDebugEnabled())
|
||||||
|
LOG.debug("{} onWebSocketPartialBinary({}, {})", getClass().getSimpleName(), BufferUtil.toDetailString(payload), fin);
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
clientToProxy.getSession().getRemote().sendPartialBytes(payload, fin);
|
clientToProxy.getSession().getRemote().sendPartialBytes(payload, fin);
|
||||||
|
@ -197,6 +310,9 @@ public class WebSocketProxy
|
||||||
@Override
|
@Override
|
||||||
public void onWebSocketPartialText(String payload, boolean fin)
|
public void onWebSocketPartialText(String payload, boolean fin)
|
||||||
{
|
{
|
||||||
|
if (LOG.isDebugEnabled())
|
||||||
|
LOG.debug("{} onWebSocketPartialText({}, {})", getClass().getSimpleName(), StringUtil.truncate(payload, 100), fin);
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
clientToProxy.getSession().getRemote().sendPartialString(payload, fin);
|
clientToProxy.getSession().getRemote().sendPartialString(payload, fin);
|
||||||
|
@ -210,12 +326,15 @@ public class WebSocketProxy
|
||||||
@Override
|
@Override
|
||||||
public void onWebSocketPing(ByteBuffer payload)
|
public void onWebSocketPing(ByteBuffer payload)
|
||||||
{
|
{
|
||||||
|
if (LOG.isDebugEnabled())
|
||||||
|
LOG.debug("{} onWebSocketPing({})", getClass().getSimpleName(), BufferUtil.toDetailString(payload));
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
clientToProxy.getSession().getRemote().sendPing(payload);
|
// Block until we get pong response back from client. An automatic pong will be sent after this method.
|
||||||
// Block until we get pong response back from client.
|
FutureCallback futureCallback = pongWait.waitForPong();
|
||||||
// An automatic pong will occur from the implementation after we exit from here.
|
clientToProxy.getSession().getRemote().sendPing(BufferUtil.copy(payload));
|
||||||
pongWait.get();
|
futureCallback.get();
|
||||||
}
|
}
|
||||||
catch (Exception e)
|
catch (Exception e)
|
||||||
{
|
{
|
||||||
|
@ -226,10 +345,16 @@ public class WebSocketProxy
|
||||||
@Override
|
@Override
|
||||||
public void onWebSocketPong(ByteBuffer payload)
|
public void onWebSocketPong(ByteBuffer payload)
|
||||||
{
|
{
|
||||||
|
if (LOG.isDebugEnabled())
|
||||||
|
LOG.debug("{} onWebSocketPong({})", getClass().getSimpleName(), BufferUtil.toDetailString(payload));
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
// Notify the other side we have received a Pong.
|
// We do not forward on the pong message unless it was an unsolicited pong.
|
||||||
clientToProxy.receivedPong();
|
// Instead we notify the other side we have received pong which will then unblock in the
|
||||||
|
// thread in onPing() which will trigger the automatic pong response from the implementation.
|
||||||
|
if (!clientToProxy.receivedPong())
|
||||||
|
clientToProxy.session.getRemote().sendPong(BufferUtil.copy(payload));
|
||||||
}
|
}
|
||||||
catch (Exception e)
|
catch (Exception e)
|
||||||
{
|
{
|
||||||
|
@ -240,33 +365,24 @@ public class WebSocketProxy
|
||||||
@Override
|
@Override
|
||||||
public void onWebSocketError(Throwable cause)
|
public void onWebSocketError(Throwable cause)
|
||||||
{
|
{
|
||||||
cause.printStackTrace();
|
if (LOG.isDebugEnabled())
|
||||||
|
LOG.debug("{} onWebSocketError()", getClass().getSimpleName(), cause);
|
||||||
|
|
||||||
try
|
clientToProxy.fail(cause);
|
||||||
{
|
pongWait.cancel();
|
||||||
// TODO: need to fail ProxyToServer as well.
|
|
||||||
if (pongWait != null)
|
|
||||||
pongWait.cancel(true);
|
|
||||||
}
|
|
||||||
catch (Exception e)
|
|
||||||
{
|
|
||||||
throw new WebSocketException(e);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onWebSocketClose(int statusCode, String reason)
|
public void onWebSocketClose(int statusCode, String reason)
|
||||||
{
|
{
|
||||||
try
|
if (LOG.isDebugEnabled())
|
||||||
{
|
LOG.debug("{} onWebSocketClose({} {})", getClass().getSimpleName(), statusCode, reason);
|
||||||
|
|
||||||
Session session = clientToProxy.getSession();
|
Session session = clientToProxy.getSession();
|
||||||
if (session != null)
|
if (session != null)
|
||||||
session.close(statusCode, reason);
|
session.close(statusCode, reason);
|
||||||
}
|
pongWait.cancel();
|
||||||
catch (Exception e)
|
closeLatch.countDown();
|
||||||
{
|
|
||||||
throw new WebSocketException(e);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,50 +1,338 @@
|
||||||
|
//
|
||||||
|
// ========================================================================
|
||||||
|
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
|
||||||
|
// ------------------------------------------------------------------------
|
||||||
|
// All rights reserved. This program and the accompanying materials
|
||||||
|
// are made available under the terms of the Eclipse Public License v1.0
|
||||||
|
// and Apache License v2.0 which accompanies this distribution.
|
||||||
|
//
|
||||||
|
// The Eclipse Public License is available at
|
||||||
|
// http://www.eclipse.org/legal/epl-v10.html
|
||||||
|
//
|
||||||
|
// The Apache License v2.0 is available at
|
||||||
|
// http://www.opensource.org/licenses/apache2.0.php
|
||||||
|
//
|
||||||
|
// You may elect to redistribute this code under either of these licenses.
|
||||||
|
// ========================================================================
|
||||||
|
//
|
||||||
|
|
||||||
package org.eclipse.jetty.websocket.tests.proxy;
|
package org.eclipse.jetty.websocket.tests.proxy;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
|
import java.util.concurrent.BlockingQueue;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import org.eclipse.jetty.server.HttpChannel;
|
||||||
import org.eclipse.jetty.server.Server;
|
import org.eclipse.jetty.server.Server;
|
||||||
import org.eclipse.jetty.server.ServerConnector;
|
import org.eclipse.jetty.server.ServerConnector;
|
||||||
import org.eclipse.jetty.servlet.ServletContextHandler;
|
import org.eclipse.jetty.servlet.ServletContextHandler;
|
||||||
|
import org.eclipse.jetty.util.BlockingArrayQueue;
|
||||||
|
import org.eclipse.jetty.util.BufferUtil;
|
||||||
|
import org.eclipse.jetty.util.log.StacklessLogging;
|
||||||
|
import org.eclipse.jetty.websocket.api.Session;
|
||||||
|
import org.eclipse.jetty.websocket.api.StatusCode;
|
||||||
|
import org.eclipse.jetty.websocket.api.WebSocketException;
|
||||||
|
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketFrame;
|
||||||
|
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
|
||||||
|
import org.eclipse.jetty.websocket.api.extensions.Frame;
|
||||||
|
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
|
||||||
|
import org.eclipse.jetty.websocket.client.WebSocketClient;
|
||||||
|
import org.eclipse.jetty.websocket.common.OpCode;
|
||||||
import org.eclipse.jetty.websocket.server.NativeWebSocketServletContainerInitializer;
|
import org.eclipse.jetty.websocket.server.NativeWebSocketServletContainerInitializer;
|
||||||
import org.eclipse.jetty.websocket.server.WebSocketUpgradeFilter;
|
import org.eclipse.jetty.websocket.server.WebSocketUpgradeFilter;
|
||||||
|
import org.eclipse.jetty.websocket.tests.EchoSocket;
|
||||||
|
import org.eclipse.jetty.websocket.tests.EventSocket;
|
||||||
import org.junit.jupiter.api.AfterEach;
|
import org.junit.jupiter.api.AfterEach;
|
||||||
import org.junit.jupiter.api.BeforeEach;
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
|
import static org.hamcrest.MatcherAssert.assertThat;
|
||||||
|
import static org.hamcrest.Matchers.is;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertNull;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
|
|
||||||
public class WebSocketProxyTest
|
public class WebSocketProxyTest
|
||||||
{
|
{
|
||||||
|
private static final int PORT = 49998;
|
||||||
|
|
||||||
private Server server;
|
private Server server;
|
||||||
private URI serverUri;
|
private EventSocket serverSocket;
|
||||||
|
private WebSocketProxy webSocketProxy;
|
||||||
|
private WebSocketClient client;
|
||||||
|
private URI proxyUri;
|
||||||
|
|
||||||
@BeforeEach
|
@BeforeEach
|
||||||
public void before() throws Exception
|
public void before() throws Exception
|
||||||
{
|
{
|
||||||
server = new Server();
|
server = new Server();
|
||||||
ServerConnector connector = new ServerConnector(server);
|
ServerConnector connector = new ServerConnector(server);
|
||||||
connector.setPort(8080); // TODO: remove...
|
connector.setPort(PORT);
|
||||||
server.addConnector(connector);
|
server.addConnector(connector);
|
||||||
|
|
||||||
|
client = new WebSocketClient();
|
||||||
|
client.start();
|
||||||
|
proxyUri = URI.create("ws://localhost:" + PORT + "/proxy");
|
||||||
|
URI echoUri = URI.create("ws://localhost:" + PORT + "/echo");
|
||||||
|
webSocketProxy = new WebSocketProxy(client, echoUri);
|
||||||
|
|
||||||
ServletContextHandler contextHandler = new ServletContextHandler();
|
ServletContextHandler contextHandler = new ServletContextHandler();
|
||||||
WebSocketUpgradeFilter.configure(contextHandler);
|
WebSocketUpgradeFilter.configure(contextHandler);
|
||||||
|
serverSocket = new EchoSocket();
|
||||||
NativeWebSocketServletContainerInitializer.configure(contextHandler, ((context, container) ->
|
NativeWebSocketServletContainerInitializer.configure(contextHandler, ((context, container) ->
|
||||||
{
|
{
|
||||||
container.addMapping("/*", (req, resp) -> new WebSocketProxy().getWebSocketConnectionListener());
|
container.addMapping("/proxy", (req, resp) -> webSocketProxy.getWebSocketConnectionListener());
|
||||||
|
container.addMapping("/echo", (req, resp) ->
|
||||||
|
{
|
||||||
|
if (req.hasSubProtocol("fail"))
|
||||||
|
throw new WebSocketException("failing during upgrade");
|
||||||
|
return serverSocket;
|
||||||
|
});
|
||||||
}));
|
}));
|
||||||
|
|
||||||
server.setHandler(contextHandler);
|
server.setHandler(contextHandler);
|
||||||
server.start();
|
server.start();
|
||||||
serverUri = URI.create("ws://localhost:" + connector.getLocalPort());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@AfterEach
|
@AfterEach
|
||||||
public void after() throws Exception
|
public void after() throws Exception
|
||||||
{
|
{
|
||||||
|
client.stop();
|
||||||
server.stop();
|
server.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void test() throws Exception
|
public void testEcho() throws Exception
|
||||||
{
|
{
|
||||||
server.join();
|
EventSocket clientSocket = new EventSocket();
|
||||||
|
client.connect(clientSocket, proxyUri);
|
||||||
|
assertTrue(clientSocket.openLatch.await(5, TimeUnit.SECONDS));
|
||||||
|
|
||||||
|
// Test an echo spread across multiple frames.
|
||||||
|
clientSocket.session.getRemote().sendPartialString("hell", false);
|
||||||
|
clientSocket.session.getRemote().sendPartialString("o w", false);
|
||||||
|
clientSocket.session.getRemote().sendPartialString("orld", false);
|
||||||
|
clientSocket.session.getRemote().sendPartialString("!", true);
|
||||||
|
String response = clientSocket.textMessages.poll(5, TimeUnit.SECONDS);
|
||||||
|
assertThat(response, is("hello world!"));
|
||||||
|
|
||||||
|
// Test we closed successfully on the client side.
|
||||||
|
clientSocket.session.close(StatusCode.NORMAL, "test initiated close");
|
||||||
|
assertTrue(clientSocket.closeLatch.await(5, TimeUnit.SECONDS));
|
||||||
|
assertThat(clientSocket.closeCode, is(StatusCode.NORMAL));
|
||||||
|
assertThat(clientSocket.closeReason, is("test initiated close"));
|
||||||
|
|
||||||
|
// Test we closed successfully on the server side.
|
||||||
|
assertTrue(serverSocket.closeLatch.await(5, TimeUnit.SECONDS));
|
||||||
|
assertThat(serverSocket.closeCode, is(StatusCode.NORMAL));
|
||||||
|
assertThat(serverSocket.closeReason, is("test initiated close"));
|
||||||
|
|
||||||
|
// No errors occurred.
|
||||||
|
assertNull(clientSocket.error);
|
||||||
|
assertNull(serverSocket.error);
|
||||||
|
|
||||||
|
// WebSocketProxy has been completely closed.
|
||||||
|
assertTrue(webSocketProxy.awaitClose(5000));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFailServerUpgrade() throws Exception
|
||||||
|
{
|
||||||
|
EventSocket clientSocket = new EventSocket();
|
||||||
|
ClientUpgradeRequest upgradeRequest = new ClientUpgradeRequest();
|
||||||
|
upgradeRequest.setSubProtocols("fail");
|
||||||
|
|
||||||
|
try (StacklessLogging ignored = new StacklessLogging(HttpChannel.class))
|
||||||
|
{
|
||||||
|
client.connect(clientSocket, proxyUri, upgradeRequest);
|
||||||
|
assertTrue(clientSocket.closeLatch.await(5, TimeUnit.SECONDS));
|
||||||
|
}
|
||||||
|
|
||||||
|
// WebSocketProxy has been completely closed.
|
||||||
|
assertTrue(webSocketProxy.awaitClose(5000));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testClientError() throws Exception
|
||||||
|
{
|
||||||
|
EventSocket clientSocket = new OnOpenThrowingSocket();
|
||||||
|
client.connect(clientSocket, proxyUri);
|
||||||
|
assertTrue(clientSocket.openLatch.await(5, TimeUnit.SECONDS));
|
||||||
|
|
||||||
|
// Verify expected client close.
|
||||||
|
assertTrue(clientSocket.closeLatch.await(5, TimeUnit.SECONDS));
|
||||||
|
assertThat(clientSocket.closeCode, is(StatusCode.NO_CLOSE));
|
||||||
|
assertThat(clientSocket.closeReason, is("simulated onOpen error"));
|
||||||
|
assertNotNull(clientSocket.error);
|
||||||
|
|
||||||
|
// Verify expected server close.
|
||||||
|
assertTrue(serverSocket.closeLatch.await(5, TimeUnit.SECONDS));
|
||||||
|
assertThat(serverSocket.closeCode, is(StatusCode.NO_CLOSE));
|
||||||
|
assertThat(serverSocket.closeReason, is("Disconnected"));
|
||||||
|
assertNull(serverSocket.error);
|
||||||
|
|
||||||
|
// WebSocketProxy has been completely closed.
|
||||||
|
assertTrue(webSocketProxy.awaitClose(5000));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testServerError() throws Exception
|
||||||
|
{
|
||||||
|
serverSocket = new OnOpenThrowingSocket();
|
||||||
|
|
||||||
|
EventSocket clientSocket = new EventSocket();
|
||||||
|
client.connect(clientSocket, proxyUri);
|
||||||
|
assertTrue(clientSocket.openLatch.await(5, TimeUnit.SECONDS));
|
||||||
|
|
||||||
|
// Verify expected client close.
|
||||||
|
assertTrue(clientSocket.closeLatch.await(5, TimeUnit.SECONDS));
|
||||||
|
assertThat(clientSocket.closeCode, is(StatusCode.SERVER_ERROR));
|
||||||
|
assertThat(clientSocket.closeReason, is("simulated onOpen error"));
|
||||||
|
assertNull(clientSocket.error);
|
||||||
|
|
||||||
|
// Verify expected server close.
|
||||||
|
assertTrue(serverSocket.closeLatch.await(5, TimeUnit.SECONDS));
|
||||||
|
assertThat(serverSocket.closeCode, is(StatusCode.SERVER_ERROR));
|
||||||
|
assertThat(serverSocket.closeReason, is("simulated onOpen error"));
|
||||||
|
assertNotNull(serverSocket.error);
|
||||||
|
|
||||||
|
// WebSocketProxy has been completely closed.
|
||||||
|
assertTrue(webSocketProxy.awaitClose(5000));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testServerErrorClientNoResponse() throws Exception
|
||||||
|
{
|
||||||
|
serverSocket = new OnTextThrowingSocket();
|
||||||
|
|
||||||
|
EventSocket clientSocket = new EventSocket();
|
||||||
|
client.connect(clientSocket, proxyUri);
|
||||||
|
assertTrue(clientSocket.openLatch.await(5, TimeUnit.SECONDS));
|
||||||
|
assertTrue(serverSocket.openLatch.await(5, TimeUnit.SECONDS));
|
||||||
|
|
||||||
|
clientSocket.session.getRemote().sendString("hello world!");
|
||||||
|
|
||||||
|
// Verify expected client close.
|
||||||
|
assertTrue(clientSocket.closeLatch.await(5, TimeUnit.SECONDS));
|
||||||
|
assertThat(clientSocket.closeCode, is(StatusCode.SERVER_ERROR));
|
||||||
|
assertThat(clientSocket.closeReason, is("simulated onMessage error"));
|
||||||
|
assertNull(clientSocket.error);
|
||||||
|
|
||||||
|
// Verify expected server close.
|
||||||
|
assertTrue(serverSocket.closeLatch.await(5, TimeUnit.SECONDS));
|
||||||
|
assertThat(serverSocket.closeCode, is(StatusCode.SERVER_ERROR));
|
||||||
|
assertThat(serverSocket.closeReason, is("simulated onMessage error"));
|
||||||
|
assertNotNull(serverSocket.error);
|
||||||
|
|
||||||
|
assertNull(clientSocket.textMessages.poll(1, TimeUnit.SECONDS));
|
||||||
|
assertTrue(webSocketProxy.awaitClose(5000));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPingPong() throws Exception
|
||||||
|
{
|
||||||
|
PingPongSocket serverEndpoint = new PingPongSocket();
|
||||||
|
serverSocket = serverEndpoint;
|
||||||
|
|
||||||
|
PingPongSocket clientSocket = new PingPongSocket();
|
||||||
|
client.connect(clientSocket, proxyUri);
|
||||||
|
assertTrue(clientSocket.openLatch.await(5, TimeUnit.SECONDS));
|
||||||
|
assertTrue(serverSocket.openLatch.await(5, TimeUnit.SECONDS));
|
||||||
|
|
||||||
|
// Test unsolicited pong from client.
|
||||||
|
clientSocket.session.getRemote().sendPong(BufferUtil.toBuffer("unsolicited pong from client"));
|
||||||
|
assertThat(serverEndpoint.pingMessages.size(), is(0));
|
||||||
|
assertThat(serverEndpoint.pongMessages.poll(5, TimeUnit.SECONDS), is(BufferUtil.toBuffer("unsolicited pong from client")));
|
||||||
|
|
||||||
|
// Test unsolicited pong from server.
|
||||||
|
serverEndpoint.session.getRemote().sendPong(BufferUtil.toBuffer("unsolicited pong from server"));
|
||||||
|
assertThat(clientSocket.pingMessages.size(), is(0));
|
||||||
|
assertThat(clientSocket.pongMessages.poll(5, TimeUnit.SECONDS), is(BufferUtil.toBuffer("unsolicited pong from server")));
|
||||||
|
|
||||||
|
// Test pings from client.
|
||||||
|
for (int i = 0; i < 10; i++)
|
||||||
|
clientSocket.session.getRemote().sendPing(BufferUtil.toBuffer(i));
|
||||||
|
for (int i = 0; i < 10; i++)
|
||||||
|
{
|
||||||
|
assertThat(serverEndpoint.pingMessages.poll(5, TimeUnit.SECONDS), is(BufferUtil.toBuffer(i)));
|
||||||
|
assertThat(clientSocket.pongMessages.poll(5, TimeUnit.SECONDS), is(BufferUtil.toBuffer(i)));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test pings from server.
|
||||||
|
for (int i = 0; i < 10; i++)
|
||||||
|
serverEndpoint.session.getRemote().sendPing(BufferUtil.toBuffer(i));
|
||||||
|
for (int i = 0; i < 10; i++)
|
||||||
|
{
|
||||||
|
assertThat(clientSocket.pingMessages.poll(5, TimeUnit.SECONDS), is(BufferUtil.toBuffer(i)));
|
||||||
|
assertThat(serverEndpoint.pongMessages.poll(5, TimeUnit.SECONDS), is(BufferUtil.toBuffer(i)));
|
||||||
|
}
|
||||||
|
|
||||||
|
clientSocket.session.close(StatusCode.NORMAL, "closing from test");
|
||||||
|
|
||||||
|
// Verify expected client close.
|
||||||
|
assertTrue(clientSocket.closeLatch.await(5, TimeUnit.SECONDS));
|
||||||
|
assertThat(clientSocket.closeCode, is(StatusCode.NORMAL));
|
||||||
|
assertThat(clientSocket.closeReason, is("closing from test"));
|
||||||
|
assertNull(clientSocket.error);
|
||||||
|
|
||||||
|
// Verify expected server close.
|
||||||
|
assertTrue(serverSocket.closeLatch.await(5, TimeUnit.SECONDS));
|
||||||
|
assertThat(serverSocket.closeCode, is(StatusCode.NORMAL));
|
||||||
|
assertThat(serverSocket.closeReason, is("closing from test"));
|
||||||
|
assertNull(serverSocket.error);
|
||||||
|
|
||||||
|
// WebSocketProxy has been completely closed.
|
||||||
|
assertTrue(webSocketProxy.awaitClose(5000));
|
||||||
|
|
||||||
|
// Check we had no unexpected pings or pongs sent.
|
||||||
|
assertThat(clientSocket.pingMessages.size(), is(0));
|
||||||
|
assertThat(serverEndpoint.pingMessages.size(), is(0));
|
||||||
|
}
|
||||||
|
|
||||||
|
@WebSocket
|
||||||
|
public static class PingPongSocket extends EventSocket
|
||||||
|
{
|
||||||
|
public BlockingQueue<ByteBuffer> pingMessages = new BlockingArrayQueue<>();
|
||||||
|
public BlockingQueue<ByteBuffer> pongMessages = new BlockingArrayQueue<>();
|
||||||
|
|
||||||
|
@OnWebSocketFrame
|
||||||
|
public void onWebSocketFrame(Frame frame)
|
||||||
|
{
|
||||||
|
switch (frame.getOpCode())
|
||||||
|
{
|
||||||
|
case OpCode.PING:
|
||||||
|
pingMessages.add(BufferUtil.copy(frame.getPayload()));
|
||||||
|
break;
|
||||||
|
case OpCode.PONG:
|
||||||
|
pongMessages.add(BufferUtil.copy(frame.getPayload()));
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@WebSocket
|
||||||
|
public static class OnOpenThrowingSocket extends EventSocket
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void onOpen(Session session)
|
||||||
|
{
|
||||||
|
super.onOpen(session);
|
||||||
|
throw new IllegalStateException("simulated onOpen error");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@WebSocket
|
||||||
|
public static class OnTextThrowingSocket extends EventSocket
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void onMessage(String message) throws IOException
|
||||||
|
{
|
||||||
|
super.onMessage(message);
|
||||||
|
throw new IllegalStateException("simulated onMessage error");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -12,6 +12,7 @@ org.eclipse.jetty.LEVEL=INFO
|
||||||
# org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection.LEVEL=DEBUG
|
# org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection.LEVEL=DEBUG
|
||||||
# org.eclipse.jetty.websocket.common.io.IOState.LEVEL=DEBUG
|
# org.eclipse.jetty.websocket.common.io.IOState.LEVEL=DEBUG
|
||||||
# org.eclipse.jetty.websocket.common.test.LEVEL=DEBUG
|
# org.eclipse.jetty.websocket.common.test.LEVEL=DEBUG
|
||||||
|
# org.eclipse.jetty.websocket.tests.proxy.LEVEL=DEBUG
|
||||||
|
|
||||||
# org.eclipse.jetty.websocket.common.Generator.LEVEL=DEBUG
|
# org.eclipse.jetty.websocket.common.Generator.LEVEL=DEBUG
|
||||||
# org.eclipse.jetty.websocket.common.Parser.LEVEL=DEBUG
|
# org.eclipse.jetty.websocket.common.Parser.LEVEL=DEBUG
|
||||||
|
|
Loading…
Reference in New Issue