do not block for websocket PING and PONG messages

Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
This commit is contained in:
Lachlan Roberts 2020-11-25 11:48:33 +11:00
parent b89adb8dae
commit 6a9acaaa9d
1 changed files with 21 additions and 81 deletions

View File

@ -23,10 +23,9 @@ import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.StringUtil; import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
@ -75,66 +74,17 @@ public class WebSocketProxy
} }
} }
/**
* We use this to wait until we receive a pong from other websocket connection before sending back the response pong.
* This is problematic because the protocol allows unsolicited PongMessages. Ideally it would be best if we could
* disable the automatic pong response through something like the {@link org.eclipse.jetty.websocket.api.WebSocketPolicy}.
*/
private static class PongWait
{
private final FutureCallback COMPLETED = new FutureCallback(true);
private final AtomicReference<FutureCallback> reference = new AtomicReference<>();
/**
* @return gives back a Future which is completed when this is notified that a pong has been received.
*/
public FutureCallback waitForPong()
{
FutureCallback futureCallback = new FutureCallback();
if (!reference.compareAndSet(null, futureCallback))
throw new IllegalStateException();
return futureCallback;
}
/**
* @return true if the pong will be automatically forwarded, otherwise it must be sent manually.
*/
public boolean receivedPong()
{
FutureCallback futureCallback = reference.getAndSet(null);
if (futureCallback != null)
{
futureCallback.succeeded();
return true;
}
return false;
}
public void cancel()
{
FutureCallback futureCallback = reference.getAndSet(COMPLETED);
if (futureCallback != null)
futureCallback.cancel(true);
}
}
public class ClientToProxy implements WebSocketPartialListener, WebSocketPingPongListener public class ClientToProxy implements WebSocketPartialListener, WebSocketPingPongListener
{ {
private Session session; private Session session;
private final CountDownLatch closeLatch = new CountDownLatch(1); private final CountDownLatch closeLatch = new CountDownLatch(1);
private final PongWait pongWait = new PongWait(); private final AtomicInteger pingsReceived = new AtomicInteger();
public Session getSession() public Session getSession()
{ {
return session; return session;
} }
public boolean receivedPong()
{
return pongWait.receivedPong();
}
public void fail(Throwable failure) public void fail(Throwable failure)
{ {
session.close(StatusCode.SERVER_ERROR, failure.getMessage()); session.close(StatusCode.SERVER_ERROR, failure.getMessage());
@ -154,6 +104,8 @@ public class WebSocketProxy
upgradeRequest.setSubProtocols(session.getUpgradeRequest().getSubProtocols()); upgradeRequest.setSubProtocols(session.getUpgradeRequest().getSubProtocols());
upgradeRequest.setExtensions(session.getUpgradeRequest().getExtensions()); upgradeRequest.setExtensions(session.getUpgradeRequest().getExtensions());
connect = client.connect(proxyToServer, serverUri, upgradeRequest); connect = client.connect(proxyToServer, serverUri, upgradeRequest);
//This is blocking as we really want the client to be connected before receiving any messages.
connect.get(); connect.get();
} }
catch (Exception e) catch (Exception e)
@ -204,10 +156,9 @@ public class WebSocketProxy
try try
{ {
// Block until we get pong response back from server. An automatic pong will be sent after this method. // The implementation automatically sends pong response.
FutureCallback futureCallback = pongWait.waitForPong(); pingsReceived.incrementAndGet();
proxyToServer.getSession().getRemote().sendPing(BufferUtil.copy(payload)); proxyToServer.getSession().getRemote().sendPing(BufferUtil.copy(payload));
futureCallback.get();
} }
catch (Exception e) catch (Exception e)
{ {
@ -223,11 +174,11 @@ public class WebSocketProxy
try try
{ {
// We do not forward on the pong message unless it was an unsolicited pong. // If we have sent out a ping then we have already responded with automatic pong.
// Instead we notify the other side we have received pong which will then unblock in the // If this is an unsolicited pong we still need to forward it to the server.
// thread in onPing() which will trigger the automatic pong response from the implementation. int valueBeforeUpdate = pingsReceived.getAndUpdate(i -> i > 0 ? i - 1 : i);
if (!proxyToServer.receivedPong()) if (valueBeforeUpdate == 0)
proxyToServer.session.getRemote().sendPong(BufferUtil.copy(payload)); proxyToServer.getSession().getRemote().sendPong(BufferUtil.copy(payload));
} }
catch (Exception e) catch (Exception e)
{ {
@ -242,7 +193,6 @@ public class WebSocketProxy
LOG.debug("{} onWebSocketError()", getClass().getSimpleName(), cause); LOG.debug("{} onWebSocketError()", getClass().getSimpleName(), cause);
proxyToServer.fail(cause); proxyToServer.fail(cause);
pongWait.cancel();
} }
@Override @Override
@ -251,10 +201,10 @@ public class WebSocketProxy
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("{} onWebSocketClose({} {})", getClass().getSimpleName(), statusCode, reason); LOG.debug("{} onWebSocketClose({} {})", getClass().getSimpleName(), statusCode, reason);
// Session may be null if connection to the server failed.
Session session = proxyToServer.getSession(); Session session = proxyToServer.getSession();
if (session != null) if (session != null)
session.close(statusCode, reason); session.close(statusCode, reason);
pongWait.cancel();
closeLatch.countDown(); closeLatch.countDown();
} }
} }
@ -263,18 +213,13 @@ public class WebSocketProxy
{ {
private Session session; private Session session;
private final CountDownLatch closeLatch = new CountDownLatch(1); private final CountDownLatch closeLatch = new CountDownLatch(1);
private final PongWait pongWait = new PongWait(); private final AtomicInteger pingsReceived = new AtomicInteger();
public Session getSession() public Session getSession()
{ {
return session; return session;
} }
public boolean receivedPong()
{
return pongWait.receivedPong();
}
public void fail(Throwable failure) public void fail(Throwable failure)
{ {
// Only ProxyToServer can be failed before it is opened (if ClientToProxy fails before the connect completes). // Only ProxyToServer can be failed before it is opened (if ClientToProxy fails before the connect completes).
@ -331,10 +276,9 @@ public class WebSocketProxy
try try
{ {
// Block until we get pong response back from client. An automatic pong will be sent after this method. // The implementation automatically sends pong response.
FutureCallback futureCallback = pongWait.waitForPong(); pingsReceived.incrementAndGet();
clientToProxy.getSession().getRemote().sendPing(BufferUtil.copy(payload)); clientToProxy.getSession().getRemote().sendPing(BufferUtil.copy(payload));
futureCallback.get();
} }
catch (Exception e) catch (Exception e)
{ {
@ -350,11 +294,11 @@ public class WebSocketProxy
try try
{ {
// We do not forward on the pong message unless it was an unsolicited pong. // If we have sent out a ping then we have already responded with automatic pong.
// Instead we notify the other side we have received pong which will then unblock in the // If this is an unsolicited pong we still need to forward it to the client.
// thread in onPing() which will trigger the automatic pong response from the implementation. int valueBeforeUpdate = pingsReceived.getAndUpdate(i -> i > 0 ? i - 1 : i);
if (!clientToProxy.receivedPong()) if (valueBeforeUpdate == 0)
clientToProxy.session.getRemote().sendPong(BufferUtil.copy(payload)); clientToProxy.getSession().getRemote().sendPong(BufferUtil.copy(payload));
} }
catch (Exception e) catch (Exception e)
{ {
@ -369,7 +313,6 @@ public class WebSocketProxy
LOG.debug("{} onWebSocketError()", getClass().getSimpleName(), cause); LOG.debug("{} onWebSocketError()", getClass().getSimpleName(), cause);
clientToProxy.fail(cause); clientToProxy.fail(cause);
pongWait.cancel();
} }
@Override @Override
@ -378,10 +321,7 @@ public class WebSocketProxy
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("{} onWebSocketClose({} {})", getClass().getSimpleName(), statusCode, reason); LOG.debug("{} onWebSocketClose({} {})", getClass().getSimpleName(), statusCode, reason);
Session session = clientToProxy.getSession(); clientToProxy.getSession().close(statusCode, reason);
if (session != null)
session.close(statusCode, reason);
pongWait.cancel();
closeLatch.countDown(); closeLatch.countDown();
} }
} }