Issue #3170 - Improved testing for WebSocketProxy

- Added test cases to test failures in and around the WebSocketProxy
and how it handles them.

- In WebSocketChannel.sendFrame() we were using a null cause for
closeConnection, we are now extracting the cause from the
AbnormalCloseStatus. This was resulting in onError not being called
when there was actually an error and an AbnormalCloseStatus.

Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
This commit is contained in:
Lachlan Roberts 2019-02-19 09:27:11 +11:00
parent 001bc8f296
commit 4aa52c2f43
4 changed files with 369 additions and 43 deletions

View File

@ -524,7 +524,7 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
{
CloseStatus closeStatus = CloseStatus.getCloseStatus(frame);
if (closeStatus instanceof AbnormalCloseStatus && channelState.onClosed(closeStatus))
closeConnection(null, closeStatus, Callback.from(callback, ex));
closeConnection(AbnormalCloseStatus.getCause(closeStatus), closeStatus, Callback.from(callback, ex));
else
callback.failed(ex);
}

View File

@ -3,6 +3,7 @@ package org.eclipse.jetty.websocket.core.proxy;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.eclipse.jetty.util.BlockingArrayQueue;
import org.eclipse.jetty.util.BufferUtil;
@ -16,6 +17,7 @@ class BasicFrameHandler implements FrameHandler
{
protected String name;
protected CoreSession session;
protected CountDownLatch opened = new CountDownLatch(1);
protected CountDownLatch closed = new CountDownLatch(1);
protected BlockingQueue<Frame> receivedFrames = new BlockingArrayQueue<>();
@ -30,8 +32,8 @@ class BasicFrameHandler implements FrameHandler
public void onOpen(CoreSession coreSession, Callback callback)
{
session = coreSession;
System.err.println(name + " onOpen(): " + session);
opened.countDown();
callback.succeeded();
}
@ -47,7 +49,6 @@ class BasicFrameHandler implements FrameHandler
public void onError(Throwable cause, Callback callback)
{
System.err.println(name + " onError(): " + cause);
cause.printStackTrace();
callback.succeeded();
}
@ -66,20 +67,40 @@ class BasicFrameHandler implements FrameHandler
session.sendFrame(textFrame, Callback.NOOP, false);
}
public void close(String message) throws InterruptedException
public void sendFrame(Frame frame)
{
System.err.println(name + " sendFrame(): " + frame);
session.sendFrame(frame, Callback.NOOP, false);
}
public void close(String message) throws Exception
{
session.close(CloseStatus.NORMAL, message, Callback.NOOP);
awaitClose();
}
public void awaitClose() throws InterruptedException
public void awaitClose() throws Exception
{
closed.await(5, TimeUnit.SECONDS);
if (!closed.await(5, TimeUnit.SECONDS))
throw new TimeoutException();
}
public static class ServerEchoHandler extends BasicFrameHandler
{
private boolean throwOnFrame;
private boolean noResponse;
public void throwOnFrame()
{
throwOnFrame = true;
}
public void noResponseOnFrame()
{
noResponse = true;
}
public ServerEchoHandler(String name)
{
super(name);
@ -91,6 +112,12 @@ class BasicFrameHandler implements FrameHandler
System.err.println(name + " onFrame(): " + frame);
receivedFrames.offer(Frame.copy(frame));
if (throwOnFrame)
throw new RuntimeException("intentionally throwing in server onFrame()");
if (noResponse)
return;
if (frame.isDataFrame())
{
System.err.println(name + " echoDataFrame(): " + frame);

View File

@ -3,6 +3,7 @@ package org.eclipse.jetty.websocket.core.proxy;
import java.io.IOException;
import java.net.URI;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import org.eclipse.jetty.util.BlockingArrayQueue;
import org.eclipse.jetty.util.Callback;
@ -38,7 +39,6 @@ class WebSocketProxy
this.client = client;
this.serverUri = serverUri;
}
class Client2Proxy implements FrameHandler
{
private CoreSession client;
@ -48,11 +48,20 @@ class WebSocketProxy
private Throwable error;
public BlockingQueue<Frame> receivedFrames = new BlockingArrayQueue<>();
protected CountDownLatch closed = new CountDownLatch(1);
public State getState()
{
synchronized (this)
{
return state;
}
}
@Override
public void onOpen(CoreSession session, Callback callback)
{
System.err.println("[Client2Proxy] onOpen: " + session);
System.err.println(toString() + " onOpen(): " + session);
Throwable failure = null;
synchronized (lock)
@ -78,6 +87,8 @@ class WebSocketProxy
private void onOpenSuccess(Callback callback)
{
System.err.println(toString() + " onOpenSuccess()");
boolean failServer2Proxy = false;
Throwable failure = null;
synchronized (lock)
@ -109,6 +120,8 @@ class WebSocketProxy
private void onOpenFail(Callback callback, Throwable t)
{
System.err.println(toString() + " onOpenFail(): " + t);
Throwable failure = t;
synchronized (lock)
{
@ -135,7 +148,7 @@ class WebSocketProxy
@Override
public void onFrame(Frame frame, Callback callback)
{
System.err.println("[Client2Proxy] onFrame(): " + frame);
System.err.println(toString() + " onFrame(): " + frame);
receivedFrames.offer(Frame.copy(frame));
Callback sendCallback = callback;
@ -177,8 +190,7 @@ class WebSocketProxy
@Override
public void onError(Throwable failure, Callback callback)
{
System.err.println("[Client2Proxy] onError(): " + failure);
failure.printStackTrace();
System.err.println(toString() + " onError(): " + failure);
boolean failServer2Proxy;
synchronized (lock)
@ -206,7 +218,7 @@ class WebSocketProxy
public void fail(Throwable failure, Callback callback)
{
System.err.println("[Client2Proxy] fail(): " + failure);
System.err.println(toString() + " fail(): " + failure);
Callback sendCallback = null;
synchronized (lock)
@ -239,13 +251,14 @@ class WebSocketProxy
@Override
public void onClosed(CloseStatus closeStatus, Callback callback)
{
System.err.println("[Client2Proxy] onClosed(): " + closeStatus);
System.err.println(toString() + " onClosed(): " + closeStatus);
closed.countDown();
callback.succeeded();
}
public void send(Frame frame, Callback callback)
{
System.err.println("[Client2Proxy] onClosed(): " + frame);
System.err.println(toString() + " send(): " + frame);
Callback sendCallback = callback;
Throwable failure = null;
@ -281,6 +294,15 @@ class WebSocketProxy
else
client.sendFrame(frame, sendCallback, false);
}
@Override
public String toString()
{
synchronized (lock)
{
return "[Client2Proxy," + state + "] ";
}
}
}
class Server2Proxy implements FrameHandler
@ -292,10 +314,19 @@ class WebSocketProxy
private Throwable error;
public BlockingQueue<Frame> receivedFrames = new BlockingArrayQueue<>();
protected CountDownLatch closed = new CountDownLatch(1);
public State getState()
{
synchronized (this)
{
return state;
}
}
public void connect(Callback callback)
{
System.err.println("[Server2Proxy] connect()");
System.err.println(toString() + " connect()");
Throwable failure = null;
synchronized (lock)
@ -338,6 +369,8 @@ class WebSocketProxy
private void onConnectSuccess(CoreSession s, Callback callback)
{
System.err.println(toString() + " onConnectSuccess(): " + s);
Callback sendCallback = null;
Throwable failure = null;
synchronized (lock)
@ -368,6 +401,8 @@ class WebSocketProxy
private void onConnectFailure(Throwable t, Callback callback)
{
System.err.println(toString() + " onConnectFailure(): " + t);
Throwable failure = t;
synchronized (lock)
{
@ -375,6 +410,7 @@ class WebSocketProxy
{
case CONNECTING:
state = State.FAILED;
error = t;
break;
case FAILED:
@ -392,7 +428,7 @@ class WebSocketProxy
@Override
public void onOpen(CoreSession session, Callback callback)
{
System.err.println("[Server2Proxy] onOpen(): " + session);
System.err.println(toString() + " onOpen(): " + session);
Throwable failure = null;
synchronized (lock)
@ -423,7 +459,7 @@ class WebSocketProxy
@Override
public void onFrame(Frame frame, Callback callback)
{
System.err.println("[Server2Proxy] onFrame(): " + frame);
System.err.println(toString() + " onFrame(): " + frame);
receivedFrames.offer(Frame.copy(frame));
Callback sendCallback = callback;
@ -466,8 +502,7 @@ class WebSocketProxy
@Override
public void onError(Throwable failure, Callback callback)
{
System.err.println("[Server2Proxy] onError(): " + failure);
failure.printStackTrace();
System.err.println(toString() + " onError(): " + failure);
boolean failClient2Proxy = false;
synchronized (lock)
@ -495,13 +530,14 @@ class WebSocketProxy
@Override
public void onClosed(CloseStatus closeStatus, Callback callback)
{
System.err.println("[Server2Proxy] onClosed(): " + closeStatus);
System.err.println(toString() + " onClosed(): " + closeStatus);
closed.countDown();
callback.succeeded();
}
public void fail(Throwable failure, Callback callback)
{
System.err.println("[Server2Proxy] fail(): " + failure);
System.err.println(toString() + " fail(): " + failure);
Callback sendCallback = null;
synchronized (lock)
@ -532,7 +568,7 @@ class WebSocketProxy
public void send(Frame frame, Callback callback)
{
System.err.println("[Server2Proxy] send(): " + frame);
System.err.println(toString() + " send(): " + frame);
Callback sendCallback = callback;
Throwable failure = null;
@ -568,5 +604,14 @@ class WebSocketProxy
else
server.sendFrame(frame, sendCallback, false);
}
@Override
public String toString()
{
synchronized (lock)
{
return "[Server2Proxy," + state + "] ";
}
}
}
}

View File

@ -1,18 +1,33 @@
package org.eclipse.jetty.websocket.core.proxy;
import java.io.IOException;
import java.net.URI;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.server.handler.HandlerList;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.StacklessLogging;
import org.eclipse.jetty.websocket.core.CloseStatus;
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.FrameHandler;
import org.eclipse.jetty.websocket.core.FrameHandler.CoreSession;
import org.eclipse.jetty.websocket.core.OpCode;
import org.eclipse.jetty.websocket.core.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.core.client.WebSocketCoreClient;
import org.eclipse.jetty.websocket.core.internal.WebSocketChannel;
import org.eclipse.jetty.websocket.core.server.WebSocketNegotiator;
import org.eclipse.jetty.websocket.core.server.WebSocketUpgradeHandler;
import org.junit.jupiter.api.AfterEach;
@ -20,16 +35,41 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
public class WebSocketProxyTest
{
Server _server;
WebSocketCoreClient _client;
private Server _server;
private WebSocketCoreClient _client;
private WebSocketProxy proxy;
private BasicFrameHandler.ServerEchoHandler serverFrameHandler;
private TestHandler testHandler;
WebSocketProxy proxy;
BasicFrameHandler.ServerEchoHandler serverFrameHandler;
private class TestHandler extends AbstractHandler
{
public void blockServerUpgradeRequests()
{
blockServerUpgradeRequests = true;
}
public boolean blockServerUpgradeRequests = false;
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
if (baseRequest.getHeader("Upgrade") != null)
{
if (blockServerUpgradeRequests && target.startsWith("/server/"))
{
response.sendError(HttpStatus.INTERNAL_SERVER_ERROR_500);
baseRequest.setHandled(true);
}
}
}
}
@BeforeEach
public void start() throws Exception
@ -40,21 +80,26 @@ public class WebSocketProxyTest
_server.addConnector(connector);
HandlerList handlers = new HandlerList();
testHandler = new TestHandler();
handlers.addHandler(testHandler);
FrameHandler.ConfigurationCustomizer customizer = new FrameHandler.ConfigurationCustomizer();
customizer.setIdleTimeout(Duration.ofSeconds(3));
ContextHandler serverContext = new ContextHandler("/server");
serverFrameHandler = new BasicFrameHandler.ServerEchoHandler("SERVER");
WebSocketNegotiator negotiator = WebSocketNegotiator.from((negotiation) -> serverFrameHandler);
WebSocketNegotiator negotiator = WebSocketNegotiator.from((negotiation) -> serverFrameHandler, customizer);
WebSocketUpgradeHandler upgradeHandler = new WebSocketUpgradeHandler(negotiator);
serverContext.setHandler(upgradeHandler);
handlers.addHandler(serverContext);
_client = new WebSocketCoreClient();
_client = new WebSocketCoreClient(null, customizer);
_client.start();
URI uri = new URI("ws://localhost:8080/server");
URI uri = new URI("ws://localhost:8080/server/");
ContextHandler proxyContext = new ContextHandler("/proxy");
proxy = new WebSocketProxy(_client, uri);
negotiator = WebSocketNegotiator.from((negotiation) -> proxy.client2Proxy);
negotiator = WebSocketNegotiator.from((negotiation) -> proxy.client2Proxy, customizer);
upgradeHandler = new WebSocketUpgradeHandler(negotiator);
proxyContext.setHandler(upgradeHandler);
handlers.addHandler(proxyContext);
@ -70,38 +115,247 @@ public class WebSocketProxyTest
_server.stop();
}
public void awaitProxyClose(WebSocketProxy.Client2Proxy client2Proxy, WebSocketProxy.Server2Proxy server2Proxy) throws Exception
{
if (client2Proxy != null && !client2Proxy.closed.await(5, TimeUnit.SECONDS))
{
throw new TimeoutException("client2Proxy close timeout");
}
if (server2Proxy != null && !server2Proxy.closed.await(5, TimeUnit.SECONDS))
{
throw new TimeoutException("server2Proxy close timeout");
}
}
@Test
public void testHello() throws Exception
public void testEcho() throws Exception
{
BasicFrameHandler clientHandler = new BasicFrameHandler("CLIENT");
ClientUpgradeRequest upgradeRequest = ClientUpgradeRequest.from(_client, new URI("ws://localhost:8080/proxy"), clientHandler);
WebSocketProxy.Client2Proxy proxyClientSide = proxy.client2Proxy;
WebSocketProxy.Server2Proxy proxyServerSide = proxy.server2Proxy;
ClientUpgradeRequest upgradeRequest = ClientUpgradeRequest.from(_client, new URI("ws://localhost:8080/proxy/a"), clientHandler);
CompletableFuture<CoreSession> response = _client.connect(upgradeRequest);
response.get(5, TimeUnit.SECONDS);
clientHandler.sendText("hello world");
clientHandler.close("standard close");
clientHandler.awaitClose();
serverFrameHandler.awaitClose();
awaitProxyClose(proxyClientSide, proxyServerSide);
Frame frame;
// Verify the the text frame was received
WebSocketProxy.Client2Proxy proxyClientSide = proxy.client2Proxy;
WebSocketProxy.Server2Proxy proxyServerSide = proxy.server2Proxy;
assertThat(proxyClientSide.getState(), is(WebSocketProxy.State.CLOSED));
assertThat(proxyServerSide.getState(), is(WebSocketProxy.State.CLOSED));
assertThat(proxyClientSide.receivedFrames.poll().getPayloadAsUTF8(), is("hello world"));
assertThat(serverFrameHandler.receivedFrames.poll().getPayloadAsUTF8(), is("hello world"));
assertThat(proxyServerSide.receivedFrames.poll().getPayloadAsUTF8(), is("hello world"));
assertThat(clientHandler.receivedFrames.poll().getPayloadAsUTF8(), is("hello world"));
// Verify the right close frame was received
assertThat(CloseStatus.getCloseStatus(proxyClientSide.receivedFrames.poll()).getReason(), is("standard close"));
assertThat(CloseStatus.getCloseStatus(serverFrameHandler.receivedFrames.poll()).getReason(), is("standard close"));
assertThat(CloseStatus.getCloseStatus(proxyServerSide.receivedFrames.poll()).getReason(), is("standard close"));
assertThat(CloseStatus.getCloseStatus(clientHandler.receivedFrames.poll()).getReason(), is("standard close"));
// Verify no other frames were received
assertNull(proxyClientSide.receivedFrames.poll(250, TimeUnit.MILLISECONDS));
assertNull(serverFrameHandler.receivedFrames.poll(250, TimeUnit.MILLISECONDS));
assertNull(proxyServerSide.receivedFrames.poll(250, TimeUnit.MILLISECONDS));
assertNull(clientHandler.receivedFrames.poll(250, TimeUnit.MILLISECONDS));
assertNull(proxyClientSide.receivedFrames.poll());
assertNull(serverFrameHandler.receivedFrames.poll());
assertNull(proxyServerSide.receivedFrames.poll());
assertNull(clientHandler.receivedFrames.poll());
}
@Test
public void testFailServerUpgrade() throws Exception
{
testHandler.blockServerUpgradeRequests();
WebSocketProxy.Client2Proxy proxyClientSide = proxy.client2Proxy;
WebSocketProxy.Server2Proxy proxyServerSide = proxy.server2Proxy;
BasicFrameHandler clientHandler = new BasicFrameHandler("CLIENT");
try (StacklessLogging stacklessLogging = new StacklessLogging(WebSocketChannel.class))
{
CompletableFuture<CoreSession> response = _client.connect(clientHandler, new URI("ws://localhost:8080/proxy/"));
response.get(5, TimeUnit.SECONDS);
clientHandler.sendText("hello world");
clientHandler.close("standard close");
clientHandler.awaitClose();
awaitProxyClose(proxyClientSide, null);
}
assertNull(proxyClientSide.receivedFrames.poll());
assertThat(proxyClientSide.getState(), is(WebSocketProxy.State.FAILED));
assertNull(proxyServerSide.receivedFrames.poll());
assertThat(proxyServerSide.getState(), is(WebSocketProxy.State.FAILED));
assertFalse(serverFrameHandler.opened.await(250, TimeUnit.MILLISECONDS));
CloseStatus closeStatus = CloseStatus.getCloseStatus(clientHandler.receivedFrames.poll());
assertThat(closeStatus.getCode(), is(CloseStatus.SERVER_ERROR));
assertThat(closeStatus.getReason(), containsString("Failed to upgrade to websocket: Unexpected HTTP Response Status Code:"));
}
@Test
public void testClientError() throws Exception
{
BasicFrameHandler clientHandler = new BasicFrameHandler("CLIENT")
{
@Override
public void onOpen(CoreSession coreSession, Callback callback)
{
System.err.println(name + " onOpen(): " + coreSession);
throw new IllegalStateException("simulated client onOpen error");
}
};
WebSocketProxy.Client2Proxy proxyClientSide = proxy.client2Proxy;
WebSocketProxy.Server2Proxy proxyServerSide = proxy.server2Proxy;
try (StacklessLogging stacklessLogging = new StacklessLogging(WebSocketChannel.class))
{
CompletableFuture<CoreSession> response = _client.connect(clientHandler, new URI("ws://localhost:8080/proxy/"));
response.get(5, TimeUnit.SECONDS);
clientHandler.awaitClose();
serverFrameHandler.awaitClose();
awaitProxyClose(proxyClientSide, proxyServerSide);
}
CloseStatus closeStatus = CloseStatus.getCloseStatus(proxyClientSide.receivedFrames.poll());
assertThat(closeStatus.getCode(), is(CloseStatus.SERVER_ERROR));
assertThat(closeStatus.getReason(), containsString("simulated client onOpen error"));
assertThat(proxyClientSide.getState(), is(WebSocketProxy.State.CLOSED));
closeStatus = CloseStatus.getCloseStatus(proxyServerSide.receivedFrames.poll());
assertThat(closeStatus.getCode(), is(CloseStatus.SERVER_ERROR));
assertThat(closeStatus.getReason(), containsString("simulated client onOpen error"));
assertThat(proxyServerSide.getState(), is(WebSocketProxy.State.CLOSED));
closeStatus = CloseStatus.getCloseStatus(serverFrameHandler.receivedFrames.poll());
assertThat(closeStatus.getCode(), is(CloseStatus.SERVER_ERROR));
assertThat(closeStatus.getReason(), containsString("simulated client onOpen error"));
assertNull(clientHandler.receivedFrames.poll());
}
@Test
public void testServerError() throws Exception
{
serverFrameHandler.throwOnFrame();
WebSocketProxy.Client2Proxy proxyClientSide = proxy.client2Proxy;
WebSocketProxy.Server2Proxy proxyServerSide = proxy.server2Proxy;
BasicFrameHandler clientHandler = new BasicFrameHandler("CLIENT");
ClientUpgradeRequest upgradeRequest = ClientUpgradeRequest.from(_client, new URI("ws://localhost:8080/proxy/test"), clientHandler);
CompletableFuture<CoreSession> response = _client.connect(upgradeRequest);
response.get(5, TimeUnit.SECONDS);
clientHandler.sendText("hello world");
clientHandler.awaitClose();
serverFrameHandler.awaitClose();
awaitProxyClose(proxyClientSide, proxyServerSide);
CloseStatus closeStatus;
Frame frame;
// Client
frame = clientHandler.receivedFrames.poll();
assertThat(CloseStatus.getCloseStatus(frame).getCode(), is(CloseStatus.SERVER_ERROR));
// Client2Proxy
frame = proxyClientSide.receivedFrames.poll();
assertThat(frame.getOpCode(), is(OpCode.TEXT));
assertThat(frame.getPayloadAsUTF8(), is("hello world"));
frame = proxyClientSide.receivedFrames.poll();
closeStatus = CloseStatus.getCloseStatus(frame);
assertThat(closeStatus.getCode(), is(CloseStatus.SERVER_ERROR));
assertThat(closeStatus.getReason(), is("intentionally throwing in server onFrame()"));
frame = proxyClientSide.receivedFrames.poll();
assertNull(frame);
assertThat(proxyClientSide.getState(), is(WebSocketProxy.State.CLOSED));
// Server2Proxy
frame = proxyServerSide.receivedFrames.poll();
closeStatus = CloseStatus.getCloseStatus(frame);
assertThat(closeStatus.getCode(), is(CloseStatus.SERVER_ERROR));
assertThat(closeStatus.getReason(), is("intentionally throwing in server onFrame()"));
frame = proxyServerSide.receivedFrames.poll();
assertNull(frame);
assertThat(proxyServerSide.getState(), is(WebSocketProxy.State.CLOSED));
// Server
frame = serverFrameHandler.receivedFrames.poll();
assertThat(frame.getOpCode(), is(OpCode.TEXT));
assertThat(frame.getPayloadAsUTF8(), is("hello world"));
frame = serverFrameHandler.receivedFrames.poll();
assertNull(frame);
}
@Test
public void testServerErrorClientNoResponse() throws Exception
{
serverFrameHandler.throwOnFrame();
WebSocketProxy.Client2Proxy proxyClientSide = proxy.client2Proxy;
WebSocketProxy.Server2Proxy proxyServerSide = proxy.server2Proxy;
BasicFrameHandler clientHandler = new BasicFrameHandler("CLIENT")
{
@Override
public void onFrame(Frame frame, Callback callback)
{
System.err.println(name + " onFrame(): " + frame);
receivedFrames.offer(Frame.copy(frame));
}
};
ClientUpgradeRequest upgradeRequest = ClientUpgradeRequest.from(_client, new URI("ws://localhost:8080/proxy/test"), clientHandler);
CompletableFuture<CoreSession> response = _client.connect(upgradeRequest);
response.get(5, TimeUnit.SECONDS);
clientHandler.sendText("hello world");
clientHandler.awaitClose();
serverFrameHandler.awaitClose();
awaitProxyClose(proxyClientSide, proxyServerSide);
CloseStatus closeStatus;
Frame frame;
// Client
frame = clientHandler.receivedFrames.poll();
closeStatus = CloseStatus.getCloseStatus(frame);
assertThat(closeStatus.getCode(), is(CloseStatus.SERVER_ERROR));
assertThat(closeStatus.getReason(), is("intentionally throwing in server onFrame()"));
frame = clientHandler.receivedFrames.poll();
assertNull(frame);
// Client2Proxy
frame = proxyClientSide.receivedFrames.poll();
assertThat(frame.getOpCode(), is(OpCode.TEXT));
assertThat(frame.getPayloadAsUTF8(), is("hello world"));
frame = proxyClientSide.receivedFrames.poll();
assertNull(frame);
assertThat(proxyClientSide.getState(), is(WebSocketProxy.State.FAILED));
// Server2Proxy
frame = proxyServerSide.receivedFrames.poll();
closeStatus = CloseStatus.getCloseStatus(frame);
assertThat(closeStatus.getCode(), is(CloseStatus.SERVER_ERROR));
assertThat(closeStatus.getReason(), is("intentionally throwing in server onFrame()"));
frame = proxyServerSide.receivedFrames.poll();
assertNull(frame);
assertThat(proxyServerSide.getState(), is(WebSocketProxy.State.FAILED));
// Server
frame = serverFrameHandler.receivedFrames.poll();
assertThat(frame.getOpCode(), is(OpCode.TEXT));
assertThat(frame.getPayloadAsUTF8(), is("hello world"));
frame = serverFrameHandler.receivedFrames.poll();
assertNull(frame);
}
}