fix websocket proxy for jetty10 websocket api

Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
This commit is contained in:
Lachlan Roberts 2020-11-26 21:05:06 +11:00
parent cb626e72bc
commit 4ec2c3eca2
3 changed files with 16 additions and 29 deletions

View File

@ -475,7 +475,8 @@ public class WebSocketCoreSession implements IncomingFrames, CoreSession, Dumpab
}, },
x -> x ->
{ {
LOG.warn("Error during OPEN", x); if (LOG.isDebugEnabled())
LOG.debug("Error during OPEN", x);
processHandlerError(new CloseException(CloseStatus.SERVER_ERROR, x), NOOP); processHandlerError(new CloseException(CloseStatus.SERVER_ERROR, x), NOOP);
}); });

View File

@ -23,7 +23,6 @@ import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.StringUtil; import org.eclipse.jetty.util.StringUtil;
@ -78,7 +77,6 @@ public class WebSocketProxy
{ {
private volatile Session session; private volatile Session session;
private final CountDownLatch closeLatch = new CountDownLatch(1); private final CountDownLatch closeLatch = new CountDownLatch(1);
private final AtomicInteger pingsReceived = new AtomicInteger();
public Session getSession() public Session getSession()
{ {
@ -156,9 +154,7 @@ public class WebSocketProxy
try try
{ {
// The implementation automatically sends pong response. proxyToServer.getSession().getRemote().sendPing(payload);
pingsReceived.incrementAndGet();
proxyToServer.getSession().getRemote().sendPing(BufferUtil.copy(payload));
} }
catch (Exception e) catch (Exception e)
{ {
@ -174,11 +170,7 @@ public class WebSocketProxy
try try
{ {
// If we have sent out a ping then we have already responded with automatic pong. proxyToServer.getSession().getRemote().sendPong(payload);
// If this is an unsolicited pong we still need to forward it to the server.
int valueBeforeUpdate = proxyToServer.pingsReceived.getAndUpdate(i -> i > 0 ? i - 1 : i);
if (valueBeforeUpdate == 0)
proxyToServer.getSession().getRemote().sendPong(BufferUtil.copy(payload));
} }
catch (Exception e) catch (Exception e)
{ {
@ -213,7 +205,6 @@ public class WebSocketProxy
{ {
private volatile Session session; private volatile Session session;
private final CountDownLatch closeLatch = new CountDownLatch(1); private final CountDownLatch closeLatch = new CountDownLatch(1);
private final AtomicInteger pingsReceived = new AtomicInteger();
public Session getSession() public Session getSession()
{ {
@ -277,9 +268,7 @@ public class WebSocketProxy
try try
{ {
// The implementation automatically sends pong response. clientToProxy.getSession().getRemote().sendPing(payload);
pingsReceived.incrementAndGet();
clientToProxy.getSession().getRemote().sendPing(BufferUtil.copy(payload));
} }
catch (Exception e) catch (Exception e)
{ {
@ -295,11 +284,7 @@ public class WebSocketProxy
try try
{ {
// If we have sent out a ping then we have already responded with automatic pong. clientToProxy.getSession().getRemote().sendPong(payload);
// If this is an unsolicited pong we still need to forward it to the client.
int valueBeforeUpdate = clientToProxy.pingsReceived.getAndUpdate(i -> i > 0 ? i - 1 : i);
if (valueBeforeUpdate == 0)
clientToProxy.getSession().getRemote().sendPong(BufferUtil.copy(payload));
} }
catch (Exception e) catch (Exception e)
{ {

View File

@ -161,16 +161,17 @@ public class WebSocketProxyTest
client.connect(clientSocket, proxyUri); client.connect(clientSocket, proxyUri);
assertTrue(clientSocket.openLatch.await(5, TimeUnit.SECONDS)); assertTrue(clientSocket.openLatch.await(5, TimeUnit.SECONDS));
// TODO: Why is this server error when it is occurring on the client.
// Verify expected client close. // Verify expected client close.
assertTrue(clientSocket.closeLatch.await(5, TimeUnit.SECONDS)); assertTrue(clientSocket.closeLatch.await(5, TimeUnit.SECONDS));
assertThat(clientSocket.closeCode, is(StatusCode.NO_CLOSE)); assertThat(clientSocket.closeCode, is(StatusCode.SERVER_ERROR));
assertThat(clientSocket.closeReason, is("simulated onOpen error")); assertThat(clientSocket.closeReason, containsString("simulated onOpen err"));
assertNotNull(clientSocket.error); assertNotNull(clientSocket.error);
// Verify expected server close. // Verify expected server close.
assertTrue(serverSocket.closeLatch.await(5, TimeUnit.SECONDS)); assertTrue(serverSocket.closeLatch.await(5, TimeUnit.SECONDS));
assertThat(serverSocket.closeCode, is(StatusCode.NO_CLOSE)); assertThat(serverSocket.closeCode, is(StatusCode.SERVER_ERROR));
assertThat(serverSocket.closeReason, is("Disconnected")); assertThat(serverSocket.closeReason, containsString("simulated onOpen err"));
assertNull(serverSocket.error); assertNull(serverSocket.error);
// WebSocketProxy has been completely closed. // WebSocketProxy has been completely closed.
@ -189,13 +190,13 @@ public class WebSocketProxyTest
// Verify expected client close. // Verify expected client close.
assertTrue(clientSocket.closeLatch.await(5, TimeUnit.SECONDS)); assertTrue(clientSocket.closeLatch.await(5, TimeUnit.SECONDS));
assertThat(clientSocket.closeCode, is(StatusCode.SERVER_ERROR)); assertThat(clientSocket.closeCode, is(StatusCode.SERVER_ERROR));
assertThat(clientSocket.closeReason, is("simulated onOpen error")); assertThat(clientSocket.closeReason, containsString("simulated onOpen err"));
assertNull(clientSocket.error); assertNull(clientSocket.error);
// Verify expected server close. // Verify expected server close.
assertTrue(serverSocket.closeLatch.await(5, TimeUnit.SECONDS)); assertTrue(serverSocket.closeLatch.await(5, TimeUnit.SECONDS));
assertThat(serverSocket.closeCode, is(StatusCode.SERVER_ERROR)); assertThat(serverSocket.closeCode, is(StatusCode.SERVER_ERROR));
assertThat(serverSocket.closeReason, is("simulated onOpen error")); assertThat(serverSocket.closeReason, containsString("simulated onOpen err"));
assertNotNull(serverSocket.error); assertNotNull(serverSocket.error);
// WebSocketProxy has been completely closed. // WebSocketProxy has been completely closed.
@ -253,12 +254,12 @@ public class WebSocketProxyTest
assertTrue(serverSocket.closeLatch.await(clientSessionIdleTimeout * 2, TimeUnit.MILLISECONDS)); assertTrue(serverSocket.closeLatch.await(clientSessionIdleTimeout * 2, TimeUnit.MILLISECONDS));
// Check errors and close status. // Check errors and close status.
assertThat(clientSocket.error.getMessage(), containsString("Idle timeout expired")); assertThat(clientSocket.error.getMessage(), containsString("Connection Idle Timeout"));
assertThat(clientSocket.closeCode, is(StatusCode.SHUTDOWN)); assertThat(clientSocket.closeCode, is(StatusCode.SHUTDOWN));
assertThat(clientSocket.closeReason, containsString("Idle timeout expired")); assertThat(clientSocket.closeReason, containsString("Connection Idle Timeout"));
assertNull(serverSocket.error); assertNull(serverSocket.error);
assertThat(serverSocket.closeCode, is(StatusCode.SHUTDOWN)); assertThat(serverSocket.closeCode, is(StatusCode.SHUTDOWN));
assertThat(serverSocket.closeReason, containsString("Idle timeout expired")); assertThat(serverSocket.closeReason, containsString("Connection Idle Timeout"));
} }
@Test @Test