Issue #3170 - cleanups of WebSocketProxy and WebSocketProxyTest
Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
This commit is contained in:
parent
327783e1ce
commit
e007634e53
|
@ -0,0 +1,56 @@
|
||||||
|
//
|
||||||
|
// ========================================================================
|
||||||
|
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
|
||||||
|
// ------------------------------------------------------------------------
|
||||||
|
// All rights reserved. This program and the accompanying materials
|
||||||
|
// are made available under the terms of the Eclipse Public License v1.0
|
||||||
|
// and Apache License v2.0 which accompanies this distribution.
|
||||||
|
//
|
||||||
|
// The Eclipse Public License is available at
|
||||||
|
// http://www.eclipse.org/legal/epl-v10.html
|
||||||
|
//
|
||||||
|
// The Apache License v2.0 is available at
|
||||||
|
// http://www.opensource.org/licenses/apache2.0.php
|
||||||
|
//
|
||||||
|
// You may elect to redistribute this code under either of these licenses.
|
||||||
|
// ========================================================================
|
||||||
|
//
|
||||||
|
|
||||||
|
package org.eclipse.jetty.websocket.core;
|
||||||
|
|
||||||
|
import org.eclipse.jetty.util.Callback;
|
||||||
|
|
||||||
|
public class EchoFrameHandler extends TestAsyncFrameHandler
|
||||||
|
{
|
||||||
|
private boolean throwOnFrame;
|
||||||
|
|
||||||
|
public void throwOnFrame()
|
||||||
|
{
|
||||||
|
throwOnFrame = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
public EchoFrameHandler(String name)
|
||||||
|
{
|
||||||
|
super(name);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onFrame(Frame frame, Callback callback)
|
||||||
|
{
|
||||||
|
LOG.info("[{}] onFrame {}", name, frame);
|
||||||
|
receivedFrames.offer(Frame.copy(frame));
|
||||||
|
|
||||||
|
if (throwOnFrame)
|
||||||
|
throw new RuntimeException("intentionally throwing in server onFrame()");
|
||||||
|
|
||||||
|
if (frame.isDataFrame())
|
||||||
|
{
|
||||||
|
LOG.info("[{}] echoDataFrame {}", name, frame);
|
||||||
|
coreSession.sendFrame(new Frame(frame.getOpCode(), frame.getPayload()), callback, false);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
callback.succeeded();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,107 @@
|
||||||
|
//
|
||||||
|
// ========================================================================
|
||||||
|
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
|
||||||
|
// ------------------------------------------------------------------------
|
||||||
|
// All rights reserved. This program and the accompanying materials
|
||||||
|
// are made available under the terms of the Eclipse Public License v1.0
|
||||||
|
// and Apache License v2.0 which accompanies this distribution.
|
||||||
|
//
|
||||||
|
// The Eclipse Public License is available at
|
||||||
|
// http://www.eclipse.org/legal/epl-v10.html
|
||||||
|
//
|
||||||
|
// The Apache License v2.0 is available at
|
||||||
|
// http://www.opensource.org/licenses/apache2.0.php
|
||||||
|
//
|
||||||
|
// You may elect to redistribute this code under either of these licenses.
|
||||||
|
// ========================================================================
|
||||||
|
//
|
||||||
|
|
||||||
|
package org.eclipse.jetty.websocket.core;
|
||||||
|
|
||||||
|
import java.util.concurrent.BlockingQueue;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
|
||||||
|
import org.eclipse.jetty.util.BlockingArrayQueue;
|
||||||
|
import org.eclipse.jetty.util.Callback;
|
||||||
|
import org.eclipse.jetty.util.log.Log;
|
||||||
|
import org.eclipse.jetty.util.log.Logger;
|
||||||
|
|
||||||
|
public class TestAsyncFrameHandler implements FrameHandler
|
||||||
|
{
|
||||||
|
protected static final Logger LOG = Log.getLogger(TestAsyncFrameHandler.class);
|
||||||
|
protected final String name;
|
||||||
|
|
||||||
|
public CoreSession coreSession;
|
||||||
|
public BlockingQueue<Frame> receivedFrames = new BlockingArrayQueue<>();
|
||||||
|
public volatile Throwable error;
|
||||||
|
public CountDownLatch openLatch = new CountDownLatch(1);
|
||||||
|
public CountDownLatch errorLatch = new CountDownLatch(1);
|
||||||
|
public CountDownLatch closeLatch = new CountDownLatch(1);
|
||||||
|
|
||||||
|
public TestAsyncFrameHandler()
|
||||||
|
{
|
||||||
|
name = TestAsyncFrameHandler.class.getSimpleName();
|
||||||
|
}
|
||||||
|
|
||||||
|
public TestAsyncFrameHandler(String name)
|
||||||
|
{
|
||||||
|
this.name = name;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onOpen(CoreSession coreSession, Callback callback)
|
||||||
|
{
|
||||||
|
LOG.info("[{}] onOpen {}", name, coreSession);
|
||||||
|
this.coreSession = coreSession;
|
||||||
|
callback.succeeded();
|
||||||
|
openLatch.countDown();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onFrame(Frame frame, Callback callback)
|
||||||
|
{
|
||||||
|
LOG.info("[{}] onFrame {}", name, frame);
|
||||||
|
receivedFrames.offer(Frame.copy(frame));
|
||||||
|
callback.succeeded();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onClosed(CloseStatus closeStatus, Callback callback)
|
||||||
|
{
|
||||||
|
LOG.info("[{}] onClosed {}", name, closeStatus);
|
||||||
|
closeLatch.countDown();
|
||||||
|
callback.succeeded();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onError(Throwable cause, Callback callback)
|
||||||
|
{
|
||||||
|
LOG.info("[{}] onError {} ", name, cause == null?null:cause.toString());
|
||||||
|
error = cause;
|
||||||
|
errorLatch.countDown();
|
||||||
|
callback.succeeded();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void sendText(String text)
|
||||||
|
{
|
||||||
|
LOG.info("[{}] sendText {} ", name, text);
|
||||||
|
Frame frame = new Frame(OpCode.TEXT, text);
|
||||||
|
coreSession.sendFrame(frame, Callback.NOOP, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void sendFrame(Frame frame)
|
||||||
|
{
|
||||||
|
LOG.info("[{}] sendFrame {} ", name, frame);
|
||||||
|
coreSession.sendFrame(frame, Callback.NOOP, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void close()
|
||||||
|
{
|
||||||
|
close(CloseStatus.NORMAL, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void close(int closeStatus, String reason)
|
||||||
|
{
|
||||||
|
sendFrame(CloseStatus.toFrame(closeStatus, reason));
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,141 +0,0 @@
|
||||||
//
|
|
||||||
// ========================================================================
|
|
||||||
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
|
|
||||||
// ------------------------------------------------------------------------
|
|
||||||
// All rights reserved. This program and the accompanying materials
|
|
||||||
// are made available under the terms of the Eclipse Public License v1.0
|
|
||||||
// and Apache License v2.0 which accompanies this distribution.
|
|
||||||
//
|
|
||||||
// The Eclipse Public License is available at
|
|
||||||
// http://www.eclipse.org/legal/epl-v10.html
|
|
||||||
//
|
|
||||||
// The Apache License v2.0 is available at
|
|
||||||
// http://www.opensource.org/licenses/apache2.0.php
|
|
||||||
//
|
|
||||||
// You may elect to redistribute this code under either of these licenses.
|
|
||||||
// ========================================================================
|
|
||||||
//
|
|
||||||
|
|
||||||
package org.eclipse.jetty.websocket.core.proxy;
|
|
||||||
|
|
||||||
import java.util.concurrent.BlockingQueue;
|
|
||||||
import java.util.concurrent.CountDownLatch;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
import java.util.concurrent.TimeoutException;
|
|
||||||
|
|
||||||
import org.eclipse.jetty.util.BlockingArrayQueue;
|
|
||||||
import org.eclipse.jetty.util.BufferUtil;
|
|
||||||
import org.eclipse.jetty.util.Callback;
|
|
||||||
import org.eclipse.jetty.websocket.core.CloseStatus;
|
|
||||||
import org.eclipse.jetty.websocket.core.Frame;
|
|
||||||
import org.eclipse.jetty.websocket.core.FrameHandler;
|
|
||||||
import org.eclipse.jetty.websocket.core.OpCode;
|
|
||||||
|
|
||||||
class BasicFrameHandler implements FrameHandler
|
|
||||||
{
|
|
||||||
protected String name;
|
|
||||||
protected CoreSession session;
|
|
||||||
protected CountDownLatch opened = new CountDownLatch(1);
|
|
||||||
protected CountDownLatch closed = new CountDownLatch(1);
|
|
||||||
|
|
||||||
protected BlockingQueue<Frame> receivedFrames = new BlockingArrayQueue<>();
|
|
||||||
|
|
||||||
|
|
||||||
public BasicFrameHandler(String name)
|
|
||||||
{
|
|
||||||
this.name = "[" + name + "]";
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onOpen(CoreSession coreSession, Callback callback)
|
|
||||||
{
|
|
||||||
session = coreSession;
|
|
||||||
System.err.println(name + " onOpen(): " + session);
|
|
||||||
opened.countDown();
|
|
||||||
callback.succeeded();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onFrame(Frame frame, Callback callback)
|
|
||||||
{
|
|
||||||
System.err.println(name + " onFrame(): " + frame);
|
|
||||||
receivedFrames.offer(Frame.copy(frame));
|
|
||||||
callback.succeeded();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onError(Throwable cause, Callback callback)
|
|
||||||
{
|
|
||||||
System.err.println(name + " onError(): " + cause);
|
|
||||||
callback.succeeded();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onClosed(CloseStatus closeStatus, Callback callback)
|
|
||||||
{
|
|
||||||
System.err.println(name + " onClosed(): " + closeStatus);
|
|
||||||
closed.countDown();
|
|
||||||
callback.succeeded();
|
|
||||||
}
|
|
||||||
|
|
||||||
public void sendText(String message)
|
|
||||||
{
|
|
||||||
System.err.println(name + " sendText(): " + message);
|
|
||||||
Frame textFrame = new Frame(OpCode.TEXT, BufferUtil.toBuffer(message));
|
|
||||||
session.sendFrame(textFrame, Callback.NOOP, false);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void sendFrame(Frame frame)
|
|
||||||
{
|
|
||||||
System.err.println(name + " sendFrame(): " + frame);
|
|
||||||
session.sendFrame(frame, Callback.NOOP, false);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void close(String message) throws Exception
|
|
||||||
{
|
|
||||||
session.close(CloseStatus.NORMAL, message, Callback.NOOP);
|
|
||||||
awaitClose();
|
|
||||||
}
|
|
||||||
|
|
||||||
public void awaitClose() throws Exception
|
|
||||||
{
|
|
||||||
if (!closed.await(5, TimeUnit.SECONDS))
|
|
||||||
throw new TimeoutException();
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
public static class ServerEchoHandler extends BasicFrameHandler
|
|
||||||
{
|
|
||||||
private boolean throwOnFrame;
|
|
||||||
|
|
||||||
public void throwOnFrame()
|
|
||||||
{
|
|
||||||
throwOnFrame = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
public ServerEchoHandler(String name)
|
|
||||||
{
|
|
||||||
super(name);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onFrame(Frame frame, Callback callback)
|
|
||||||
{
|
|
||||||
System.err.println(name + " onFrame(): " + frame);
|
|
||||||
receivedFrames.offer(Frame.copy(frame));
|
|
||||||
|
|
||||||
if (throwOnFrame)
|
|
||||||
throw new RuntimeException("intentionally throwing in server onFrame()");
|
|
||||||
|
|
||||||
if (frame.isDataFrame())
|
|
||||||
{
|
|
||||||
System.err.println(name + " echoDataFrame(): " + frame);
|
|
||||||
session.sendFrame(new Frame(frame.getOpCode(), frame.getPayload()), callback, false);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
callback.succeeded();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -25,6 +25,8 @@ import java.util.concurrent.CountDownLatch;
|
||||||
|
|
||||||
import org.eclipse.jetty.util.BlockingArrayQueue;
|
import org.eclipse.jetty.util.BlockingArrayQueue;
|
||||||
import org.eclipse.jetty.util.Callback;
|
import org.eclipse.jetty.util.Callback;
|
||||||
|
import org.eclipse.jetty.util.log.Log;
|
||||||
|
import org.eclipse.jetty.util.log.Logger;
|
||||||
import org.eclipse.jetty.websocket.core.CloseStatus;
|
import org.eclipse.jetty.websocket.core.CloseStatus;
|
||||||
import org.eclipse.jetty.websocket.core.Frame;
|
import org.eclipse.jetty.websocket.core.Frame;
|
||||||
import org.eclipse.jetty.websocket.core.FrameHandler;
|
import org.eclipse.jetty.websocket.core.FrameHandler;
|
||||||
|
@ -33,6 +35,8 @@ import org.eclipse.jetty.websocket.core.client.WebSocketCoreClient;
|
||||||
|
|
||||||
class WebSocketProxy
|
class WebSocketProxy
|
||||||
{
|
{
|
||||||
|
protected static final Logger LOG = Log.getLogger(WebSocketProxy.class);
|
||||||
|
|
||||||
enum State
|
enum State
|
||||||
{
|
{
|
||||||
NOT_OPEN,
|
NOT_OPEN,
|
||||||
|
@ -45,8 +49,7 @@ class WebSocketProxy
|
||||||
}
|
}
|
||||||
|
|
||||||
private final Object lock = new Object();
|
private final Object lock = new Object();
|
||||||
|
private WebSocketCoreClient client;
|
||||||
WebSocketCoreClient client;
|
|
||||||
private URI serverUri;
|
private URI serverUri;
|
||||||
|
|
||||||
public Client2Proxy client2Proxy = new Client2Proxy();
|
public Client2Proxy client2Proxy = new Client2Proxy();
|
||||||
|
@ -57,6 +60,7 @@ class WebSocketProxy
|
||||||
this.client = client;
|
this.client = client;
|
||||||
this.serverUri = serverUri;
|
this.serverUri = serverUri;
|
||||||
}
|
}
|
||||||
|
|
||||||
class Client2Proxy implements FrameHandler
|
class Client2Proxy implements FrameHandler
|
||||||
{
|
{
|
||||||
private CoreSession client;
|
private CoreSession client;
|
||||||
|
@ -77,9 +81,10 @@ class WebSocketProxy
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onOpen(CoreSession session, Callback callback)
|
public void onOpen(CoreSession coreSession, Callback callback)
|
||||||
{
|
{
|
||||||
System.err.println(toString() + " onOpen(): " + session);
|
if (LOG.isDebugEnabled())
|
||||||
|
LOG.debug("[{}] onOpen {}", toString(), coreSession);
|
||||||
|
|
||||||
Throwable failure = null;
|
Throwable failure = null;
|
||||||
synchronized (lock)
|
synchronized (lock)
|
||||||
|
@ -88,7 +93,7 @@ class WebSocketProxy
|
||||||
{
|
{
|
||||||
case NOT_OPEN:
|
case NOT_OPEN:
|
||||||
state = State.CONNECTING;
|
state = State.CONNECTING;
|
||||||
client = session;
|
client = coreSession;
|
||||||
break;
|
break;
|
||||||
|
|
||||||
default:
|
default:
|
||||||
|
@ -105,7 +110,8 @@ class WebSocketProxy
|
||||||
|
|
||||||
private void onOpenSuccess(Callback callback)
|
private void onOpenSuccess(Callback callback)
|
||||||
{
|
{
|
||||||
System.err.println(toString() + " onOpenSuccess()");
|
if (LOG.isDebugEnabled())
|
||||||
|
LOG.debug("[{}] onOpenSuccess", toString());
|
||||||
|
|
||||||
Throwable failure = null;
|
Throwable failure = null;
|
||||||
synchronized (lock)
|
synchronized (lock)
|
||||||
|
@ -134,7 +140,8 @@ class WebSocketProxy
|
||||||
|
|
||||||
private void onOpenFail(Callback callback, Throwable t)
|
private void onOpenFail(Callback callback, Throwable t)
|
||||||
{
|
{
|
||||||
System.err.println(toString() + " onOpenFail(): " + t);
|
if (LOG.isDebugEnabled())
|
||||||
|
LOG.debug("[{}] onOpenFail {}", toString(), t);
|
||||||
|
|
||||||
Throwable failure = t;
|
Throwable failure = t;
|
||||||
synchronized (lock)
|
synchronized (lock)
|
||||||
|
@ -163,7 +170,8 @@ class WebSocketProxy
|
||||||
@Override
|
@Override
|
||||||
public void onFrame(Frame frame, Callback callback)
|
public void onFrame(Frame frame, Callback callback)
|
||||||
{
|
{
|
||||||
System.err.println(toString() + " onFrame(): " + frame);
|
if (LOG.isDebugEnabled())
|
||||||
|
LOG.debug("[{}] onFrame {}", toString(), frame);
|
||||||
receivedFrames.offer(Frame.copy(frame));
|
receivedFrames.offer(Frame.copy(frame));
|
||||||
|
|
||||||
Callback sendCallback = callback;
|
Callback sendCallback = callback;
|
||||||
|
@ -207,7 +215,8 @@ class WebSocketProxy
|
||||||
@Override
|
@Override
|
||||||
public void onError(Throwable failure, Callback callback)
|
public void onError(Throwable failure, Callback callback)
|
||||||
{
|
{
|
||||||
System.err.println(toString() + " onError(): " + failure);
|
if (LOG.isDebugEnabled())
|
||||||
|
LOG.debug("[{}] onError {}", toString(), failure);
|
||||||
|
|
||||||
boolean failServer2Proxy;
|
boolean failServer2Proxy;
|
||||||
synchronized (lock)
|
synchronized (lock)
|
||||||
|
@ -235,7 +244,8 @@ class WebSocketProxy
|
||||||
|
|
||||||
public void fail(Throwable failure, Callback callback)
|
public void fail(Throwable failure, Callback callback)
|
||||||
{
|
{
|
||||||
System.err.println(toString() + " fail(): " + failure);
|
if (LOG.isDebugEnabled())
|
||||||
|
LOG.debug("[{}] fail {}", toString(), failure);
|
||||||
|
|
||||||
Callback sendCallback = null;
|
Callback sendCallback = null;
|
||||||
synchronized (lock)
|
synchronized (lock)
|
||||||
|
@ -268,14 +278,17 @@ class WebSocketProxy
|
||||||
@Override
|
@Override
|
||||||
public void onClosed(CloseStatus closeStatus, Callback callback)
|
public void onClosed(CloseStatus closeStatus, Callback callback)
|
||||||
{
|
{
|
||||||
System.err.println(toString() + " onClosed(): " + closeStatus);
|
if (LOG.isDebugEnabled())
|
||||||
|
LOG.debug("[{}] onClosed {}", toString(), closeStatus);
|
||||||
|
|
||||||
closed.countDown();
|
closed.countDown();
|
||||||
callback.succeeded();
|
callback.succeeded();
|
||||||
}
|
}
|
||||||
|
|
||||||
public void send(Frame frame, Callback callback)
|
public void send(Frame frame, Callback callback)
|
||||||
{
|
{
|
||||||
System.err.println(toString() + " send(): " + frame);
|
if (LOG.isDebugEnabled())
|
||||||
|
LOG.debug("[{}] send {}", toString(), frame);
|
||||||
|
|
||||||
Callback sendCallback = callback;
|
Callback sendCallback = callback;
|
||||||
Throwable failure = null;
|
Throwable failure = null;
|
||||||
|
@ -315,10 +328,7 @@ class WebSocketProxy
|
||||||
@Override
|
@Override
|
||||||
public String toString()
|
public String toString()
|
||||||
{
|
{
|
||||||
synchronized (lock)
|
return "Client2Proxy:" + getState();
|
||||||
{
|
|
||||||
return "[Client2Proxy," + state + "] ";
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -343,7 +353,8 @@ class WebSocketProxy
|
||||||
|
|
||||||
public void connect(Callback callback)
|
public void connect(Callback callback)
|
||||||
{
|
{
|
||||||
System.err.println(toString() + " connect()");
|
if (LOG.isDebugEnabled())
|
||||||
|
LOG.debug("[{}] connect", toString());
|
||||||
|
|
||||||
Throwable failure = null;
|
Throwable failure = null;
|
||||||
synchronized (lock)
|
synchronized (lock)
|
||||||
|
@ -386,9 +397,10 @@ class WebSocketProxy
|
||||||
callback.failed(failure);
|
callback.failed(failure);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void onConnectSuccess(CoreSession s, Callback callback)
|
private void onConnectSuccess(CoreSession coreSession, Callback callback)
|
||||||
{
|
{
|
||||||
System.err.println(toString() + " onConnectSuccess(): " + s);
|
if (LOG.isDebugEnabled())
|
||||||
|
LOG.debug("[{}] onConnectSuccess {}", toString(), coreSession);
|
||||||
|
|
||||||
Throwable failure = null;
|
Throwable failure = null;
|
||||||
synchronized (lock)
|
synchronized (lock)
|
||||||
|
@ -411,14 +423,15 @@ class WebSocketProxy
|
||||||
}
|
}
|
||||||
|
|
||||||
if (failure != null)
|
if (failure != null)
|
||||||
s.close(CloseStatus.SHUTDOWN, failure.getMessage(), Callback.from(callback, failure));
|
coreSession.close(CloseStatus.SHUTDOWN, failure.getMessage(), Callback.from(callback, failure));
|
||||||
else
|
else
|
||||||
callback.succeeded();
|
callback.succeeded();
|
||||||
}
|
}
|
||||||
|
|
||||||
private void onConnectFailure(Throwable t, Callback callback)
|
private void onConnectFailure(Throwable t, Callback callback)
|
||||||
{
|
{
|
||||||
System.err.println(toString() + " onConnectFailure(): " + t);
|
if (LOG.isDebugEnabled())
|
||||||
|
LOG.debug("[{}] onConnectFailure {}", toString(), t);
|
||||||
|
|
||||||
Throwable failure = t;
|
Throwable failure = t;
|
||||||
synchronized (lock)
|
synchronized (lock)
|
||||||
|
@ -446,9 +459,10 @@ class WebSocketProxy
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onOpen(CoreSession session, Callback callback)
|
public void onOpen(CoreSession coreSession, Callback callback)
|
||||||
{
|
{
|
||||||
System.err.println(toString() + " onOpen(): " + session);
|
if (LOG.isDebugEnabled())
|
||||||
|
LOG.debug("[{}] onOpen {}", toString(), coreSession);
|
||||||
|
|
||||||
Throwable failure = null;
|
Throwable failure = null;
|
||||||
synchronized (lock)
|
synchronized (lock)
|
||||||
|
@ -457,7 +471,7 @@ class WebSocketProxy
|
||||||
{
|
{
|
||||||
case CONNECTING:
|
case CONNECTING:
|
||||||
state = State.OPEN;
|
state = State.OPEN;
|
||||||
server = session;
|
server = coreSession;
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case FAILED:
|
case FAILED:
|
||||||
|
@ -479,7 +493,8 @@ class WebSocketProxy
|
||||||
@Override
|
@Override
|
||||||
public void onFrame(Frame frame, Callback callback)
|
public void onFrame(Frame frame, Callback callback)
|
||||||
{
|
{
|
||||||
System.err.println(toString() + " onFrame(): " + frame);
|
if (LOG.isDebugEnabled())
|
||||||
|
LOG.debug("[{}] onFrame {}", toString(), frame);
|
||||||
receivedFrames.offer(Frame.copy(frame));
|
receivedFrames.offer(Frame.copy(frame));
|
||||||
|
|
||||||
Callback sendCallback = callback;
|
Callback sendCallback = callback;
|
||||||
|
@ -522,7 +537,8 @@ class WebSocketProxy
|
||||||
@Override
|
@Override
|
||||||
public void onError(Throwable failure, Callback callback)
|
public void onError(Throwable failure, Callback callback)
|
||||||
{
|
{
|
||||||
System.err.println(toString() + " onError(): " + failure);
|
if (LOG.isDebugEnabled())
|
||||||
|
LOG.debug("[{}] onError {}", toString(), failure);
|
||||||
|
|
||||||
boolean failClient2Proxy = false;
|
boolean failClient2Proxy = false;
|
||||||
synchronized (lock)
|
synchronized (lock)
|
||||||
|
@ -550,14 +566,16 @@ class WebSocketProxy
|
||||||
@Override
|
@Override
|
||||||
public void onClosed(CloseStatus closeStatus, Callback callback)
|
public void onClosed(CloseStatus closeStatus, Callback callback)
|
||||||
{
|
{
|
||||||
System.err.println(toString() + " onClosed(): " + closeStatus);
|
if (LOG.isDebugEnabled())
|
||||||
|
LOG.debug("[{}] onClosed {}", toString(), closeStatus);
|
||||||
closed.countDown();
|
closed.countDown();
|
||||||
callback.succeeded();
|
callback.succeeded();
|
||||||
}
|
}
|
||||||
|
|
||||||
public void fail(Throwable failure, Callback callback)
|
public void fail(Throwable failure, Callback callback)
|
||||||
{
|
{
|
||||||
System.err.println(toString() + " fail(): " + failure);
|
if (LOG.isDebugEnabled())
|
||||||
|
LOG.debug("[{}] fail {}", toString(), failure);
|
||||||
|
|
||||||
Callback sendCallback = null;
|
Callback sendCallback = null;
|
||||||
synchronized (lock)
|
synchronized (lock)
|
||||||
|
@ -588,7 +606,8 @@ class WebSocketProxy
|
||||||
|
|
||||||
public void send(Frame frame, Callback callback)
|
public void send(Frame frame, Callback callback)
|
||||||
{
|
{
|
||||||
System.err.println(toString() + " send(): " + frame);
|
if (LOG.isDebugEnabled())
|
||||||
|
LOG.debug("[{}] send {}", toString(), frame);
|
||||||
|
|
||||||
Callback sendCallback = callback;
|
Callback sendCallback = callback;
|
||||||
Throwable failure = null;
|
Throwable failure = null;
|
||||||
|
@ -628,10 +647,7 @@ class WebSocketProxy
|
||||||
@Override
|
@Override
|
||||||
public String toString()
|
public String toString()
|
||||||
{
|
{
|
||||||
synchronized (lock)
|
return "Server2Proxy:" + getState();
|
||||||
{
|
|
||||||
return "[Server2Proxy," + state + "] ";
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
|
@ -40,13 +40,15 @@ import org.eclipse.jetty.server.handler.HandlerList;
|
||||||
import org.eclipse.jetty.util.Callback;
|
import org.eclipse.jetty.util.Callback;
|
||||||
import org.eclipse.jetty.util.log.StacklessLogging;
|
import org.eclipse.jetty.util.log.StacklessLogging;
|
||||||
import org.eclipse.jetty.websocket.core.CloseStatus;
|
import org.eclipse.jetty.websocket.core.CloseStatus;
|
||||||
|
import org.eclipse.jetty.websocket.core.EchoFrameHandler;
|
||||||
import org.eclipse.jetty.websocket.core.Frame;
|
import org.eclipse.jetty.websocket.core.Frame;
|
||||||
import org.eclipse.jetty.websocket.core.FrameHandler;
|
import org.eclipse.jetty.websocket.core.FrameHandler;
|
||||||
import org.eclipse.jetty.websocket.core.FrameHandler.CoreSession;
|
import org.eclipse.jetty.websocket.core.FrameHandler.CoreSession;
|
||||||
import org.eclipse.jetty.websocket.core.OpCode;
|
import org.eclipse.jetty.websocket.core.OpCode;
|
||||||
|
import org.eclipse.jetty.websocket.core.TestAsyncFrameHandler;
|
||||||
import org.eclipse.jetty.websocket.core.client.ClientUpgradeRequest;
|
import org.eclipse.jetty.websocket.core.client.ClientUpgradeRequest;
|
||||||
import org.eclipse.jetty.websocket.core.client.WebSocketCoreClient;
|
import org.eclipse.jetty.websocket.core.client.WebSocketCoreClient;
|
||||||
import org.eclipse.jetty.websocket.core.internal.WebSocketChannel;
|
import org.eclipse.jetty.websocket.core.internal.WebSocketCoreSession;
|
||||||
import org.eclipse.jetty.websocket.core.server.WebSocketNegotiator;
|
import org.eclipse.jetty.websocket.core.server.WebSocketNegotiator;
|
||||||
import org.eclipse.jetty.websocket.core.server.WebSocketUpgradeHandler;
|
import org.eclipse.jetty.websocket.core.server.WebSocketUpgradeHandler;
|
||||||
import org.junit.jupiter.api.AfterEach;
|
import org.junit.jupiter.api.AfterEach;
|
||||||
|
@ -59,14 +61,16 @@ import static org.hamcrest.Matchers.is;
|
||||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||||
import static org.junit.jupiter.api.Assertions.assertNull;
|
import static org.junit.jupiter.api.Assertions.assertNull;
|
||||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
|
|
||||||
public class WebSocketProxyTest
|
public class WebSocketProxyTest
|
||||||
{
|
{
|
||||||
private Server _server;
|
private Server _server;
|
||||||
private WebSocketCoreClient _client;
|
private WebSocketCoreClient _client;
|
||||||
private WebSocketProxy proxy;
|
private WebSocketProxy proxy;
|
||||||
private BasicFrameHandler.ServerEchoHandler serverFrameHandler;
|
private EchoFrameHandler serverFrameHandler;
|
||||||
private TestHandler testHandler;
|
private TestHandler testHandler;
|
||||||
|
FrameHandler.ConfigurationCustomizer defaultCustomizer;
|
||||||
|
|
||||||
private class TestHandler extends AbstractHandler
|
private class TestHandler extends AbstractHandler
|
||||||
{
|
{
|
||||||
|
@ -103,23 +107,23 @@ public class WebSocketProxyTest
|
||||||
testHandler = new TestHandler();
|
testHandler = new TestHandler();
|
||||||
handlers.addHandler(testHandler);
|
handlers.addHandler(testHandler);
|
||||||
|
|
||||||
FrameHandler.ConfigurationCustomizer customizer = new FrameHandler.ConfigurationCustomizer();
|
defaultCustomizer = new FrameHandler.ConfigurationCustomizer();
|
||||||
customizer.setIdleTimeout(Duration.ofSeconds(3));
|
defaultCustomizer.setIdleTimeout(Duration.ofSeconds(3));
|
||||||
|
|
||||||
ContextHandler serverContext = new ContextHandler("/server");
|
ContextHandler serverContext = new ContextHandler("/server");
|
||||||
serverFrameHandler = new BasicFrameHandler.ServerEchoHandler("SERVER");
|
serverFrameHandler = new EchoFrameHandler("SERVER");
|
||||||
WebSocketNegotiator negotiator = WebSocketNegotiator.from((negotiation) -> serverFrameHandler, customizer);
|
WebSocketNegotiator negotiator = WebSocketNegotiator.from((negotiation) -> serverFrameHandler, defaultCustomizer);
|
||||||
WebSocketUpgradeHandler upgradeHandler = new WebSocketUpgradeHandler(negotiator);
|
WebSocketUpgradeHandler upgradeHandler = new WebSocketUpgradeHandler(negotiator);
|
||||||
serverContext.setHandler(upgradeHandler);
|
serverContext.setHandler(upgradeHandler);
|
||||||
handlers.addHandler(serverContext);
|
handlers.addHandler(serverContext);
|
||||||
|
|
||||||
_client = new WebSocketCoreClient(null, customizer);
|
_client = new WebSocketCoreClient();
|
||||||
_client.start();
|
_client.start();
|
||||||
URI uri = new URI("ws://localhost:8080/server/");
|
URI uri = new URI("ws://localhost:8080/server/");
|
||||||
|
|
||||||
ContextHandler proxyContext = new ContextHandler("/proxy");
|
ContextHandler proxyContext = new ContextHandler("/proxy");
|
||||||
proxy = new WebSocketProxy(_client, uri);
|
proxy = new WebSocketProxy(_client, uri);
|
||||||
negotiator = WebSocketNegotiator.from((negotiation) -> proxy.client2Proxy, customizer);
|
negotiator = WebSocketNegotiator.from((negotiation) -> proxy.client2Proxy, defaultCustomizer);
|
||||||
upgradeHandler = new WebSocketUpgradeHandler(negotiator);
|
upgradeHandler = new WebSocketUpgradeHandler(negotiator);
|
||||||
proxyContext.setHandler(upgradeHandler);
|
proxyContext.setHandler(upgradeHandler);
|
||||||
handlers.addHandler(proxyContext);
|
handlers.addHandler(proxyContext);
|
||||||
|
@ -151,17 +155,19 @@ public class WebSocketProxyTest
|
||||||
@Test
|
@Test
|
||||||
public void testEcho() throws Exception
|
public void testEcho() throws Exception
|
||||||
{
|
{
|
||||||
BasicFrameHandler clientHandler = new BasicFrameHandler("CLIENT");
|
TestAsyncFrameHandler clientFrameHandler = new TestAsyncFrameHandler("CLIENT");
|
||||||
WebSocketProxy.Client2Proxy proxyClientSide = proxy.client2Proxy;
|
WebSocketProxy.Client2Proxy proxyClientSide = proxy.client2Proxy;
|
||||||
WebSocketProxy.Server2Proxy proxyServerSide = proxy.server2Proxy;
|
WebSocketProxy.Server2Proxy proxyServerSide = proxy.server2Proxy;
|
||||||
|
|
||||||
ClientUpgradeRequest upgradeRequest = ClientUpgradeRequest.from(_client, new URI("ws://localhost:8080/proxy/"), clientHandler);
|
ClientUpgradeRequest upgradeRequest = ClientUpgradeRequest.from(_client, new URI("ws://localhost:8080/proxy/"), clientFrameHandler);
|
||||||
|
upgradeRequest.setConfiguration(defaultCustomizer);
|
||||||
CompletableFuture<CoreSession> response = _client.connect(upgradeRequest);
|
CompletableFuture<CoreSession> response = _client.connect(upgradeRequest);
|
||||||
|
|
||||||
response.get(5, TimeUnit.SECONDS);
|
response.get(5, TimeUnit.SECONDS);
|
||||||
clientHandler.sendText("hello world");
|
clientFrameHandler.sendText("hello world");
|
||||||
clientHandler.close("standard close");
|
clientFrameHandler.close(CloseStatus.NORMAL, "standard close");
|
||||||
clientHandler.awaitClose();
|
assertTrue(clientFrameHandler.closeLatch.await(5, TimeUnit.SECONDS));
|
||||||
serverFrameHandler.awaitClose();
|
assertTrue(serverFrameHandler.closeLatch.await(5, TimeUnit.SECONDS));
|
||||||
awaitProxyClose(proxyClientSide, proxyServerSide);
|
awaitProxyClose(proxyClientSide, proxyServerSide);
|
||||||
|
|
||||||
assertThat(proxyClientSide.getState(), is(WebSocketProxy.State.CLOSED));
|
assertThat(proxyClientSide.getState(), is(WebSocketProxy.State.CLOSED));
|
||||||
|
@ -170,17 +176,17 @@ public class WebSocketProxyTest
|
||||||
assertThat(proxyClientSide.receivedFrames.poll().getPayloadAsUTF8(), is("hello world"));
|
assertThat(proxyClientSide.receivedFrames.poll().getPayloadAsUTF8(), is("hello world"));
|
||||||
assertThat(serverFrameHandler.receivedFrames.poll().getPayloadAsUTF8(), is("hello world"));
|
assertThat(serverFrameHandler.receivedFrames.poll().getPayloadAsUTF8(), is("hello world"));
|
||||||
assertThat(proxyServerSide.receivedFrames.poll().getPayloadAsUTF8(), is("hello world"));
|
assertThat(proxyServerSide.receivedFrames.poll().getPayloadAsUTF8(), is("hello world"));
|
||||||
assertThat(clientHandler.receivedFrames.poll().getPayloadAsUTF8(), is("hello world"));
|
assertThat(clientFrameHandler.receivedFrames.poll().getPayloadAsUTF8(), is("hello world"));
|
||||||
|
|
||||||
assertThat(CloseStatus.getCloseStatus(proxyClientSide.receivedFrames.poll()).getReason(), is("standard close"));
|
assertThat(CloseStatus.getCloseStatus(proxyClientSide.receivedFrames.poll()).getReason(), is("standard close"));
|
||||||
assertThat(CloseStatus.getCloseStatus(serverFrameHandler.receivedFrames.poll()).getReason(), is("standard close"));
|
assertThat(CloseStatus.getCloseStatus(serverFrameHandler.receivedFrames.poll()).getReason(), is("standard close"));
|
||||||
assertThat(CloseStatus.getCloseStatus(proxyServerSide.receivedFrames.poll()).getReason(), is("standard close"));
|
assertThat(CloseStatus.getCloseStatus(proxyServerSide.receivedFrames.poll()).getReason(), is("standard close"));
|
||||||
assertThat(CloseStatus.getCloseStatus(clientHandler.receivedFrames.poll()).getReason(), is("standard close"));
|
assertThat(CloseStatus.getCloseStatus(clientFrameHandler.receivedFrames.poll()).getReason(), is("standard close"));
|
||||||
|
|
||||||
assertNull(proxyClientSide.receivedFrames.poll());
|
assertNull(proxyClientSide.receivedFrames.poll());
|
||||||
assertNull(serverFrameHandler.receivedFrames.poll());
|
assertNull(serverFrameHandler.receivedFrames.poll());
|
||||||
assertNull(proxyServerSide.receivedFrames.poll());
|
assertNull(proxyServerSide.receivedFrames.poll());
|
||||||
assertNull(clientHandler.receivedFrames.poll());
|
assertNull(clientFrameHandler.receivedFrames.poll());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -190,14 +196,16 @@ public class WebSocketProxyTest
|
||||||
WebSocketProxy.Client2Proxy proxyClientSide = proxy.client2Proxy;
|
WebSocketProxy.Client2Proxy proxyClientSide = proxy.client2Proxy;
|
||||||
WebSocketProxy.Server2Proxy proxyServerSide = proxy.server2Proxy;
|
WebSocketProxy.Server2Proxy proxyServerSide = proxy.server2Proxy;
|
||||||
|
|
||||||
BasicFrameHandler clientHandler = new BasicFrameHandler("CLIENT");
|
TestAsyncFrameHandler clientFrameHandler = new TestAsyncFrameHandler("CLIENT");
|
||||||
try (StacklessLogging stacklessLogging = new StacklessLogging(WebSocketChannel.class))
|
try (StacklessLogging stacklessLogging = new StacklessLogging(WebSocketCoreSession.class))
|
||||||
{
|
{
|
||||||
CompletableFuture<CoreSession> response = _client.connect(clientHandler, new URI("ws://localhost:8080/proxy/"));
|
ClientUpgradeRequest upgradeRequest = ClientUpgradeRequest.from(_client, new URI("ws://localhost:8080/proxy/"), clientFrameHandler);
|
||||||
|
upgradeRequest.setConfiguration(defaultCustomizer);
|
||||||
|
CompletableFuture<CoreSession> response = _client.connect(upgradeRequest);
|
||||||
response.get(5, TimeUnit.SECONDS);
|
response.get(5, TimeUnit.SECONDS);
|
||||||
clientHandler.sendText("hello world");
|
clientFrameHandler.sendText("hello world");
|
||||||
clientHandler.close("standard close");
|
clientFrameHandler.close();
|
||||||
clientHandler.awaitClose();
|
assertTrue(clientFrameHandler.closeLatch.await(5, TimeUnit.SECONDS));
|
||||||
awaitProxyClose(proxyClientSide, null);
|
awaitProxyClose(proxyClientSide, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -207,9 +215,9 @@ public class WebSocketProxyTest
|
||||||
assertNull(proxyServerSide.receivedFrames.poll());
|
assertNull(proxyServerSide.receivedFrames.poll());
|
||||||
assertThat(proxyServerSide.getState(), is(WebSocketProxy.State.FAILED));
|
assertThat(proxyServerSide.getState(), is(WebSocketProxy.State.FAILED));
|
||||||
|
|
||||||
assertFalse(serverFrameHandler.opened.await(250, TimeUnit.MILLISECONDS));
|
assertFalse(serverFrameHandler.openLatch.await(250, TimeUnit.MILLISECONDS));
|
||||||
|
|
||||||
CloseStatus closeStatus = CloseStatus.getCloseStatus(clientHandler.receivedFrames.poll());
|
CloseStatus closeStatus = CloseStatus.getCloseStatus(clientFrameHandler.receivedFrames.poll());
|
||||||
assertThat(closeStatus.getCode(), is(CloseStatus.SERVER_ERROR));
|
assertThat(closeStatus.getCode(), is(CloseStatus.SERVER_ERROR));
|
||||||
assertThat(closeStatus.getReason(), containsString("Failed to upgrade to websocket: Unexpected HTTP Response Status Code:"));
|
assertThat(closeStatus.getReason(), containsString("Failed to upgrade to websocket: Unexpected HTTP Response Status Code:"));
|
||||||
}
|
}
|
||||||
|
@ -218,25 +226,26 @@ public class WebSocketProxyTest
|
||||||
@Test
|
@Test
|
||||||
public void testClientError() throws Exception
|
public void testClientError() throws Exception
|
||||||
{
|
{
|
||||||
BasicFrameHandler clientHandler = new BasicFrameHandler("CLIENT")
|
TestAsyncFrameHandler clientFrameHandler = new TestAsyncFrameHandler("CLIENT")
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
public void onOpen(CoreSession coreSession, Callback callback)
|
public void onOpen(CoreSession coreSession, Callback callback)
|
||||||
{
|
{
|
||||||
System.err.println(name + " onOpen(): " + coreSession);
|
|
||||||
throw new IllegalStateException("simulated client onOpen error");
|
throw new IllegalStateException("simulated client onOpen error");
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
WebSocketProxy.Client2Proxy proxyClientSide = proxy.client2Proxy;
|
WebSocketProxy.Client2Proxy proxyClientSide = proxy.client2Proxy;
|
||||||
WebSocketProxy.Server2Proxy proxyServerSide = proxy.server2Proxy;
|
WebSocketProxy.Server2Proxy proxyServerSide = proxy.server2Proxy;
|
||||||
|
|
||||||
try (StacklessLogging stacklessLogging = new StacklessLogging(WebSocketChannel.class))
|
try (StacklessLogging stacklessLogging = new StacklessLogging(WebSocketCoreSession.class))
|
||||||
{
|
{
|
||||||
CompletableFuture<CoreSession> response = _client.connect(clientHandler, new URI("ws://localhost:8080/proxy/"));
|
ClientUpgradeRequest upgradeRequest = ClientUpgradeRequest.from(_client, new URI("ws://localhost:8080/proxy/"), clientFrameHandler);
|
||||||
|
upgradeRequest.setConfiguration(defaultCustomizer);
|
||||||
|
CompletableFuture<CoreSession> response = _client.connect(upgradeRequest);
|
||||||
Exception e = assertThrows(ExecutionException.class, ()->response.get(5, TimeUnit.SECONDS));
|
Exception e = assertThrows(ExecutionException.class, ()->response.get(5, TimeUnit.SECONDS));
|
||||||
assertThat(e.getMessage(), containsString("simulated client onOpen error"));
|
assertThat(e.getMessage(), containsString("simulated client onOpen error"));
|
||||||
clientHandler.awaitClose();
|
assertTrue(clientFrameHandler.closeLatch.await(5, TimeUnit.SECONDS));
|
||||||
serverFrameHandler.awaitClose();
|
assertTrue(serverFrameHandler.closeLatch.await(5, TimeUnit.SECONDS));
|
||||||
awaitProxyClose(proxyClientSide, proxyServerSide);
|
awaitProxyClose(proxyClientSide, proxyServerSide);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -254,11 +263,9 @@ public class WebSocketProxyTest
|
||||||
assertThat(closeStatus.getCode(), is(CloseStatus.SERVER_ERROR));
|
assertThat(closeStatus.getCode(), is(CloseStatus.SERVER_ERROR));
|
||||||
assertThat(closeStatus.getReason(), containsString("simulated client onOpen error"));
|
assertThat(closeStatus.getReason(), containsString("simulated client onOpen error"));
|
||||||
|
|
||||||
assertNull(clientHandler.receivedFrames.poll());
|
assertNull(clientFrameHandler.receivedFrames.poll());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testServerError() throws Exception
|
public void testServerError() throws Exception
|
||||||
{
|
{
|
||||||
|
@ -266,14 +273,15 @@ public class WebSocketProxyTest
|
||||||
WebSocketProxy.Client2Proxy proxyClientSide = proxy.client2Proxy;
|
WebSocketProxy.Client2Proxy proxyClientSide = proxy.client2Proxy;
|
||||||
WebSocketProxy.Server2Proxy proxyServerSide = proxy.server2Proxy;
|
WebSocketProxy.Server2Proxy proxyServerSide = proxy.server2Proxy;
|
||||||
|
|
||||||
BasicFrameHandler clientHandler = new BasicFrameHandler("CLIENT");
|
TestAsyncFrameHandler clientFrameHandler = new TestAsyncFrameHandler("CLIENT");
|
||||||
ClientUpgradeRequest upgradeRequest = ClientUpgradeRequest.from(_client, new URI("ws://localhost:8080/proxy/"), clientHandler);
|
ClientUpgradeRequest upgradeRequest = ClientUpgradeRequest.from(_client, new URI("ws://localhost:8080/proxy/"), clientFrameHandler);
|
||||||
|
upgradeRequest.setConfiguration(defaultCustomizer);
|
||||||
CompletableFuture<CoreSession> response = _client.connect(upgradeRequest);
|
CompletableFuture<CoreSession> response = _client.connect(upgradeRequest);
|
||||||
|
|
||||||
response.get(5, TimeUnit.SECONDS);
|
response.get(5, TimeUnit.SECONDS);
|
||||||
clientHandler.sendText("hello world");
|
clientFrameHandler.sendText("hello world");
|
||||||
clientHandler.awaitClose();
|
assertTrue(clientFrameHandler.closeLatch.await(5, TimeUnit.SECONDS));
|
||||||
serverFrameHandler.awaitClose();
|
assertTrue(serverFrameHandler.closeLatch.await(5, TimeUnit.SECONDS));
|
||||||
awaitProxyClose(proxyClientSide, proxyServerSide);
|
awaitProxyClose(proxyClientSide, proxyServerSide);
|
||||||
|
|
||||||
CloseStatus closeStatus;
|
CloseStatus closeStatus;
|
||||||
|
@ -298,7 +306,7 @@ public class WebSocketProxyTest
|
||||||
assertThat(closeStatus.getReason(), is("intentionally throwing in server onFrame()"));
|
assertThat(closeStatus.getReason(), is("intentionally throwing in server onFrame()"));
|
||||||
|
|
||||||
// Client
|
// Client
|
||||||
frame = clientHandler.receivedFrames.poll();
|
frame = clientFrameHandler.receivedFrames.poll();
|
||||||
closeStatus = CloseStatus.getCloseStatus(frame);
|
closeStatus = CloseStatus.getCloseStatus(frame);
|
||||||
assertThat(closeStatus.getCode(), is(CloseStatus.SERVER_ERROR));
|
assertThat(closeStatus.getCode(), is(CloseStatus.SERVER_ERROR));
|
||||||
assertThat(closeStatus.getReason(), is("intentionally throwing in server onFrame()"));
|
assertThat(closeStatus.getReason(), is("intentionally throwing in server onFrame()"));
|
||||||
|
@ -323,21 +331,23 @@ public class WebSocketProxyTest
|
||||||
WebSocketProxy.Client2Proxy proxyClientSide = proxy.client2Proxy;
|
WebSocketProxy.Client2Proxy proxyClientSide = proxy.client2Proxy;
|
||||||
WebSocketProxy.Server2Proxy proxyServerSide = proxy.server2Proxy;
|
WebSocketProxy.Server2Proxy proxyServerSide = proxy.server2Proxy;
|
||||||
|
|
||||||
BasicFrameHandler clientHandler = new BasicFrameHandler("CLIENT")
|
TestAsyncFrameHandler clientFrameHandler = new TestAsyncFrameHandler("CLIENT")
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
public void onFrame(Frame frame, Callback callback)
|
public void onFrame(Frame frame, Callback callback)
|
||||||
{
|
{
|
||||||
System.err.println(name + " onFrame(): " + frame);
|
LOG.info("[{}] onFrame {}", name, frame);
|
||||||
receivedFrames.offer(Frame.copy(frame));
|
receivedFrames.offer(Frame.copy(frame));
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
CompletableFuture<CoreSession> response = _client.connect(clientHandler, new URI("ws://localhost:8080/proxy/"));
|
ClientUpgradeRequest upgradeRequest = ClientUpgradeRequest.from(_client, new URI("ws://localhost:8080/proxy/"), clientFrameHandler);
|
||||||
|
upgradeRequest.setConfiguration(defaultCustomizer);
|
||||||
|
CompletableFuture<CoreSession> response = _client.connect(upgradeRequest);
|
||||||
response.get(5, TimeUnit.SECONDS);
|
response.get(5, TimeUnit.SECONDS);
|
||||||
clientHandler.sendText("hello world");
|
clientFrameHandler.sendText("hello world");
|
||||||
clientHandler.awaitClose();
|
assertTrue(clientFrameHandler.closeLatch.await(5, TimeUnit.SECONDS));
|
||||||
serverFrameHandler.awaitClose();
|
assertTrue(serverFrameHandler.closeLatch.await(5, TimeUnit.SECONDS));
|
||||||
awaitProxyClose(proxyClientSide, proxyServerSide);
|
awaitProxyClose(proxyClientSide, proxyServerSide);
|
||||||
|
|
||||||
CloseStatus closeStatus;
|
CloseStatus closeStatus;
|
||||||
|
@ -361,11 +371,11 @@ public class WebSocketProxyTest
|
||||||
assertThat(closeStatus.getReason(), is("intentionally throwing in server onFrame()"));
|
assertThat(closeStatus.getReason(), is("intentionally throwing in server onFrame()"));
|
||||||
|
|
||||||
// Client
|
// Client
|
||||||
frame = clientHandler.receivedFrames.poll();
|
frame = clientFrameHandler.receivedFrames.poll();
|
||||||
closeStatus = CloseStatus.getCloseStatus(frame);
|
closeStatus = CloseStatus.getCloseStatus(frame);
|
||||||
assertThat(closeStatus.getCode(), is(CloseStatus.SERVER_ERROR));
|
assertThat(closeStatus.getCode(), is(CloseStatus.SERVER_ERROR));
|
||||||
assertThat(closeStatus.getReason(), is("intentionally throwing in server onFrame()"));
|
assertThat(closeStatus.getReason(), is("intentionally throwing in server onFrame()"));
|
||||||
assertNull(clientHandler.receivedFrames.poll());
|
assertNull(clientFrameHandler.receivedFrames.poll());
|
||||||
|
|
||||||
// Client2Proxy does NOT receive close response from the client and fails
|
// Client2Proxy does NOT receive close response from the client and fails
|
||||||
assertNull(proxyClientSide.receivedFrames.poll());
|
assertNull(proxyClientSide.receivedFrames.poll());
|
||||||
|
|
Loading…
Reference in New Issue