diff --git a/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/proxy/WebSocketProxy.java b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/proxy/WebSocketProxy.java index 027a912a30e..cfa6c85aaba 100644 --- a/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/proxy/WebSocketProxy.java +++ b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/proxy/WebSocketProxy.java @@ -23,10 +23,9 @@ import java.nio.ByteBuffer; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicInteger; import org.eclipse.jetty.util.BufferUtil; -import org.eclipse.jetty.util.FutureCallback; import org.eclipse.jetty.util.StringUtil; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; @@ -75,66 +74,17 @@ public class WebSocketProxy } } - /** - * We use this to wait until we receive a pong from other websocket connection before sending back the response pong. - * This is problematic because the protocol allows unsolicited PongMessages. Ideally it would be best if we could - * disable the automatic pong response through something like the {@link org.eclipse.jetty.websocket.api.WebSocketPolicy}. - */ - private static class PongWait - { - private final FutureCallback COMPLETED = new FutureCallback(true); - private final AtomicReference reference = new AtomicReference<>(); - - /** - * @return gives back a Future which is completed when this is notified that a pong has been received. - */ - public FutureCallback waitForPong() - { - FutureCallback futureCallback = new FutureCallback(); - if (!reference.compareAndSet(null, futureCallback)) - throw new IllegalStateException(); - return futureCallback; - } - - /** - * @return true if the pong will be automatically forwarded, otherwise it must be sent manually. - */ - public boolean receivedPong() - { - FutureCallback futureCallback = reference.getAndSet(null); - if (futureCallback != null) - { - futureCallback.succeeded(); - return true; - } - - return false; - } - - public void cancel() - { - FutureCallback futureCallback = reference.getAndSet(COMPLETED); - if (futureCallback != null) - futureCallback.cancel(true); - } - } - public class ClientToProxy implements WebSocketPartialListener, WebSocketPingPongListener { private Session session; private final CountDownLatch closeLatch = new CountDownLatch(1); - private final PongWait pongWait = new PongWait(); + private final AtomicInteger pingsReceived = new AtomicInteger(); public Session getSession() { return session; } - public boolean receivedPong() - { - return pongWait.receivedPong(); - } - public void fail(Throwable failure) { session.close(StatusCode.SERVER_ERROR, failure.getMessage()); @@ -154,6 +104,8 @@ public class WebSocketProxy upgradeRequest.setSubProtocols(session.getUpgradeRequest().getSubProtocols()); upgradeRequest.setExtensions(session.getUpgradeRequest().getExtensions()); connect = client.connect(proxyToServer, serverUri, upgradeRequest); + + //This is blocking as we really want the client to be connected before receiving any messages. connect.get(); } catch (Exception e) @@ -204,10 +156,9 @@ public class WebSocketProxy try { - // Block until we get pong response back from server. An automatic pong will be sent after this method. - FutureCallback futureCallback = pongWait.waitForPong(); + // The implementation automatically sends pong response. + pingsReceived.incrementAndGet(); proxyToServer.getSession().getRemote().sendPing(BufferUtil.copy(payload)); - futureCallback.get(); } catch (Exception e) { @@ -223,11 +174,11 @@ public class WebSocketProxy try { - // We do not forward on the pong message unless it was an unsolicited pong. - // Instead we notify the other side we have received pong which will then unblock in the - // thread in onPing() which will trigger the automatic pong response from the implementation. - if (!proxyToServer.receivedPong()) - proxyToServer.session.getRemote().sendPong(BufferUtil.copy(payload)); + // If we have sent out a ping then we have already responded with automatic pong. + // If this is an unsolicited pong we still need to forward it to the server. + int valueBeforeUpdate = pingsReceived.getAndUpdate(i -> i > 0 ? i - 1 : i); + if (valueBeforeUpdate == 0) + proxyToServer.getSession().getRemote().sendPong(BufferUtil.copy(payload)); } catch (Exception e) { @@ -242,7 +193,6 @@ public class WebSocketProxy LOG.debug("{} onWebSocketError()", getClass().getSimpleName(), cause); proxyToServer.fail(cause); - pongWait.cancel(); } @Override @@ -251,10 +201,10 @@ public class WebSocketProxy if (LOG.isDebugEnabled()) LOG.debug("{} onWebSocketClose({} {})", getClass().getSimpleName(), statusCode, reason); + // Session may be null if connection to the server failed. Session session = proxyToServer.getSession(); if (session != null) session.close(statusCode, reason); - pongWait.cancel(); closeLatch.countDown(); } } @@ -263,18 +213,13 @@ public class WebSocketProxy { private Session session; private final CountDownLatch closeLatch = new CountDownLatch(1); - private final PongWait pongWait = new PongWait(); + private final AtomicInteger pingsReceived = new AtomicInteger(); public Session getSession() { return session; } - public boolean receivedPong() - { - return pongWait.receivedPong(); - } - public void fail(Throwable failure) { // Only ProxyToServer can be failed before it is opened (if ClientToProxy fails before the connect completes). @@ -331,10 +276,9 @@ public class WebSocketProxy try { - // Block until we get pong response back from client. An automatic pong will be sent after this method. - FutureCallback futureCallback = pongWait.waitForPong(); + // The implementation automatically sends pong response. + pingsReceived.incrementAndGet(); clientToProxy.getSession().getRemote().sendPing(BufferUtil.copy(payload)); - futureCallback.get(); } catch (Exception e) { @@ -350,11 +294,11 @@ public class WebSocketProxy try { - // We do not forward on the pong message unless it was an unsolicited pong. - // Instead we notify the other side we have received pong which will then unblock in the - // thread in onPing() which will trigger the automatic pong response from the implementation. - if (!clientToProxy.receivedPong()) - clientToProxy.session.getRemote().sendPong(BufferUtil.copy(payload)); + // If we have sent out a ping then we have already responded with automatic pong. + // If this is an unsolicited pong we still need to forward it to the client. + int valueBeforeUpdate = pingsReceived.getAndUpdate(i -> i > 0 ? i - 1 : i); + if (valueBeforeUpdate == 0) + clientToProxy.getSession().getRemote().sendPong(BufferUtil.copy(payload)); } catch (Exception e) { @@ -369,7 +313,6 @@ public class WebSocketProxy LOG.debug("{} onWebSocketError()", getClass().getSimpleName(), cause); clientToProxy.fail(cause); - pongWait.cancel(); } @Override @@ -378,10 +321,7 @@ public class WebSocketProxy if (LOG.isDebugEnabled()) LOG.debug("{} onWebSocketClose({} {})", getClass().getSimpleName(), statusCode, reason); - Session session = clientToProxy.getSession(); - if (session != null) - session.close(statusCode, reason); - pongWait.cancel(); + clientToProxy.getSession().close(statusCode, reason); closeLatch.countDown(); } }