Merge remote-tracking branch 'origin/jetty-9.4.x' into jetty-10.0.x

Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
This commit is contained in:
Lachlan Roberts 2020-11-26 17:20:05 +11:00
commit 741ad918e4
3 changed files with 699 additions and 0 deletions

View File

@ -10,6 +10,7 @@
# org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection.LEVEL=DEBUG
# org.eclipse.jetty.websocket.common.io.IOState.LEVEL=DEBUG
# org.eclipse.jetty.websocket.common.test.LEVEL=DEBUG
# org.eclipse.jetty.websocket.tests.proxy.LEVEL=DEBUG
# org.eclipse.jetty.websocket.common.Generator.LEVEL=DEBUG
# org.eclipse.jetty.websocket.common.Parser.LEVEL=DEBUG
# org.eclipse.jetty.websocket.client.TrackingSocket.LEVEL=DEBUG

View File

@ -0,0 +1,329 @@
//
// ========================================================================
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under
// the terms of the Eclipse Public License 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0
//
// This Source Code may also be made available under the following
// Secondary Licenses when the conditions for such availability set
// forth in the Eclipse Public License, v. 2.0 are satisfied:
// the Apache License v2.0 which is available at
// https://www.apache.org/licenses/LICENSE-2.0
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.websocket.tests.proxy;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.api.WebSocketConnectionListener;
import org.eclipse.jetty.websocket.api.WebSocketPartialListener;
import org.eclipse.jetty.websocket.api.WebSocketPingPongListener;
import org.eclipse.jetty.websocket.api.exceptions.WebSocketException;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.WebSocketClient;
public class WebSocketProxy
{
private static final Logger LOG = Log.getLogger(WebSocketProxy.class);
private final WebSocketClient client;
private final URI serverUri;
private final ClientToProxy clientToProxy = new ClientToProxy();
private final ProxyToServer proxyToServer = new ProxyToServer();
public WebSocketProxy(WebSocketClient webSocketClient, URI serverUri)
{
this.client = webSocketClient;
this.serverUri = serverUri;
}
public WebSocketConnectionListener getWebSocketConnectionListener()
{
return clientToProxy;
}
public boolean awaitClose(long timeout)
{
try
{
if (!clientToProxy.closeLatch.await(timeout, TimeUnit.MILLISECONDS))
return false;
if (proxyToServer.getSession() == null)
return true;
return proxyToServer.closeLatch.await(timeout, TimeUnit.MILLISECONDS);
}
catch (Exception e)
{
return false;
}
}
public class ClientToProxy implements WebSocketPartialListener, WebSocketPingPongListener
{
private volatile Session session;
private final CountDownLatch closeLatch = new CountDownLatch(1);
private final AtomicInteger pingsReceived = new AtomicInteger();
public Session getSession()
{
return session;
}
public void fail(Throwable failure)
{
session.close(StatusCode.SERVER_ERROR, failure.getMessage());
}
@Override
public void onWebSocketConnect(Session session)
{
if (LOG.isDebugEnabled())
LOG.debug("{} onWebSocketConnect({})", getClass().getSimpleName(), session);
Future<Session> connect = null;
try
{
this.session = session;
ClientUpgradeRequest upgradeRequest = new ClientUpgradeRequest();
upgradeRequest.setSubProtocols(session.getUpgradeRequest().getSubProtocols());
upgradeRequest.setExtensions(session.getUpgradeRequest().getExtensions());
connect = client.connect(proxyToServer, serverUri, upgradeRequest);
//This is blocking as we really want the client to be connected before receiving any messages.
connect.get();
}
catch (Exception e)
{
if (connect != null)
connect.cancel(true);
throw new WebSocketException(e);
}
}
@Override
public void onWebSocketPartialBinary(ByteBuffer payload, boolean fin)
{
if (LOG.isDebugEnabled())
LOG.debug("{} onWebSocketPartialBinary({}, {})", getClass().getSimpleName(), BufferUtil.toDetailString(payload), fin);
try
{
proxyToServer.getSession().getRemote().sendPartialBytes(payload, fin);
}
catch (Exception e)
{
throw new WebSocketException(e);
}
}
@Override
public void onWebSocketPartialText(String payload, boolean fin)
{
if (LOG.isDebugEnabled())
LOG.debug("{} onWebSocketPartialText({}, {})", getClass().getSimpleName(), StringUtil.truncate(payload, 100), fin);
try
{
proxyToServer.getSession().getRemote().sendPartialString(payload, fin);
}
catch (Exception e)
{
throw new WebSocketException(e);
}
}
@Override
public void onWebSocketPing(ByteBuffer payload)
{
if (LOG.isDebugEnabled())
LOG.debug("{} onWebSocketPing({})", getClass().getSimpleName(), BufferUtil.toDetailString(payload));
try
{
// The implementation automatically sends pong response.
pingsReceived.incrementAndGet();
proxyToServer.getSession().getRemote().sendPing(BufferUtil.copy(payload));
}
catch (Exception e)
{
throw new WebSocketException(e);
}
}
@Override
public void onWebSocketPong(ByteBuffer payload)
{
if (LOG.isDebugEnabled())
LOG.debug("{} onWebSocketPong({})", getClass().getSimpleName(), BufferUtil.toDetailString(payload));
try
{
// If we have sent out a ping then we have already responded with automatic pong.
// If this is an unsolicited pong we still need to forward it to the server.
int valueBeforeUpdate = pingsReceived.getAndUpdate(i -> i > 0 ? i - 1 : i);
if (valueBeforeUpdate == 0)
proxyToServer.getSession().getRemote().sendPong(BufferUtil.copy(payload));
}
catch (Exception e)
{
throw new WebSocketException(e);
}
}
@Override
public void onWebSocketError(Throwable cause)
{
if (LOG.isDebugEnabled())
LOG.debug("{} onWebSocketError()", getClass().getSimpleName(), cause);
proxyToServer.fail(cause);
}
@Override
public void onWebSocketClose(int statusCode, String reason)
{
if (LOG.isDebugEnabled())
LOG.debug("{} onWebSocketClose({} {})", getClass().getSimpleName(), statusCode, reason);
// Session may be null if connection to the server failed.
Session session = proxyToServer.getSession();
if (session != null)
session.close(statusCode, reason);
closeLatch.countDown();
}
}
public class ProxyToServer implements WebSocketPartialListener, WebSocketPingPongListener
{
private volatile Session session;
private final CountDownLatch closeLatch = new CountDownLatch(1);
private final AtomicInteger pingsReceived = new AtomicInteger();
public Session getSession()
{
return session;
}
public void fail(Throwable failure)
{
// Only ProxyToServer can be failed before it is opened (if ClientToProxy fails before the connect completes).
Session session = this.session;
if (session != null)
session.close(StatusCode.SERVER_ERROR, failure.getMessage());
}
@Override
public void onWebSocketConnect(Session session)
{
if (LOG.isDebugEnabled())
LOG.debug("{} onWebSocketConnect({})", getClass().getSimpleName(), session);
this.session = session;
}
@Override
public void onWebSocketPartialBinary(ByteBuffer payload, boolean fin)
{
if (LOG.isDebugEnabled())
LOG.debug("{} onWebSocketPartialBinary({}, {})", getClass().getSimpleName(), BufferUtil.toDetailString(payload), fin);
try
{
clientToProxy.getSession().getRemote().sendPartialBytes(payload, fin);
}
catch (Exception e)
{
throw new WebSocketException(e);
}
}
@Override
public void onWebSocketPartialText(String payload, boolean fin)
{
if (LOG.isDebugEnabled())
LOG.debug("{} onWebSocketPartialText({}, {})", getClass().getSimpleName(), StringUtil.truncate(payload, 100), fin);
try
{
clientToProxy.getSession().getRemote().sendPartialString(payload, fin);
}
catch (Exception e)
{
throw new WebSocketException(e);
}
}
@Override
public void onWebSocketPing(ByteBuffer payload)
{
if (LOG.isDebugEnabled())
LOG.debug("{} onWebSocketPing({})", getClass().getSimpleName(), BufferUtil.toDetailString(payload));
try
{
// The implementation automatically sends pong response.
pingsReceived.incrementAndGet();
clientToProxy.getSession().getRemote().sendPing(BufferUtil.copy(payload));
}
catch (Exception e)
{
throw new WebSocketException(e);
}
}
@Override
public void onWebSocketPong(ByteBuffer payload)
{
if (LOG.isDebugEnabled())
LOG.debug("{} onWebSocketPong({})", getClass().getSimpleName(), BufferUtil.toDetailString(payload));
try
{
// If we have sent out a ping then we have already responded with automatic pong.
// If this is an unsolicited pong we still need to forward it to the client.
int valueBeforeUpdate = pingsReceived.getAndUpdate(i -> i > 0 ? i - 1 : i);
if (valueBeforeUpdate == 0)
clientToProxy.getSession().getRemote().sendPong(BufferUtil.copy(payload));
}
catch (Exception e)
{
throw new WebSocketException(e);
}
}
@Override
public void onWebSocketError(Throwable cause)
{
if (LOG.isDebugEnabled())
LOG.debug("{} onWebSocketError()", getClass().getSimpleName(), cause);
clientToProxy.fail(cause);
}
@Override
public void onWebSocketClose(int statusCode, String reason)
{
if (LOG.isDebugEnabled())
LOG.debug("{} onWebSocketClose({} {})", getClass().getSimpleName(), statusCode, reason);
clientToProxy.getSession().close(statusCode, reason);
closeLatch.countDown();
}
}
}

View File

@ -0,0 +1,369 @@
//
// ========================================================================
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under
// the terms of the Eclipse Public License 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0
//
// This Source Code may also be made available under the following
// Secondary Licenses when the conditions for such availability set
// forth in the Eclipse Public License, v. 2.0 are satisfied:
// the Apache License v2.0 which is available at
// https://www.apache.org/licenses/LICENSE-2.0
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.websocket.tests.proxy;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.logging.StacklessLogging;
import org.eclipse.jetty.server.HttpChannel;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.util.BlockingArrayQueue;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.websocket.api.Frame;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketFrame;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
import org.eclipse.jetty.websocket.api.exceptions.WebSocketException;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.eclipse.jetty.websocket.core.OpCode;
import org.eclipse.jetty.websocket.server.config.JettyWebSocketServletContainerInitializer;
import org.eclipse.jetty.websocket.tests.EchoSocket;
import org.eclipse.jetty.websocket.tests.EventSocket;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class WebSocketProxyTest
{
private static final int PORT = 49998;
private Server server;
private EventSocket serverSocket;
private WebSocketProxy webSocketProxy;
private WebSocketClient client;
private URI proxyUri;
@BeforeEach
public void before() throws Exception
{
server = new Server();
ServerConnector connector = new ServerConnector(server);
connector.setPort(PORT);
server.addConnector(connector);
client = new WebSocketClient();
client.start();
proxyUri = URI.create("ws://localhost:" + PORT + "/proxy");
URI echoUri = URI.create("ws://localhost:" + PORT + "/echo");
webSocketProxy = new WebSocketProxy(client, echoUri);
ServletContextHandler contextHandler = new ServletContextHandler();
serverSocket = new EchoSocket();
JettyWebSocketServletContainerInitializer.configure(contextHandler, ((context, container) ->
{
container.addMapping("/proxy", (req, resp) -> webSocketProxy.getWebSocketConnectionListener());
container.addMapping("/echo", (req, resp) ->
{
if (req.hasSubProtocol("fail"))
throw new WebSocketException("failing during upgrade");
return serverSocket;
});
}));
server.setHandler(contextHandler);
server.start();
}
@AfterEach
public void after() throws Exception
{
client.stop();
server.stop();
}
@Test
public void testEcho() throws Exception
{
EventSocket clientSocket = new EventSocket();
client.connect(clientSocket, proxyUri);
assertTrue(clientSocket.openLatch.await(5, TimeUnit.SECONDS));
// Test an echo spread across multiple frames.
clientSocket.session.getRemote().sendPartialString("hell", false);
clientSocket.session.getRemote().sendPartialString("o w", false);
clientSocket.session.getRemote().sendPartialString("orld", false);
clientSocket.session.getRemote().sendPartialString("!", true);
String response = clientSocket.textMessages.poll(5, TimeUnit.SECONDS);
assertThat(response, is("hello world!"));
// Test we closed successfully on the client side.
clientSocket.session.close(StatusCode.NORMAL, "test initiated close");
assertTrue(clientSocket.closeLatch.await(5, TimeUnit.SECONDS));
assertThat(clientSocket.closeCode, is(StatusCode.NORMAL));
assertThat(clientSocket.closeReason, is("test initiated close"));
// Test we closed successfully on the server side.
assertTrue(serverSocket.closeLatch.await(5, TimeUnit.SECONDS));
assertThat(serverSocket.closeCode, is(StatusCode.NORMAL));
assertThat(serverSocket.closeReason, is("test initiated close"));
// No errors occurred.
assertNull(clientSocket.error);
assertNull(serverSocket.error);
// WebSocketProxy has been completely closed.
assertTrue(webSocketProxy.awaitClose(5000));
}
@Test
public void testFailServerUpgrade() throws Exception
{
EventSocket clientSocket = new EventSocket();
ClientUpgradeRequest upgradeRequest = new ClientUpgradeRequest();
upgradeRequest.setSubProtocols("fail");
try (StacklessLogging ignored = new StacklessLogging(HttpChannel.class))
{
client.connect(clientSocket, proxyUri, upgradeRequest);
assertTrue(clientSocket.closeLatch.await(5, TimeUnit.SECONDS));
}
// WebSocketProxy has been completely closed.
assertTrue(webSocketProxy.awaitClose(5000));
}
@Test
public void testClientError() throws Exception
{
EventSocket clientSocket = new OnOpenThrowingSocket();
client.connect(clientSocket, proxyUri);
assertTrue(clientSocket.openLatch.await(5, TimeUnit.SECONDS));
// Verify expected client close.
assertTrue(clientSocket.closeLatch.await(5, TimeUnit.SECONDS));
assertThat(clientSocket.closeCode, is(StatusCode.NO_CLOSE));
assertThat(clientSocket.closeReason, is("simulated onOpen error"));
assertNotNull(clientSocket.error);
// Verify expected server close.
assertTrue(serverSocket.closeLatch.await(5, TimeUnit.SECONDS));
assertThat(serverSocket.closeCode, is(StatusCode.NO_CLOSE));
assertThat(serverSocket.closeReason, is("Disconnected"));
assertNull(serverSocket.error);
// WebSocketProxy has been completely closed.
assertTrue(webSocketProxy.awaitClose(5000));
}
@Test
public void testServerError() throws Exception
{
serverSocket = new OnOpenThrowingSocket();
EventSocket clientSocket = new EventSocket();
client.connect(clientSocket, proxyUri);
assertTrue(clientSocket.openLatch.await(5, TimeUnit.SECONDS));
// Verify expected client close.
assertTrue(clientSocket.closeLatch.await(5, TimeUnit.SECONDS));
assertThat(clientSocket.closeCode, is(StatusCode.SERVER_ERROR));
assertThat(clientSocket.closeReason, is("simulated onOpen error"));
assertNull(clientSocket.error);
// Verify expected server close.
assertTrue(serverSocket.closeLatch.await(5, TimeUnit.SECONDS));
assertThat(serverSocket.closeCode, is(StatusCode.SERVER_ERROR));
assertThat(serverSocket.closeReason, is("simulated onOpen error"));
assertNotNull(serverSocket.error);
// WebSocketProxy has been completely closed.
assertTrue(webSocketProxy.awaitClose(5000));
}
@Test
public void testServerThrowsOnMessage() throws Exception
{
serverSocket = new OnTextThrowingSocket();
EventSocket clientSocket = new EventSocket();
client.connect(clientSocket, proxyUri);
assertTrue(clientSocket.openLatch.await(5, TimeUnit.SECONDS));
assertTrue(serverSocket.openLatch.await(5, TimeUnit.SECONDS));
clientSocket.session.getRemote().sendString("hello world!");
// Verify expected client close.
assertTrue(clientSocket.closeLatch.await(5, TimeUnit.SECONDS));
assertThat(clientSocket.closeCode, is(StatusCode.SERVER_ERROR));
assertThat(clientSocket.closeReason, is("simulated onMessage error"));
assertNull(clientSocket.error);
// Verify expected server close.
assertTrue(serverSocket.closeLatch.await(5, TimeUnit.SECONDS));
assertThat(serverSocket.closeCode, is(StatusCode.SERVER_ERROR));
assertThat(serverSocket.closeReason, is("simulated onMessage error"));
assertNotNull(serverSocket.error);
assertNull(clientSocket.textMessages.poll(1, TimeUnit.SECONDS));
assertTrue(webSocketProxy.awaitClose(5000));
}
@Test
public void timeoutTest() throws Exception
{
long clientSessionIdleTimeout = 2000;
EventSocket clientSocket = new EventSocket();
client.connect(clientSocket, proxyUri);
assertTrue(clientSocket.openLatch.await(5, TimeUnit.SECONDS));
assertTrue(serverSocket.openLatch.await(5, TimeUnit.SECONDS));
// Configure infinite idleTimeout on the server session and short timeout on the client session.
clientSocket.session.setIdleTimeout(Duration.ofMillis(clientSessionIdleTimeout));
serverSocket.session.setIdleTimeout(Duration.ZERO);
// Send and receive an echo message.
clientSocket.session.getRemote().sendString("test echo message");
assertThat(clientSocket.textMessages.poll(clientSessionIdleTimeout, TimeUnit.SECONDS), is("test echo message"));
// Wait more than the idleTimeout period, the clientToProxy connection should fail which should fail the proxyToServer.
assertTrue(clientSocket.closeLatch.await(clientSessionIdleTimeout * 2, TimeUnit.MILLISECONDS));
assertTrue(serverSocket.closeLatch.await(clientSessionIdleTimeout * 2, TimeUnit.MILLISECONDS));
// Check errors and close status.
assertThat(clientSocket.error.getMessage(), containsString("Idle timeout expired"));
assertThat(clientSocket.closeCode, is(StatusCode.SHUTDOWN));
assertThat(clientSocket.closeReason, containsString("Idle timeout expired"));
assertNull(serverSocket.error);
assertThat(serverSocket.closeCode, is(StatusCode.SHUTDOWN));
assertThat(serverSocket.closeReason, containsString("Idle timeout expired"));
}
@Test
public void testPingPong() throws Exception
{
PingPongSocket serverEndpoint = new PingPongSocket();
serverSocket = serverEndpoint;
PingPongSocket clientSocket = new PingPongSocket();
client.connect(clientSocket, proxyUri);
assertTrue(clientSocket.openLatch.await(5, TimeUnit.SECONDS));
assertTrue(serverSocket.openLatch.await(5, TimeUnit.SECONDS));
// Test unsolicited pong from client.
clientSocket.session.getRemote().sendPong(BufferUtil.toBuffer("unsolicited pong from client"));
assertThat(serverEndpoint.pingMessages.size(), is(0));
assertThat(serverEndpoint.pongMessages.poll(5, TimeUnit.SECONDS), is(BufferUtil.toBuffer("unsolicited pong from client")));
// Test unsolicited pong from server.
serverEndpoint.session.getRemote().sendPong(BufferUtil.toBuffer("unsolicited pong from server"));
assertThat(clientSocket.pingMessages.size(), is(0));
assertThat(clientSocket.pongMessages.poll(5, TimeUnit.SECONDS), is(BufferUtil.toBuffer("unsolicited pong from server")));
// Test pings from client.
for (int i = 0; i < 10; i++)
clientSocket.session.getRemote().sendPing(BufferUtil.toBuffer(i));
for (int i = 0; i < 10; i++)
{
assertThat(serverEndpoint.pingMessages.poll(5, TimeUnit.SECONDS), is(BufferUtil.toBuffer(i)));
assertThat(clientSocket.pongMessages.poll(5, TimeUnit.SECONDS), is(BufferUtil.toBuffer(i)));
}
// Test pings from server.
for (int i = 0; i < 10; i++)
serverEndpoint.session.getRemote().sendPing(BufferUtil.toBuffer(i));
for (int i = 0; i < 10; i++)
{
assertThat(clientSocket.pingMessages.poll(5, TimeUnit.SECONDS), is(BufferUtil.toBuffer(i)));
assertThat(serverEndpoint.pongMessages.poll(5, TimeUnit.SECONDS), is(BufferUtil.toBuffer(i)));
}
clientSocket.session.close(StatusCode.NORMAL, "closing from test");
// Verify expected client close.
assertTrue(clientSocket.closeLatch.await(5, TimeUnit.SECONDS));
assertThat(clientSocket.closeCode, is(StatusCode.NORMAL));
assertThat(clientSocket.closeReason, is("closing from test"));
assertNull(clientSocket.error);
// Verify expected server close.
assertTrue(serverSocket.closeLatch.await(5, TimeUnit.SECONDS));
assertThat(serverSocket.closeCode, is(StatusCode.NORMAL));
assertThat(serverSocket.closeReason, is("closing from test"));
assertNull(serverSocket.error);
// WebSocketProxy has been completely closed.
assertTrue(webSocketProxy.awaitClose(5000));
// Check we had no unexpected pings or pongs sent.
assertThat(clientSocket.pingMessages.size(), is(0));
assertThat(serverEndpoint.pingMessages.size(), is(0));
}
@WebSocket
public static class PingPongSocket extends EventSocket
{
public BlockingQueue<ByteBuffer> pingMessages = new BlockingArrayQueue<>();
public BlockingQueue<ByteBuffer> pongMessages = new BlockingArrayQueue<>();
@OnWebSocketFrame
public void onWebSocketFrame(Frame frame)
{
switch (frame.getOpCode())
{
case OpCode.PING:
pingMessages.add(BufferUtil.copy(frame.getPayload()));
break;
case OpCode.PONG:
pongMessages.add(BufferUtil.copy(frame.getPayload()));
break;
default:
break;
}
}
}
@WebSocket
public static class OnOpenThrowingSocket extends EventSocket
{
@Override
public void onOpen(Session session)
{
super.onOpen(session);
throw new IllegalStateException("simulated onOpen error");
}
}
@WebSocket
public static class OnTextThrowingSocket extends EventSocket
{
@Override
public void onMessage(String message) throws IOException
{
super.onMessage(message);
throw new IllegalStateException("simulated onMessage error");
}
}
}