diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/BufferUtil.java b/jetty-util/src/main/java/org/eclipse/jetty/util/BufferUtil.java index 584eb2d2c24..95bc9937e9d 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/BufferUtil.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/BufferUtil.java @@ -135,6 +135,24 @@ public class BufferUtil return buf; } + /** + * Deep copy of a buffer + * + * @param buffer The buffer to copy + * @return A copy of the buffer + */ + public static ByteBuffer copy(ByteBuffer buffer) + { + if (buffer == null) + return null; + int p = buffer.position(); + ByteBuffer clone = buffer.isDirect() ? ByteBuffer.allocateDirect(buffer.remaining()) : ByteBuffer.allocate(buffer.remaining()); + clone.put(buffer); + clone.flip(); + buffer.position(p); + return clone; + } + /** * Clear the buffer to be empty in flush mode. * The position and limit are set to 0; diff --git a/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/proxy/WebSocketProxy.java b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/proxy/WebSocketProxy.java index 3867c80e50a..027a912a30e 100644 --- a/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/proxy/WebSocketProxy.java +++ b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/proxy/WebSocketProxy.java @@ -1,28 +1,57 @@ +// +// ======================================================================== +// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + package org.eclipse.jetty.websocket.tests.proxy; import java.net.URI; import java.nio.ByteBuffer; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.FutureCallback; -import org.eclipse.jetty.util.component.LifeCycle; +import org.eclipse.jetty.util.StringUtil; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.api.StatusCode; import org.eclipse.jetty.websocket.api.WebSocketConnectionListener; import org.eclipse.jetty.websocket.api.WebSocketException; import org.eclipse.jetty.websocket.api.WebSocketPartialListener; import org.eclipse.jetty.websocket.api.WebSocketPingPongListener; +import org.eclipse.jetty.websocket.client.ClientUpgradeRequest; import org.eclipse.jetty.websocket.client.WebSocketClient; public class WebSocketProxy { - private final WebSocketClient client = new WebSocketClient(); - private final URI serverUri = URI.create("ws://echo.websocket.org"); + private static final Logger LOG = Log.getLogger(WebSocketProxy.class); + + private final WebSocketClient client; + private final URI serverUri; private final ClientToProxy clientToProxy = new ClientToProxy(); private final ProxyToServer proxyToServer = new ProxyToServer(); - public WebSocketProxy() + public WebSocketProxy(WebSocketClient webSocketClient, URI serverUri) { - LifeCycle.start(client); + this.client = webSocketClient; + this.serverUri = serverUri; } public WebSocketConnectionListener getWebSocketConnectionListener() @@ -30,33 +59,101 @@ public class WebSocketProxy return clientToProxy; } + public boolean awaitClose(long timeout) + { + try + { + if (!clientToProxy.closeLatch.await(timeout, TimeUnit.MILLISECONDS)) + return false; + if (proxyToServer.getSession() == null) + return true; + return proxyToServer.closeLatch.await(timeout, TimeUnit.MILLISECONDS); + } + catch (Exception e) + { + return false; + } + } + + /** + * We use this to wait until we receive a pong from other websocket connection before sending back the response pong. + * This is problematic because the protocol allows unsolicited PongMessages. Ideally it would be best if we could + * disable the automatic pong response through something like the {@link org.eclipse.jetty.websocket.api.WebSocketPolicy}. + */ + private static class PongWait + { + private final FutureCallback COMPLETED = new FutureCallback(true); + private final AtomicReference reference = new AtomicReference<>(); + + /** + * @return gives back a Future which is completed when this is notified that a pong has been received. + */ + public FutureCallback waitForPong() + { + FutureCallback futureCallback = new FutureCallback(); + if (!reference.compareAndSet(null, futureCallback)) + throw new IllegalStateException(); + return futureCallback; + } + + /** + * @return true if the pong will be automatically forwarded, otherwise it must be sent manually. + */ + public boolean receivedPong() + { + FutureCallback futureCallback = reference.getAndSet(null); + if (futureCallback != null) + { + futureCallback.succeeded(); + return true; + } + + return false; + } + + public void cancel() + { + FutureCallback futureCallback = reference.getAndSet(COMPLETED); + if (futureCallback != null) + futureCallback.cancel(true); + } + } + public class ClientToProxy implements WebSocketPartialListener, WebSocketPingPongListener { private Session session; - private FutureCallback pongWait; + private final CountDownLatch closeLatch = new CountDownLatch(1); + private final PongWait pongWait = new PongWait(); public Session getSession() { return session; } - public void receivedPong() + public boolean receivedPong() { - if (pongWait != null) - { - pongWait.succeeded(); - pongWait = null; - } + return pongWait.receivedPong(); + } + + public void fail(Throwable failure) + { + session.close(StatusCode.SERVER_ERROR, failure.getMessage()); } @Override public void onWebSocketConnect(Session session) { + if (LOG.isDebugEnabled()) + LOG.debug("{} onWebSocketConnect({})", getClass().getSimpleName(), session); + Future connect = null; try { this.session = session; - connect = client.connect(proxyToServer, serverUri); + ClientUpgradeRequest upgradeRequest = new ClientUpgradeRequest(); + upgradeRequest.setSubProtocols(session.getUpgradeRequest().getSubProtocols()); + upgradeRequest.setExtensions(session.getUpgradeRequest().getExtensions()); + connect = client.connect(proxyToServer, serverUri, upgradeRequest); connect.get(); } catch (Exception e) @@ -70,6 +167,9 @@ public class WebSocketProxy @Override public void onWebSocketPartialBinary(ByteBuffer payload, boolean fin) { + if (LOG.isDebugEnabled()) + LOG.debug("{} onWebSocketPartialBinary({}, {})", getClass().getSimpleName(), BufferUtil.toDetailString(payload), fin); + try { proxyToServer.getSession().getRemote().sendPartialBytes(payload, fin); @@ -83,6 +183,9 @@ public class WebSocketProxy @Override public void onWebSocketPartialText(String payload, boolean fin) { + if (LOG.isDebugEnabled()) + LOG.debug("{} onWebSocketPartialText({}, {})", getClass().getSimpleName(), StringUtil.truncate(payload, 100), fin); + try { proxyToServer.getSession().getRemote().sendPartialString(payload, fin); @@ -96,12 +199,15 @@ public class WebSocketProxy @Override public void onWebSocketPing(ByteBuffer payload) { + if (LOG.isDebugEnabled()) + LOG.debug("{} onWebSocketPing({})", getClass().getSimpleName(), BufferUtil.toDetailString(payload)); + try { - proxyToServer.getSession().getRemote().sendPing(payload); - // Block until we get pong response back from server. - // An automatic pong will occur from the implementation after we exit from here. - pongWait.get(); + // Block until we get pong response back from server. An automatic pong will be sent after this method. + FutureCallback futureCallback = pongWait.waitForPong(); + proxyToServer.getSession().getRemote().sendPing(BufferUtil.copy(payload)); + futureCallback.get(); } catch (Exception e) { @@ -112,10 +218,16 @@ public class WebSocketProxy @Override public void onWebSocketPong(ByteBuffer payload) { + if (LOG.isDebugEnabled()) + LOG.debug("{} onWebSocketPong({})", getClass().getSimpleName(), BufferUtil.toDetailString(payload)); + try { - // Notify the other side we have received a Pong. - proxyToServer.receivedPong(); + // We do not forward on the pong message unless it was an unsolicited pong. + // Instead we notify the other side we have received pong which will then unblock in the + // thread in onPing() which will trigger the automatic pong response from the implementation. + if (!proxyToServer.receivedPong()) + proxyToServer.session.getRemote().sendPong(BufferUtil.copy(payload)); } catch (Exception e) { @@ -126,64 +238,65 @@ public class WebSocketProxy @Override public void onWebSocketError(Throwable cause) { - cause.printStackTrace(); + if (LOG.isDebugEnabled()) + LOG.debug("{} onWebSocketError()", getClass().getSimpleName(), cause); - try - { - // TODO: need to fail ProxyToServer as well. - if (pongWait != null) - pongWait.cancel(true); - } - catch (Exception e) - { - throw new WebSocketException(e); - } + proxyToServer.fail(cause); + pongWait.cancel(); } @Override public void onWebSocketClose(int statusCode, String reason) { - try - { - Session session = proxyToServer.getSession(); - if (session != null) - session.close(statusCode, reason); - } - catch (Exception e) - { - throw new WebSocketException(e); - } + if (LOG.isDebugEnabled()) + LOG.debug("{} onWebSocketClose({} {})", getClass().getSimpleName(), statusCode, reason); + + Session session = proxyToServer.getSession(); + if (session != null) + session.close(statusCode, reason); + pongWait.cancel(); + closeLatch.countDown(); } } public class ProxyToServer implements WebSocketPartialListener, WebSocketPingPongListener { private Session session; - private FutureCallback pongWait; + private final CountDownLatch closeLatch = new CountDownLatch(1); + private final PongWait pongWait = new PongWait(); public Session getSession() { return session; } - public void receivedPong() + public boolean receivedPong() { - if (pongWait != null) - { - pongWait.succeeded(); - pongWait = null; - } + return pongWait.receivedPong(); + } + + public void fail(Throwable failure) + { + // Only ProxyToServer can be failed before it is opened (if ClientToProxy fails before the connect completes). + if (session != null) + session.close(StatusCode.SERVER_ERROR, failure.getMessage()); } @Override public void onWebSocketConnect(Session session) { + if (LOG.isDebugEnabled()) + LOG.debug("{} onWebSocketConnect({})", getClass().getSimpleName(), session); + this.session = session; } @Override public void onWebSocketPartialBinary(ByteBuffer payload, boolean fin) { + if (LOG.isDebugEnabled()) + LOG.debug("{} onWebSocketPartialBinary({}, {})", getClass().getSimpleName(), BufferUtil.toDetailString(payload), fin); + try { clientToProxy.getSession().getRemote().sendPartialBytes(payload, fin); @@ -197,6 +310,9 @@ public class WebSocketProxy @Override public void onWebSocketPartialText(String payload, boolean fin) { + if (LOG.isDebugEnabled()) + LOG.debug("{} onWebSocketPartialText({}, {})", getClass().getSimpleName(), StringUtil.truncate(payload, 100), fin); + try { clientToProxy.getSession().getRemote().sendPartialString(payload, fin); @@ -210,12 +326,15 @@ public class WebSocketProxy @Override public void onWebSocketPing(ByteBuffer payload) { + if (LOG.isDebugEnabled()) + LOG.debug("{} onWebSocketPing({})", getClass().getSimpleName(), BufferUtil.toDetailString(payload)); + try { - clientToProxy.getSession().getRemote().sendPing(payload); - // Block until we get pong response back from client. - // An automatic pong will occur from the implementation after we exit from here. - pongWait.get(); + // Block until we get pong response back from client. An automatic pong will be sent after this method. + FutureCallback futureCallback = pongWait.waitForPong(); + clientToProxy.getSession().getRemote().sendPing(BufferUtil.copy(payload)); + futureCallback.get(); } catch (Exception e) { @@ -226,10 +345,16 @@ public class WebSocketProxy @Override public void onWebSocketPong(ByteBuffer payload) { + if (LOG.isDebugEnabled()) + LOG.debug("{} onWebSocketPong({})", getClass().getSimpleName(), BufferUtil.toDetailString(payload)); + try { - // Notify the other side we have received a Pong. - clientToProxy.receivedPong(); + // We do not forward on the pong message unless it was an unsolicited pong. + // Instead we notify the other side we have received pong which will then unblock in the + // thread in onPing() which will trigger the automatic pong response from the implementation. + if (!clientToProxy.receivedPong()) + clientToProxy.session.getRemote().sendPong(BufferUtil.copy(payload)); } catch (Exception e) { @@ -240,33 +365,24 @@ public class WebSocketProxy @Override public void onWebSocketError(Throwable cause) { - cause.printStackTrace(); + if (LOG.isDebugEnabled()) + LOG.debug("{} onWebSocketError()", getClass().getSimpleName(), cause); - try - { - // TODO: need to fail ProxyToServer as well. - if (pongWait != null) - pongWait.cancel(true); - } - catch (Exception e) - { - throw new WebSocketException(e); - } + clientToProxy.fail(cause); + pongWait.cancel(); } @Override public void onWebSocketClose(int statusCode, String reason) { - try - { - Session session = clientToProxy.getSession(); - if (session != null) - session.close(statusCode, reason); - } - catch (Exception e) - { - throw new WebSocketException(e); - } + if (LOG.isDebugEnabled()) + LOG.debug("{} onWebSocketClose({} {})", getClass().getSimpleName(), statusCode, reason); + + Session session = clientToProxy.getSession(); + if (session != null) + session.close(statusCode, reason); + pongWait.cancel(); + closeLatch.countDown(); } } } diff --git a/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/proxy/WebSocketProxyTest.java b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/proxy/WebSocketProxyTest.java index f1638149477..fce4655a305 100644 --- a/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/proxy/WebSocketProxyTest.java +++ b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/proxy/WebSocketProxyTest.java @@ -1,50 +1,338 @@ +// +// ======================================================================== +// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + package org.eclipse.jetty.websocket.tests.proxy; +import java.io.IOException; import java.net.URI; +import java.nio.ByteBuffer; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; +import org.eclipse.jetty.server.HttpChannel; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.ServerConnector; import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.util.BlockingArrayQueue; +import org.eclipse.jetty.util.BufferUtil; +import org.eclipse.jetty.util.log.StacklessLogging; +import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.api.StatusCode; +import org.eclipse.jetty.websocket.api.WebSocketException; +import org.eclipse.jetty.websocket.api.annotations.OnWebSocketFrame; +import org.eclipse.jetty.websocket.api.annotations.WebSocket; +import org.eclipse.jetty.websocket.api.extensions.Frame; +import org.eclipse.jetty.websocket.client.ClientUpgradeRequest; +import org.eclipse.jetty.websocket.client.WebSocketClient; +import org.eclipse.jetty.websocket.common.OpCode; import org.eclipse.jetty.websocket.server.NativeWebSocketServletContainerInitializer; import org.eclipse.jetty.websocket.server.WebSocketUpgradeFilter; +import org.eclipse.jetty.websocket.tests.EchoSocket; +import org.eclipse.jetty.websocket.tests.EventSocket; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + public class WebSocketProxyTest { + private static final int PORT = 49998; + private Server server; - private URI serverUri; + private EventSocket serverSocket; + private WebSocketProxy webSocketProxy; + private WebSocketClient client; + private URI proxyUri; @BeforeEach public void before() throws Exception { server = new Server(); ServerConnector connector = new ServerConnector(server); - connector.setPort(8080); // TODO: remove... + connector.setPort(PORT); server.addConnector(connector); + client = new WebSocketClient(); + client.start(); + proxyUri = URI.create("ws://localhost:" + PORT + "/proxy"); + URI echoUri = URI.create("ws://localhost:" + PORT + "/echo"); + webSocketProxy = new WebSocketProxy(client, echoUri); + ServletContextHandler contextHandler = new ServletContextHandler(); WebSocketUpgradeFilter.configure(contextHandler); + serverSocket = new EchoSocket(); NativeWebSocketServletContainerInitializer.configure(contextHandler, ((context, container) -> { - container.addMapping("/*", (req, resp) -> new WebSocketProxy().getWebSocketConnectionListener()); + container.addMapping("/proxy", (req, resp) -> webSocketProxy.getWebSocketConnectionListener()); + container.addMapping("/echo", (req, resp) -> + { + if (req.hasSubProtocol("fail")) + throw new WebSocketException("failing during upgrade"); + return serverSocket; + }); })); server.setHandler(contextHandler); server.start(); - serverUri = URI.create("ws://localhost:" + connector.getLocalPort()); } @AfterEach public void after() throws Exception { + client.stop(); server.stop(); } @Test - public void test() throws Exception + public void testEcho() throws Exception { - server.join(); + EventSocket clientSocket = new EventSocket(); + client.connect(clientSocket, proxyUri); + assertTrue(clientSocket.openLatch.await(5, TimeUnit.SECONDS)); + + // Test an echo spread across multiple frames. + clientSocket.session.getRemote().sendPartialString("hell", false); + clientSocket.session.getRemote().sendPartialString("o w", false); + clientSocket.session.getRemote().sendPartialString("orld", false); + clientSocket.session.getRemote().sendPartialString("!", true); + String response = clientSocket.textMessages.poll(5, TimeUnit.SECONDS); + assertThat(response, is("hello world!")); + + // Test we closed successfully on the client side. + clientSocket.session.close(StatusCode.NORMAL, "test initiated close"); + assertTrue(clientSocket.closeLatch.await(5, TimeUnit.SECONDS)); + assertThat(clientSocket.closeCode, is(StatusCode.NORMAL)); + assertThat(clientSocket.closeReason, is("test initiated close")); + + // Test we closed successfully on the server side. + assertTrue(serverSocket.closeLatch.await(5, TimeUnit.SECONDS)); + assertThat(serverSocket.closeCode, is(StatusCode.NORMAL)); + assertThat(serverSocket.closeReason, is("test initiated close")); + + // No errors occurred. + assertNull(clientSocket.error); + assertNull(serverSocket.error); + + // WebSocketProxy has been completely closed. + assertTrue(webSocketProxy.awaitClose(5000)); + } + + @Test + public void testFailServerUpgrade() throws Exception + { + EventSocket clientSocket = new EventSocket(); + ClientUpgradeRequest upgradeRequest = new ClientUpgradeRequest(); + upgradeRequest.setSubProtocols("fail"); + + try (StacklessLogging ignored = new StacklessLogging(HttpChannel.class)) + { + client.connect(clientSocket, proxyUri, upgradeRequest); + assertTrue(clientSocket.closeLatch.await(5, TimeUnit.SECONDS)); + } + + // WebSocketProxy has been completely closed. + assertTrue(webSocketProxy.awaitClose(5000)); + } + + @Test + public void testClientError() throws Exception + { + EventSocket clientSocket = new OnOpenThrowingSocket(); + client.connect(clientSocket, proxyUri); + assertTrue(clientSocket.openLatch.await(5, TimeUnit.SECONDS)); + + // Verify expected client close. + assertTrue(clientSocket.closeLatch.await(5, TimeUnit.SECONDS)); + assertThat(clientSocket.closeCode, is(StatusCode.NO_CLOSE)); + assertThat(clientSocket.closeReason, is("simulated onOpen error")); + assertNotNull(clientSocket.error); + + // Verify expected server close. + assertTrue(serverSocket.closeLatch.await(5, TimeUnit.SECONDS)); + assertThat(serverSocket.closeCode, is(StatusCode.NO_CLOSE)); + assertThat(serverSocket.closeReason, is("Disconnected")); + assertNull(serverSocket.error); + + // WebSocketProxy has been completely closed. + assertTrue(webSocketProxy.awaitClose(5000)); + } + + @Test + public void testServerError() throws Exception + { + serverSocket = new OnOpenThrowingSocket(); + + EventSocket clientSocket = new EventSocket(); + client.connect(clientSocket, proxyUri); + assertTrue(clientSocket.openLatch.await(5, TimeUnit.SECONDS)); + + // Verify expected client close. + assertTrue(clientSocket.closeLatch.await(5, TimeUnit.SECONDS)); + assertThat(clientSocket.closeCode, is(StatusCode.SERVER_ERROR)); + assertThat(clientSocket.closeReason, is("simulated onOpen error")); + assertNull(clientSocket.error); + + // Verify expected server close. + assertTrue(serverSocket.closeLatch.await(5, TimeUnit.SECONDS)); + assertThat(serverSocket.closeCode, is(StatusCode.SERVER_ERROR)); + assertThat(serverSocket.closeReason, is("simulated onOpen error")); + assertNotNull(serverSocket.error); + + // WebSocketProxy has been completely closed. + assertTrue(webSocketProxy.awaitClose(5000)); + } + + @Test + public void testServerErrorClientNoResponse() throws Exception + { + serverSocket = new OnTextThrowingSocket(); + + EventSocket clientSocket = new EventSocket(); + client.connect(clientSocket, proxyUri); + assertTrue(clientSocket.openLatch.await(5, TimeUnit.SECONDS)); + assertTrue(serverSocket.openLatch.await(5, TimeUnit.SECONDS)); + + clientSocket.session.getRemote().sendString("hello world!"); + + // Verify expected client close. + assertTrue(clientSocket.closeLatch.await(5, TimeUnit.SECONDS)); + assertThat(clientSocket.closeCode, is(StatusCode.SERVER_ERROR)); + assertThat(clientSocket.closeReason, is("simulated onMessage error")); + assertNull(clientSocket.error); + + // Verify expected server close. + assertTrue(serverSocket.closeLatch.await(5, TimeUnit.SECONDS)); + assertThat(serverSocket.closeCode, is(StatusCode.SERVER_ERROR)); + assertThat(serverSocket.closeReason, is("simulated onMessage error")); + assertNotNull(serverSocket.error); + + assertNull(clientSocket.textMessages.poll(1, TimeUnit.SECONDS)); + assertTrue(webSocketProxy.awaitClose(5000)); + } + + @Test + public void testPingPong() throws Exception + { + PingPongSocket serverEndpoint = new PingPongSocket(); + serverSocket = serverEndpoint; + + PingPongSocket clientSocket = new PingPongSocket(); + client.connect(clientSocket, proxyUri); + assertTrue(clientSocket.openLatch.await(5, TimeUnit.SECONDS)); + assertTrue(serverSocket.openLatch.await(5, TimeUnit.SECONDS)); + + // Test unsolicited pong from client. + clientSocket.session.getRemote().sendPong(BufferUtil.toBuffer("unsolicited pong from client")); + assertThat(serverEndpoint.pingMessages.size(), is(0)); + assertThat(serverEndpoint.pongMessages.poll(5, TimeUnit.SECONDS), is(BufferUtil.toBuffer("unsolicited pong from client"))); + + // Test unsolicited pong from server. + serverEndpoint.session.getRemote().sendPong(BufferUtil.toBuffer("unsolicited pong from server")); + assertThat(clientSocket.pingMessages.size(), is(0)); + assertThat(clientSocket.pongMessages.poll(5, TimeUnit.SECONDS), is(BufferUtil.toBuffer("unsolicited pong from server"))); + + // Test pings from client. + for (int i = 0; i < 10; i++) + clientSocket.session.getRemote().sendPing(BufferUtil.toBuffer(i)); + for (int i = 0; i < 10; i++) + { + assertThat(serverEndpoint.pingMessages.poll(5, TimeUnit.SECONDS), is(BufferUtil.toBuffer(i))); + assertThat(clientSocket.pongMessages.poll(5, TimeUnit.SECONDS), is(BufferUtil.toBuffer(i))); + } + + // Test pings from server. + for (int i = 0; i < 10; i++) + serverEndpoint.session.getRemote().sendPing(BufferUtil.toBuffer(i)); + for (int i = 0; i < 10; i++) + { + assertThat(clientSocket.pingMessages.poll(5, TimeUnit.SECONDS), is(BufferUtil.toBuffer(i))); + assertThat(serverEndpoint.pongMessages.poll(5, TimeUnit.SECONDS), is(BufferUtil.toBuffer(i))); + } + + clientSocket.session.close(StatusCode.NORMAL, "closing from test"); + + // Verify expected client close. + assertTrue(clientSocket.closeLatch.await(5, TimeUnit.SECONDS)); + assertThat(clientSocket.closeCode, is(StatusCode.NORMAL)); + assertThat(clientSocket.closeReason, is("closing from test")); + assertNull(clientSocket.error); + + // Verify expected server close. + assertTrue(serverSocket.closeLatch.await(5, TimeUnit.SECONDS)); + assertThat(serverSocket.closeCode, is(StatusCode.NORMAL)); + assertThat(serverSocket.closeReason, is("closing from test")); + assertNull(serverSocket.error); + + // WebSocketProxy has been completely closed. + assertTrue(webSocketProxy.awaitClose(5000)); + + // Check we had no unexpected pings or pongs sent. + assertThat(clientSocket.pingMessages.size(), is(0)); + assertThat(serverEndpoint.pingMessages.size(), is(0)); + } + + @WebSocket + public static class PingPongSocket extends EventSocket + { + public BlockingQueue pingMessages = new BlockingArrayQueue<>(); + public BlockingQueue pongMessages = new BlockingArrayQueue<>(); + + @OnWebSocketFrame + public void onWebSocketFrame(Frame frame) + { + switch (frame.getOpCode()) + { + case OpCode.PING: + pingMessages.add(BufferUtil.copy(frame.getPayload())); + break; + case OpCode.PONG: + pongMessages.add(BufferUtil.copy(frame.getPayload())); + break; + default: + break; + } + } + } + + @WebSocket + public static class OnOpenThrowingSocket extends EventSocket + { + @Override + public void onOpen(Session session) + { + super.onOpen(session); + throw new IllegalStateException("simulated onOpen error"); + } + } + + @WebSocket + public static class OnTextThrowingSocket extends EventSocket + { + @Override + public void onMessage(String message) throws IOException + { + super.onMessage(message); + throw new IllegalStateException("simulated onMessage error"); + } } } diff --git a/jetty-websocket/jetty-websocket-tests/src/test/resources/jetty-logging.properties b/jetty-websocket/jetty-websocket-tests/src/test/resources/jetty-logging.properties index a429c612f5e..8564d5228c0 100644 --- a/jetty-websocket/jetty-websocket-tests/src/test/resources/jetty-logging.properties +++ b/jetty-websocket/jetty-websocket-tests/src/test/resources/jetty-logging.properties @@ -12,6 +12,7 @@ org.eclipse.jetty.LEVEL=INFO # org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection.LEVEL=DEBUG # org.eclipse.jetty.websocket.common.io.IOState.LEVEL=DEBUG # org.eclipse.jetty.websocket.common.test.LEVEL=DEBUG +# org.eclipse.jetty.websocket.tests.proxy.LEVEL=DEBUG # org.eclipse.jetty.websocket.common.Generator.LEVEL=DEBUG # org.eclipse.jetty.websocket.common.Parser.LEVEL=DEBUG