Merge pull request #3365 from lachlan-roberts/jetty-10.0.x-3170-websocket-proxy
Issue #3170 - WebSocket Proxy
This commit is contained in:
commit
fa4abfa6bb
|
@ -243,6 +243,33 @@ public interface Callback extends Invocable
|
|||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a callback which combines two other callbacks and will succeed or fail them both.
|
||||
* @param callback1 The first callback
|
||||
* @param callback2 The second callback
|
||||
* @return a new callback.
|
||||
*/
|
||||
static Callback from(Callback callback1, Callback callback2)
|
||||
{
|
||||
return new Callback()
|
||||
{
|
||||
@Override
|
||||
public void succeeded()
|
||||
{
|
||||
callback1.succeeded();
|
||||
callback2.succeeded();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void failed(Throwable x)
|
||||
{
|
||||
callback1.failed(x);
|
||||
callback2.failed(x);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
class Completing implements Callback
|
||||
{
|
||||
@Override
|
||||
|
|
|
@ -0,0 +1,56 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
//
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
//
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
//
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.websocket.core;
|
||||
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
|
||||
public class EchoFrameHandler extends TestAsyncFrameHandler
|
||||
{
|
||||
private boolean throwOnFrame;
|
||||
|
||||
public void throwOnFrame()
|
||||
{
|
||||
throwOnFrame = true;
|
||||
}
|
||||
|
||||
public EchoFrameHandler(String name)
|
||||
{
|
||||
super(name);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFrame(Frame frame, Callback callback)
|
||||
{
|
||||
LOG.info("[{}] onFrame {}", name, frame);
|
||||
receivedFrames.offer(Frame.copy(frame));
|
||||
|
||||
if (throwOnFrame)
|
||||
throw new RuntimeException("intentionally throwing in server onFrame()");
|
||||
|
||||
if (frame.isDataFrame())
|
||||
{
|
||||
LOG.info("[{}] echoDataFrame {}", name, frame);
|
||||
coreSession.sendFrame(new Frame(frame.getOpCode(), frame.getPayload()), callback, false);
|
||||
}
|
||||
else
|
||||
{
|
||||
callback.succeeded();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,107 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
//
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
//
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
//
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.websocket.core;
|
||||
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
import org.eclipse.jetty.util.BlockingArrayQueue;
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
|
||||
public class TestAsyncFrameHandler implements FrameHandler
|
||||
{
|
||||
protected static final Logger LOG = Log.getLogger(TestAsyncFrameHandler.class);
|
||||
protected final String name;
|
||||
|
||||
public CoreSession coreSession;
|
||||
public BlockingQueue<Frame> receivedFrames = new BlockingArrayQueue<>();
|
||||
public volatile Throwable error;
|
||||
public CountDownLatch openLatch = new CountDownLatch(1);
|
||||
public CountDownLatch errorLatch = new CountDownLatch(1);
|
||||
public CountDownLatch closeLatch = new CountDownLatch(1);
|
||||
|
||||
public TestAsyncFrameHandler()
|
||||
{
|
||||
name = TestAsyncFrameHandler.class.getSimpleName();
|
||||
}
|
||||
|
||||
public TestAsyncFrameHandler(String name)
|
||||
{
|
||||
this.name = name;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onOpen(CoreSession coreSession, Callback callback)
|
||||
{
|
||||
LOG.info("[{}] onOpen {}", name, coreSession);
|
||||
this.coreSession = coreSession;
|
||||
callback.succeeded();
|
||||
openLatch.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFrame(Frame frame, Callback callback)
|
||||
{
|
||||
LOG.info("[{}] onFrame {}", name, frame);
|
||||
receivedFrames.offer(Frame.copy(frame));
|
||||
callback.succeeded();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onClosed(CloseStatus closeStatus, Callback callback)
|
||||
{
|
||||
LOG.info("[{}] onClosed {}", name, closeStatus);
|
||||
closeLatch.countDown();
|
||||
callback.succeeded();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(Throwable cause, Callback callback)
|
||||
{
|
||||
LOG.info("[{}] onError {} ", name, cause == null?null:cause.toString());
|
||||
error = cause;
|
||||
errorLatch.countDown();
|
||||
callback.succeeded();
|
||||
}
|
||||
|
||||
public void sendText(String text)
|
||||
{
|
||||
LOG.info("[{}] sendText {} ", name, text);
|
||||
Frame frame = new Frame(OpCode.TEXT, text);
|
||||
coreSession.sendFrame(frame, Callback.NOOP, false);
|
||||
}
|
||||
|
||||
public void sendFrame(Frame frame)
|
||||
{
|
||||
LOG.info("[{}] sendFrame {} ", name, frame);
|
||||
coreSession.sendFrame(frame, Callback.NOOP, false);
|
||||
}
|
||||
|
||||
public void close()
|
||||
{
|
||||
close(CloseStatus.NORMAL, null);
|
||||
}
|
||||
|
||||
public void close(int closeStatus, String reason)
|
||||
{
|
||||
sendFrame(CloseStatus.toFrame(closeStatus, reason));
|
||||
}
|
||||
}
|
|
@ -0,0 +1,653 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
//
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
//
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
//
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.websocket.core.proxy;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
import org.eclipse.jetty.util.BlockingArrayQueue;
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
import org.eclipse.jetty.websocket.core.CloseStatus;
|
||||
import org.eclipse.jetty.websocket.core.Frame;
|
||||
import org.eclipse.jetty.websocket.core.FrameHandler;
|
||||
import org.eclipse.jetty.websocket.core.OpCode;
|
||||
import org.eclipse.jetty.websocket.core.client.WebSocketCoreClient;
|
||||
|
||||
class WebSocketProxy
|
||||
{
|
||||
protected static final Logger LOG = Log.getLogger(WebSocketProxy.class);
|
||||
|
||||
enum State
|
||||
{
|
||||
NOT_OPEN,
|
||||
CONNECTING,
|
||||
OPEN,
|
||||
ISHUT,
|
||||
OSHUT,
|
||||
CLOSED,
|
||||
FAILED
|
||||
}
|
||||
|
||||
private final Object lock = new Object();
|
||||
private WebSocketCoreClient client;
|
||||
private URI serverUri;
|
||||
|
||||
public Client2Proxy client2Proxy = new Client2Proxy();
|
||||
public Server2Proxy server2Proxy = new Server2Proxy();
|
||||
|
||||
public WebSocketProxy(WebSocketCoreClient client, URI serverUri)
|
||||
{
|
||||
this.client = client;
|
||||
this.serverUri = serverUri;
|
||||
}
|
||||
|
||||
class Client2Proxy implements FrameHandler
|
||||
{
|
||||
private CoreSession client;
|
||||
private State state = State.NOT_OPEN;
|
||||
|
||||
private Callback closeCallback;
|
||||
private Throwable error;
|
||||
|
||||
public BlockingQueue<Frame> receivedFrames = new BlockingArrayQueue<>();
|
||||
protected CountDownLatch closed = new CountDownLatch(1);
|
||||
|
||||
public State getState()
|
||||
{
|
||||
synchronized (this)
|
||||
{
|
||||
return state;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onOpen(CoreSession coreSession, Callback callback)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("[{}] onOpen {}", toString(), coreSession);
|
||||
|
||||
Throwable failure = null;
|
||||
synchronized (lock)
|
||||
{
|
||||
switch (state)
|
||||
{
|
||||
case NOT_OPEN:
|
||||
state = State.CONNECTING;
|
||||
client = coreSession;
|
||||
break;
|
||||
|
||||
default:
|
||||
failure = new IllegalStateException();
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (failure != null)
|
||||
callback.failed(failure);
|
||||
else
|
||||
server2Proxy.connect(Callback.from(() -> onOpenSuccess(callback), (t) -> onOpenFail(callback, t)));
|
||||
}
|
||||
|
||||
private void onOpenSuccess(Callback callback)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("[{}] onOpenSuccess", toString());
|
||||
|
||||
Throwable failure = null;
|
||||
synchronized (lock)
|
||||
{
|
||||
switch (state)
|
||||
{
|
||||
case CONNECTING:
|
||||
state = State.OPEN;
|
||||
break;
|
||||
|
||||
case FAILED:
|
||||
failure = error;
|
||||
break;
|
||||
|
||||
default:
|
||||
failure = new IllegalStateException();
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (failure != null)
|
||||
server2Proxy.fail(failure, callback);
|
||||
else
|
||||
callback.succeeded();
|
||||
}
|
||||
|
||||
private void onOpenFail(Callback callback, Throwable t)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("[{}] onOpenFail {}", toString(), t);
|
||||
|
||||
Throwable failure = t;
|
||||
synchronized (lock)
|
||||
{
|
||||
switch (state)
|
||||
{
|
||||
case CONNECTING:
|
||||
state = State.FAILED;
|
||||
error = t;
|
||||
break;
|
||||
|
||||
case FAILED:
|
||||
failure = error;
|
||||
failure.addSuppressed(t);
|
||||
break;
|
||||
|
||||
default:
|
||||
failure = new IllegalStateException();
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
callback.failed(failure);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFrame(Frame frame, Callback callback)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("[{}] onFrame {}", toString(), frame);
|
||||
receivedFrames.offer(Frame.copy(frame));
|
||||
|
||||
Callback sendCallback = callback;
|
||||
Throwable failure = null;
|
||||
synchronized (lock)
|
||||
{
|
||||
switch (state)
|
||||
{
|
||||
case OPEN:
|
||||
if (frame.getOpCode() == OpCode.CLOSE)
|
||||
{
|
||||
state = State.ISHUT;
|
||||
// the callback is saved until a close response comes in sendFrame from Server2Proxy
|
||||
// if the callback was completed here then core would send its own close response
|
||||
closeCallback = callback;
|
||||
sendCallback = Callback.from(()->{}, callback::failed);
|
||||
}
|
||||
break;
|
||||
|
||||
case OSHUT:
|
||||
if (frame.getOpCode() == OpCode.CLOSE)
|
||||
state = State.CLOSED;
|
||||
break;
|
||||
|
||||
case FAILED:
|
||||
failure = error;
|
||||
break;
|
||||
|
||||
default:
|
||||
failure = new IllegalStateException();
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (failure != null)
|
||||
callback.failed(failure);
|
||||
else
|
||||
server2Proxy.send(frame, sendCallback);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(Throwable failure, Callback callback)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("[{}] onError {}", toString(), failure);
|
||||
|
||||
boolean failServer2Proxy;
|
||||
synchronized (lock)
|
||||
{
|
||||
switch (state)
|
||||
{
|
||||
case FAILED:
|
||||
case CLOSED:
|
||||
failServer2Proxy = false;
|
||||
break;
|
||||
|
||||
default:
|
||||
state = State.FAILED;
|
||||
error = failure;
|
||||
failServer2Proxy = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (failServer2Proxy)
|
||||
server2Proxy.fail(failure,callback);
|
||||
else
|
||||
callback.failed(failure);
|
||||
}
|
||||
|
||||
public void fail(Throwable failure, Callback callback)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("[{}] fail {}", toString(), failure);
|
||||
|
||||
Callback sendCallback = null;
|
||||
synchronized (lock)
|
||||
{
|
||||
switch (state)
|
||||
{
|
||||
case OPEN:
|
||||
state = State.FAILED;
|
||||
sendCallback = Callback.from(callback, failure);
|
||||
break;
|
||||
|
||||
case ISHUT:
|
||||
state = State.FAILED;
|
||||
Callback doubleCallback = Callback.from(callback, closeCallback);
|
||||
sendCallback = Callback.from(doubleCallback, failure);
|
||||
break;
|
||||
|
||||
default:
|
||||
state = State.FAILED;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (sendCallback != null)
|
||||
client.close(CloseStatus.SHUTDOWN, failure.getMessage(), sendCallback);
|
||||
else
|
||||
callback.failed(failure);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onClosed(CloseStatus closeStatus, Callback callback)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("[{}] onClosed {}", toString(), closeStatus);
|
||||
|
||||
closed.countDown();
|
||||
callback.succeeded();
|
||||
}
|
||||
|
||||
public void send(Frame frame, Callback callback)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("[{}] send {}", toString(), frame);
|
||||
|
||||
Callback sendCallback = callback;
|
||||
Throwable failure = null;
|
||||
synchronized (lock)
|
||||
{
|
||||
switch (state)
|
||||
{
|
||||
case OPEN:
|
||||
if (frame.getOpCode() == OpCode.CLOSE)
|
||||
state = State.OSHUT;
|
||||
break;
|
||||
|
||||
case ISHUT:
|
||||
if (frame.getOpCode() == OpCode.CLOSE)
|
||||
{
|
||||
state = State.CLOSED;
|
||||
sendCallback = Callback.from(callback, closeCallback);
|
||||
}
|
||||
break;
|
||||
|
||||
case FAILED:
|
||||
failure = error;
|
||||
break;
|
||||
|
||||
default:
|
||||
failure = new IllegalStateException();
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (failure != null)
|
||||
callback.failed(failure);
|
||||
else
|
||||
client.sendFrame(frame, sendCallback, false);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return "Client2Proxy:" + getState();
|
||||
}
|
||||
}
|
||||
|
||||
class Server2Proxy implements FrameHandler
|
||||
{
|
||||
private CoreSession server;
|
||||
private State state = State.NOT_OPEN;
|
||||
|
||||
private Callback closeCallback;
|
||||
private Throwable error;
|
||||
|
||||
public BlockingQueue<Frame> receivedFrames = new BlockingArrayQueue<>();
|
||||
protected CountDownLatch closed = new CountDownLatch(1);
|
||||
|
||||
public State getState()
|
||||
{
|
||||
synchronized (this)
|
||||
{
|
||||
return state;
|
||||
}
|
||||
}
|
||||
|
||||
public void connect(Callback callback)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("[{}] connect", toString());
|
||||
|
||||
Throwable failure = null;
|
||||
synchronized (lock)
|
||||
{
|
||||
switch (state)
|
||||
{
|
||||
case NOT_OPEN:
|
||||
try
|
||||
{
|
||||
state = State.CONNECTING;
|
||||
client.connect(this, serverUri).whenComplete((s,t)->
|
||||
{
|
||||
if (t != null)
|
||||
onConnectFailure(t, callback);
|
||||
else
|
||||
onConnectSuccess(s, callback);
|
||||
});
|
||||
}
|
||||
catch (IOException e)
|
||||
{
|
||||
state = State.FAILED;
|
||||
error = e;
|
||||
failure = e;
|
||||
}
|
||||
break;
|
||||
|
||||
case FAILED:
|
||||
failure = error;
|
||||
break;
|
||||
|
||||
default:
|
||||
state = State.FAILED;
|
||||
error = new IllegalStateException();
|
||||
failure = error;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (failure != null)
|
||||
callback.failed(failure);
|
||||
}
|
||||
|
||||
private void onConnectSuccess(CoreSession coreSession, Callback callback)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("[{}] onConnectSuccess {}", toString(), coreSession);
|
||||
|
||||
Throwable failure = null;
|
||||
synchronized (lock)
|
||||
{
|
||||
switch (state)
|
||||
{
|
||||
case OPEN:
|
||||
break;
|
||||
|
||||
case FAILED:
|
||||
failure = error;
|
||||
break;
|
||||
|
||||
default:
|
||||
state = State.FAILED;
|
||||
error = new IllegalStateException();
|
||||
failure = error;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (failure != null)
|
||||
coreSession.close(CloseStatus.SHUTDOWN, failure.getMessage(), Callback.from(callback, failure));
|
||||
else
|
||||
callback.succeeded();
|
||||
}
|
||||
|
||||
private void onConnectFailure(Throwable t, Callback callback)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("[{}] onConnectFailure {}", toString(), t);
|
||||
|
||||
Throwable failure = t;
|
||||
synchronized (lock)
|
||||
{
|
||||
switch (state)
|
||||
{
|
||||
case CONNECTING:
|
||||
state = State.FAILED;
|
||||
error = t;
|
||||
break;
|
||||
|
||||
case FAILED:
|
||||
failure = error;
|
||||
break;
|
||||
|
||||
default:
|
||||
state = State.FAILED;
|
||||
error = new IllegalStateException();
|
||||
failure = error;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
callback.failed(failure);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onOpen(CoreSession coreSession, Callback callback)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("[{}] onOpen {}", toString(), coreSession);
|
||||
|
||||
Throwable failure = null;
|
||||
synchronized (lock)
|
||||
{
|
||||
switch (state)
|
||||
{
|
||||
case CONNECTING:
|
||||
state = State.OPEN;
|
||||
server = coreSession;
|
||||
break;
|
||||
|
||||
case FAILED:
|
||||
failure = error;
|
||||
break;
|
||||
|
||||
default:
|
||||
failure = new IllegalStateException();
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (failure != null)
|
||||
callback.failed(failure);
|
||||
else
|
||||
callback.succeeded();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFrame(Frame frame, Callback callback)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("[{}] onFrame {}", toString(), frame);
|
||||
receivedFrames.offer(Frame.copy(frame));
|
||||
|
||||
Callback sendCallback = callback;
|
||||
Throwable failure = null;
|
||||
synchronized (lock)
|
||||
{
|
||||
switch (state)
|
||||
{
|
||||
case OPEN:
|
||||
if (frame.getOpCode() == OpCode.CLOSE)
|
||||
{
|
||||
state = State.ISHUT;
|
||||
closeCallback = callback;
|
||||
sendCallback = Callback.from(()->{}, callback::failed);
|
||||
}
|
||||
break;
|
||||
|
||||
case OSHUT:
|
||||
if (frame.getOpCode() == OpCode.CLOSE)
|
||||
state = State.CLOSED;
|
||||
break;
|
||||
|
||||
case FAILED:
|
||||
failure = error;
|
||||
break;
|
||||
|
||||
default:
|
||||
failure = new IllegalStateException();
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (failure != null)
|
||||
callback.failed(failure);
|
||||
else
|
||||
client2Proxy.send(frame, sendCallback);
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(Throwable failure, Callback callback)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("[{}] onError {}", toString(), failure);
|
||||
|
||||
boolean failClient2Proxy = false;
|
||||
synchronized (lock)
|
||||
{
|
||||
switch (state)
|
||||
{
|
||||
case FAILED:
|
||||
case CLOSED:
|
||||
break;
|
||||
|
||||
default:
|
||||
state = State.FAILED;
|
||||
error = failure;
|
||||
failClient2Proxy = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (failClient2Proxy)
|
||||
client2Proxy.fail(failure,callback);
|
||||
else
|
||||
callback.failed(failure);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onClosed(CloseStatus closeStatus, Callback callback)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("[{}] onClosed {}", toString(), closeStatus);
|
||||
closed.countDown();
|
||||
callback.succeeded();
|
||||
}
|
||||
|
||||
public void fail(Throwable failure, Callback callback)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("[{}] fail {}", toString(), failure);
|
||||
|
||||
Callback sendCallback = null;
|
||||
synchronized (lock)
|
||||
{
|
||||
switch (state)
|
||||
{
|
||||
case OPEN:
|
||||
state = State.FAILED;
|
||||
sendCallback = Callback.from(callback, failure);
|
||||
break;
|
||||
|
||||
case ISHUT:
|
||||
state = State.FAILED;
|
||||
Callback doubleCallback = Callback.from(callback, closeCallback);
|
||||
sendCallback = Callback.from(doubleCallback, failure);
|
||||
|
||||
default:
|
||||
state = State.FAILED;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (sendCallback != null)
|
||||
server.close(CloseStatus.SHUTDOWN, failure.getMessage(), sendCallback);
|
||||
else
|
||||
callback.failed(failure);
|
||||
}
|
||||
|
||||
public void send(Frame frame, Callback callback)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("[{}] send {}", toString(), frame);
|
||||
|
||||
Callback sendCallback = callback;
|
||||
Throwable failure = null;
|
||||
synchronized (lock)
|
||||
{
|
||||
switch (state)
|
||||
{
|
||||
case OPEN:
|
||||
if (frame.getOpCode() == OpCode.CLOSE)
|
||||
state = State.OSHUT;
|
||||
break;
|
||||
|
||||
case ISHUT:
|
||||
if (frame.getOpCode() == OpCode.CLOSE)
|
||||
{
|
||||
state = State.CLOSED;
|
||||
sendCallback = Callback.from(callback, closeCallback);
|
||||
}
|
||||
break;
|
||||
|
||||
case FAILED:
|
||||
failure = error;
|
||||
break;
|
||||
|
||||
default:
|
||||
failure = new IllegalStateException();
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (failure != null)
|
||||
callback.failed(failure);
|
||||
else
|
||||
server.sendFrame(frame, sendCallback, false);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return "Server2Proxy:" + getState();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,388 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
//
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
//
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
//
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.websocket.core.proxy;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.time.Duration;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import javax.servlet.ServletException;
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
|
||||
import org.eclipse.jetty.http.HttpStatus;
|
||||
import org.eclipse.jetty.server.Request;
|
||||
import org.eclipse.jetty.server.Server;
|
||||
import org.eclipse.jetty.server.ServerConnector;
|
||||
import org.eclipse.jetty.server.handler.AbstractHandler;
|
||||
import org.eclipse.jetty.server.handler.ContextHandler;
|
||||
import org.eclipse.jetty.server.handler.HandlerList;
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
import org.eclipse.jetty.util.log.StacklessLogging;
|
||||
import org.eclipse.jetty.websocket.core.CloseStatus;
|
||||
import org.eclipse.jetty.websocket.core.EchoFrameHandler;
|
||||
import org.eclipse.jetty.websocket.core.Frame;
|
||||
import org.eclipse.jetty.websocket.core.FrameHandler;
|
||||
import org.eclipse.jetty.websocket.core.FrameHandler.CoreSession;
|
||||
import org.eclipse.jetty.websocket.core.OpCode;
|
||||
import org.eclipse.jetty.websocket.core.TestAsyncFrameHandler;
|
||||
import org.eclipse.jetty.websocket.core.client.ClientUpgradeRequest;
|
||||
import org.eclipse.jetty.websocket.core.client.WebSocketCoreClient;
|
||||
import org.eclipse.jetty.websocket.core.internal.WebSocketCoreSession;
|
||||
import org.eclipse.jetty.websocket.core.server.WebSocketNegotiator;
|
||||
import org.eclipse.jetty.websocket.core.server.WebSocketUpgradeHandler;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
public class WebSocketProxyTest
|
||||
{
|
||||
private Server _server;
|
||||
private WebSocketCoreClient _client;
|
||||
private WebSocketProxy proxy;
|
||||
private EchoFrameHandler serverFrameHandler;
|
||||
private TestHandler testHandler;
|
||||
FrameHandler.ConfigurationCustomizer defaultCustomizer;
|
||||
|
||||
private class TestHandler extends AbstractHandler
|
||||
{
|
||||
public void blockServerUpgradeRequests()
|
||||
{
|
||||
blockServerUpgradeRequests = true;
|
||||
}
|
||||
|
||||
public boolean blockServerUpgradeRequests = false;
|
||||
|
||||
@Override
|
||||
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
|
||||
{
|
||||
if (baseRequest.getHeader("Upgrade") != null)
|
||||
{
|
||||
if (blockServerUpgradeRequests && target.startsWith("/server/"))
|
||||
{
|
||||
response.sendError(HttpStatus.INTERNAL_SERVER_ERROR_500);
|
||||
baseRequest.setHandled(true);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@BeforeEach
|
||||
public void start() throws Exception
|
||||
{
|
||||
_server = new Server();
|
||||
ServerConnector connector = new ServerConnector(_server);
|
||||
connector.setPort(8080);
|
||||
_server.addConnector(connector);
|
||||
|
||||
HandlerList handlers = new HandlerList();
|
||||
testHandler = new TestHandler();
|
||||
handlers.addHandler(testHandler);
|
||||
|
||||
defaultCustomizer = new FrameHandler.ConfigurationCustomizer();
|
||||
defaultCustomizer.setIdleTimeout(Duration.ofSeconds(3));
|
||||
|
||||
ContextHandler serverContext = new ContextHandler("/server");
|
||||
serverFrameHandler = new EchoFrameHandler("SERVER");
|
||||
WebSocketNegotiator negotiator = WebSocketNegotiator.from((negotiation) -> serverFrameHandler, defaultCustomizer);
|
||||
WebSocketUpgradeHandler upgradeHandler = new WebSocketUpgradeHandler(negotiator);
|
||||
serverContext.setHandler(upgradeHandler);
|
||||
handlers.addHandler(serverContext);
|
||||
|
||||
_client = new WebSocketCoreClient();
|
||||
_client.start();
|
||||
URI uri = new URI("ws://localhost:8080/server/");
|
||||
|
||||
ContextHandler proxyContext = new ContextHandler("/proxy");
|
||||
proxy = new WebSocketProxy(_client, uri);
|
||||
negotiator = WebSocketNegotiator.from((negotiation) -> proxy.client2Proxy, defaultCustomizer);
|
||||
upgradeHandler = new WebSocketUpgradeHandler(negotiator);
|
||||
proxyContext.setHandler(upgradeHandler);
|
||||
handlers.addHandler(proxyContext);
|
||||
|
||||
_server.setHandler(handlers);
|
||||
_server.start();
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
public void stop() throws Exception
|
||||
{
|
||||
_client.stop();
|
||||
_server.stop();
|
||||
}
|
||||
|
||||
public void awaitProxyClose(WebSocketProxy.Client2Proxy client2Proxy, WebSocketProxy.Server2Proxy server2Proxy) throws Exception
|
||||
{
|
||||
if (client2Proxy != null && !client2Proxy.closed.await(5, TimeUnit.SECONDS))
|
||||
{
|
||||
throw new TimeoutException("client2Proxy close timeout");
|
||||
}
|
||||
|
||||
if (server2Proxy != null && !server2Proxy.closed.await(5, TimeUnit.SECONDS))
|
||||
{
|
||||
throw new TimeoutException("server2Proxy close timeout");
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEcho() throws Exception
|
||||
{
|
||||
TestAsyncFrameHandler clientFrameHandler = new TestAsyncFrameHandler("CLIENT");
|
||||
WebSocketProxy.Client2Proxy proxyClientSide = proxy.client2Proxy;
|
||||
WebSocketProxy.Server2Proxy proxyServerSide = proxy.server2Proxy;
|
||||
|
||||
ClientUpgradeRequest upgradeRequest = ClientUpgradeRequest.from(_client, new URI("ws://localhost:8080/proxy/"), clientFrameHandler);
|
||||
upgradeRequest.setConfiguration(defaultCustomizer);
|
||||
CompletableFuture<CoreSession> response = _client.connect(upgradeRequest);
|
||||
|
||||
response.get(5, TimeUnit.SECONDS);
|
||||
clientFrameHandler.sendText("hello world");
|
||||
clientFrameHandler.close(CloseStatus.NORMAL, "standard close");
|
||||
assertTrue(clientFrameHandler.closeLatch.await(5, TimeUnit.SECONDS));
|
||||
assertTrue(serverFrameHandler.closeLatch.await(5, TimeUnit.SECONDS));
|
||||
awaitProxyClose(proxyClientSide, proxyServerSide);
|
||||
|
||||
assertThat(proxyClientSide.getState(), is(WebSocketProxy.State.CLOSED));
|
||||
assertThat(proxyServerSide.getState(), is(WebSocketProxy.State.CLOSED));
|
||||
|
||||
assertThat(proxyClientSide.receivedFrames.poll().getPayloadAsUTF8(), is("hello world"));
|
||||
assertThat(serverFrameHandler.receivedFrames.poll().getPayloadAsUTF8(), is("hello world"));
|
||||
assertThat(proxyServerSide.receivedFrames.poll().getPayloadAsUTF8(), is("hello world"));
|
||||
assertThat(clientFrameHandler.receivedFrames.poll().getPayloadAsUTF8(), is("hello world"));
|
||||
|
||||
assertThat(CloseStatus.getCloseStatus(proxyClientSide.receivedFrames.poll()).getReason(), is("standard close"));
|
||||
assertThat(CloseStatus.getCloseStatus(serverFrameHandler.receivedFrames.poll()).getReason(), is("standard close"));
|
||||
assertThat(CloseStatus.getCloseStatus(proxyServerSide.receivedFrames.poll()).getReason(), is("standard close"));
|
||||
assertThat(CloseStatus.getCloseStatus(clientFrameHandler.receivedFrames.poll()).getReason(), is("standard close"));
|
||||
|
||||
assertNull(proxyClientSide.receivedFrames.poll());
|
||||
assertNull(serverFrameHandler.receivedFrames.poll());
|
||||
assertNull(proxyServerSide.receivedFrames.poll());
|
||||
assertNull(clientFrameHandler.receivedFrames.poll());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFailServerUpgrade() throws Exception
|
||||
{
|
||||
testHandler.blockServerUpgradeRequests();
|
||||
WebSocketProxy.Client2Proxy proxyClientSide = proxy.client2Proxy;
|
||||
WebSocketProxy.Server2Proxy proxyServerSide = proxy.server2Proxy;
|
||||
|
||||
TestAsyncFrameHandler clientFrameHandler = new TestAsyncFrameHandler("CLIENT");
|
||||
try (StacklessLogging stacklessLogging = new StacklessLogging(WebSocketCoreSession.class))
|
||||
{
|
||||
ClientUpgradeRequest upgradeRequest = ClientUpgradeRequest.from(_client, new URI("ws://localhost:8080/proxy/"), clientFrameHandler);
|
||||
upgradeRequest.setConfiguration(defaultCustomizer);
|
||||
CompletableFuture<CoreSession> response = _client.connect(upgradeRequest);
|
||||
response.get(5, TimeUnit.SECONDS);
|
||||
clientFrameHandler.sendText("hello world");
|
||||
clientFrameHandler.close();
|
||||
assertTrue(clientFrameHandler.closeLatch.await(5, TimeUnit.SECONDS));
|
||||
awaitProxyClose(proxyClientSide, null);
|
||||
}
|
||||
|
||||
assertNull(proxyClientSide.receivedFrames.poll());
|
||||
assertThat(proxyClientSide.getState(), is(WebSocketProxy.State.FAILED));
|
||||
|
||||
assertNull(proxyServerSide.receivedFrames.poll());
|
||||
assertThat(proxyServerSide.getState(), is(WebSocketProxy.State.FAILED));
|
||||
|
||||
assertFalse(serverFrameHandler.openLatch.await(250, TimeUnit.MILLISECONDS));
|
||||
|
||||
CloseStatus closeStatus = CloseStatus.getCloseStatus(clientFrameHandler.receivedFrames.poll());
|
||||
assertThat(closeStatus.getCode(), is(CloseStatus.SERVER_ERROR));
|
||||
assertThat(closeStatus.getReason(), containsString("Failed to upgrade to websocket: Unexpected HTTP Response Status Code:"));
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testClientError() throws Exception
|
||||
{
|
||||
TestAsyncFrameHandler clientFrameHandler = new TestAsyncFrameHandler("CLIENT")
|
||||
{
|
||||
@Override
|
||||
public void onOpen(CoreSession coreSession, Callback callback)
|
||||
{
|
||||
throw new IllegalStateException("simulated client onOpen error");
|
||||
}
|
||||
};
|
||||
WebSocketProxy.Client2Proxy proxyClientSide = proxy.client2Proxy;
|
||||
WebSocketProxy.Server2Proxy proxyServerSide = proxy.server2Proxy;
|
||||
|
||||
try (StacklessLogging stacklessLogging = new StacklessLogging(WebSocketCoreSession.class))
|
||||
{
|
||||
ClientUpgradeRequest upgradeRequest = ClientUpgradeRequest.from(_client, new URI("ws://localhost:8080/proxy/"), clientFrameHandler);
|
||||
upgradeRequest.setConfiguration(defaultCustomizer);
|
||||
CompletableFuture<CoreSession> response = _client.connect(upgradeRequest);
|
||||
Exception e = assertThrows(ExecutionException.class, ()->response.get(5, TimeUnit.SECONDS));
|
||||
assertThat(e.getMessage(), containsString("simulated client onOpen error"));
|
||||
assertTrue(clientFrameHandler.closeLatch.await(5, TimeUnit.SECONDS));
|
||||
assertTrue(serverFrameHandler.closeLatch.await(5, TimeUnit.SECONDS));
|
||||
awaitProxyClose(proxyClientSide, proxyServerSide);
|
||||
}
|
||||
|
||||
CloseStatus closeStatus = CloseStatus.getCloseStatus(proxyClientSide.receivedFrames.poll());
|
||||
assertThat(closeStatus.getCode(), is(CloseStatus.SERVER_ERROR));
|
||||
assertThat(closeStatus.getReason(), containsString("simulated client onOpen error"));
|
||||
assertThat(proxyClientSide.getState(), is(WebSocketProxy.State.CLOSED));
|
||||
|
||||
closeStatus = CloseStatus.getCloseStatus(proxyServerSide.receivedFrames.poll());
|
||||
assertThat(closeStatus.getCode(), is(CloseStatus.SERVER_ERROR));
|
||||
assertThat(closeStatus.getReason(), containsString("simulated client onOpen error"));
|
||||
assertThat(proxyServerSide.getState(), is(WebSocketProxy.State.CLOSED));
|
||||
|
||||
closeStatus = CloseStatus.getCloseStatus(serverFrameHandler.receivedFrames.poll());
|
||||
assertThat(closeStatus.getCode(), is(CloseStatus.SERVER_ERROR));
|
||||
assertThat(closeStatus.getReason(), containsString("simulated client onOpen error"));
|
||||
|
||||
assertNull(clientFrameHandler.receivedFrames.poll());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testServerError() throws Exception
|
||||
{
|
||||
serverFrameHandler.throwOnFrame();
|
||||
WebSocketProxy.Client2Proxy proxyClientSide = proxy.client2Proxy;
|
||||
WebSocketProxy.Server2Proxy proxyServerSide = proxy.server2Proxy;
|
||||
|
||||
TestAsyncFrameHandler clientFrameHandler = new TestAsyncFrameHandler("CLIENT");
|
||||
ClientUpgradeRequest upgradeRequest = ClientUpgradeRequest.from(_client, new URI("ws://localhost:8080/proxy/"), clientFrameHandler);
|
||||
upgradeRequest.setConfiguration(defaultCustomizer);
|
||||
CompletableFuture<CoreSession> response = _client.connect(upgradeRequest);
|
||||
|
||||
response.get(5, TimeUnit.SECONDS);
|
||||
clientFrameHandler.sendText("hello world");
|
||||
assertTrue(clientFrameHandler.closeLatch.await(5, TimeUnit.SECONDS));
|
||||
assertTrue(serverFrameHandler.closeLatch.await(5, TimeUnit.SECONDS));
|
||||
awaitProxyClose(proxyClientSide, proxyServerSide);
|
||||
|
||||
CloseStatus closeStatus;
|
||||
Frame frame;
|
||||
|
||||
// Client2Proxy
|
||||
frame = proxyClientSide.receivedFrames.poll();
|
||||
assertThat(frame.getOpCode(), is(OpCode.TEXT));
|
||||
assertThat(frame.getPayloadAsUTF8(), is("hello world"));
|
||||
|
||||
// Server
|
||||
frame = serverFrameHandler.receivedFrames.poll();
|
||||
assertThat(frame.getOpCode(), is(OpCode.TEXT));
|
||||
assertThat(frame.getPayloadAsUTF8(), is("hello world"));
|
||||
frame = serverFrameHandler.receivedFrames.poll();
|
||||
assertNull(frame);
|
||||
|
||||
// Server2Proxy
|
||||
frame = proxyServerSide.receivedFrames.poll();
|
||||
closeStatus = CloseStatus.getCloseStatus(frame);
|
||||
assertThat(closeStatus.getCode(), is(CloseStatus.SERVER_ERROR));
|
||||
assertThat(closeStatus.getReason(), is("intentionally throwing in server onFrame()"));
|
||||
|
||||
// Client
|
||||
frame = clientFrameHandler.receivedFrames.poll();
|
||||
closeStatus = CloseStatus.getCloseStatus(frame);
|
||||
assertThat(closeStatus.getCode(), is(CloseStatus.SERVER_ERROR));
|
||||
assertThat(closeStatus.getReason(), is("intentionally throwing in server onFrame()"));
|
||||
|
||||
// Client2Proxy receiving close response from Client
|
||||
frame = proxyClientSide.receivedFrames.poll();
|
||||
closeStatus = CloseStatus.getCloseStatus(frame);
|
||||
assertThat(closeStatus.getCode(), is(CloseStatus.SERVER_ERROR));
|
||||
assertThat(closeStatus.getReason(), is("intentionally throwing in server onFrame()"));
|
||||
|
||||
// Check Proxy is in expected final state
|
||||
assertNull(proxyClientSide.receivedFrames.poll());
|
||||
assertNull(proxyServerSide.receivedFrames.poll());
|
||||
assertThat(proxyClientSide.getState(), is(WebSocketProxy.State.CLOSED));
|
||||
assertThat(proxyServerSide.getState(), is(WebSocketProxy.State.CLOSED));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testServerErrorClientNoResponse() throws Exception
|
||||
{
|
||||
serverFrameHandler.throwOnFrame();
|
||||
WebSocketProxy.Client2Proxy proxyClientSide = proxy.client2Proxy;
|
||||
WebSocketProxy.Server2Proxy proxyServerSide = proxy.server2Proxy;
|
||||
|
||||
TestAsyncFrameHandler clientFrameHandler = new TestAsyncFrameHandler("CLIENT")
|
||||
{
|
||||
@Override
|
||||
public void onFrame(Frame frame, Callback callback)
|
||||
{
|
||||
LOG.info("[{}] onFrame {}", name, frame);
|
||||
receivedFrames.offer(Frame.copy(frame));
|
||||
}
|
||||
};
|
||||
|
||||
ClientUpgradeRequest upgradeRequest = ClientUpgradeRequest.from(_client, new URI("ws://localhost:8080/proxy/"), clientFrameHandler);
|
||||
upgradeRequest.setConfiguration(defaultCustomizer);
|
||||
CompletableFuture<CoreSession> response = _client.connect(upgradeRequest);
|
||||
response.get(5, TimeUnit.SECONDS);
|
||||
clientFrameHandler.sendText("hello world");
|
||||
assertTrue(clientFrameHandler.closeLatch.await(5, TimeUnit.SECONDS));
|
||||
assertTrue(serverFrameHandler.closeLatch.await(5, TimeUnit.SECONDS));
|
||||
awaitProxyClose(proxyClientSide, proxyServerSide);
|
||||
|
||||
CloseStatus closeStatus;
|
||||
Frame frame;
|
||||
|
||||
// Client2Proxy
|
||||
frame = proxyClientSide.receivedFrames.poll();
|
||||
assertThat(frame.getOpCode(), is(OpCode.TEXT));
|
||||
assertThat(frame.getPayloadAsUTF8(), is("hello world"));
|
||||
|
||||
// Server
|
||||
frame = serverFrameHandler.receivedFrames.poll();
|
||||
assertThat(frame.getOpCode(), is(OpCode.TEXT));
|
||||
assertThat(frame.getPayloadAsUTF8(), is("hello world"));
|
||||
assertNull(serverFrameHandler.receivedFrames.poll());
|
||||
|
||||
// Server2Proxy
|
||||
frame = proxyServerSide.receivedFrames.poll();
|
||||
closeStatus = CloseStatus.getCloseStatus(frame);
|
||||
assertThat(closeStatus.getCode(), is(CloseStatus.SERVER_ERROR));
|
||||
assertThat(closeStatus.getReason(), is("intentionally throwing in server onFrame()"));
|
||||
|
||||
// Client
|
||||
frame = clientFrameHandler.receivedFrames.poll();
|
||||
closeStatus = CloseStatus.getCloseStatus(frame);
|
||||
assertThat(closeStatus.getCode(), is(CloseStatus.SERVER_ERROR));
|
||||
assertThat(closeStatus.getReason(), is("intentionally throwing in server onFrame()"));
|
||||
assertNull(clientFrameHandler.receivedFrames.poll());
|
||||
|
||||
// Client2Proxy does NOT receive close response from the client and fails
|
||||
assertNull(proxyClientSide.receivedFrames.poll());
|
||||
assertThat(proxyClientSide.getState(), is(WebSocketProxy.State.FAILED));
|
||||
|
||||
// Server2Proxy is failed by the Client2Proxy
|
||||
assertNull(proxyServerSide.receivedFrames.poll());
|
||||
assertThat(proxyServerSide.getState(), is(WebSocketProxy.State.FAILED));
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue