Issue #3494 - adjustments to WebSocketChannel processConnectionError

WebSocketChannel.processConnectionError now defaults to NO_CLOSE
status if no protocol reasons can be found

added some debug logging

improvements to tests

Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
This commit is contained in:
Lachlan Roberts 2019-04-05 12:18:09 +11:00
parent 7df8140137
commit 6cd5fb42e9
4 changed files with 102 additions and 80 deletions

View File

@ -179,7 +179,7 @@ public class WebSocketStatsTest
ClientSocket socket = new ClientSocket();
CompletableFuture<Session> connect = client.connect(socket, uri);
final long numMessages = 10000;
final long numMessages = 1000;
final String msgText = "hello world";
long upgradeSentBytes;

View File

@ -328,25 +328,33 @@ public class ClientCloseTest
assertThat(serverEndpoints.size(), is(sessionCount));
// block all the server threads
for (int i = 0; i < sessionCount; i++)
clientSockets.get(i).getSession().getRemote().sendString("block");
try
{
// block all the server threads
for (int i = 0; i < sessionCount; i++)
clientSockets.get(i).getSession().getRemote().sendString("block");
assertTimeoutPreemptively(ofSeconds(5), () -> {
// client lifecycle stop (the meat of this test)
client.stop();
});
assertTimeoutPreemptively(ofSeconds(5), () ->
{
// client lifecycle stop (the meat of this test)
client.stop();
});
// clients disconnect
for (int i = 0; i < sessionCount; i++)
clientSockets.get(i).assertReceivedCloseEvent(2000, is(StatusCode.ABNORMAL), containsString("Channel Closed"));
// clients disconnect
for (int i = 0; i < sessionCount; i++)
clientSockets.get(i).assertReceivedCloseEvent(2000, is(StatusCode.ABNORMAL), containsString("Channel Closed"));
// ensure all Sessions are gone. connections are gone. etc. (client and server)
// ensure ConnectionListener onClose is called 3 times
clientSessionTracker.assertClosedProperly(client);
// ensure all Sessions are gone. connections are gone. etc. (client and server)
// ensure ConnectionListener onClose is called 3 times
clientSessionTracker.assertClosedProperly(client);
for (int i = 0; i < sessionCount; i++)
serverEndpoints.get(i).block.countDown();
assertThat(serverEndpoints.size(), is(sessionCount));
}
finally
{
for (int i = 0; i < sessionCount; i++)
serverEndpoints.get(i).block.countDown();
}
}
@Test
@ -367,28 +375,35 @@ public class ClientCloseTest
// client confirms connection via echo
confirmConnection(clientSocket, clientConnectFuture);
// Block on the server so that the server does not detect a read failure
clientSocket.getSession().getRemote().sendString("block");
try
{
// Block on the server so that the server does not detect a read failure
clientSocket.getSession().getRemote().sendString("block");
// setup client endpoint for write failure (test only)
EndPoint endp = clientSocket.getEndPoint();
endp.shutdownOutput();
// setup client endpoint for write failure (test only)
EndPoint endp = clientSocket.getEndPoint();
endp.shutdownOutput();
// client enqueue close frame
// should result in a client write failure
final String origCloseReason = "Normal Close from Client";
clientSocket.getSession().close(StatusCode.NORMAL, origCloseReason);
// client enqueue close frame
// should result in a client write failure
final String origCloseReason = "Normal Close from Client";
clientSocket.getSession().close(StatusCode.NORMAL, origCloseReason);
assertThat("OnError Latch", clientSocket.errorLatch.await(2, SECONDS), is(true));
assertThat("OnError", clientSocket.error.get(), instanceOf(EofException.class));
assertThat("OnError Latch", clientSocket.errorLatch.await(2, SECONDS), is(true));
assertThat("OnError", clientSocket.error.get(), instanceOf(EofException.class));
// client triggers close event on client ws-endpoint
// assert - close code==1006 (abnormal)
clientSocket.assertReceivedCloseEvent(timeout, is(StatusCode.ABNORMAL), null);
clientSessionTracker.assertClosedProperly(client);
// client triggers close event on client ws-endpoint
// assert - close code==1006 (abnormal)
clientSocket.assertReceivedCloseEvent(timeout, is(StatusCode.ABNORMAL), null);
clientSessionTracker.assertClosedProperly(client);
assertThat(serverEndpoints.size(), is(1));
serverEndpoints.get(0).block.countDown();
assertThat(serverEndpoints.size(), is(1));
}
finally
{
for (ServerEndpoint endpoint : serverEndpoints)
endpoint.block.countDown();
}
}
public static class ServerEndpoint implements WebSocketFrameListener, WebSocketListener

View File

@ -32,7 +32,6 @@ import java.util.concurrent.Executor;
import java.util.concurrent.TimeoutException;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.Utf8Appendable;
@ -363,29 +362,6 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
callback.failed(e);
}
}
}
AbnormalCloseStatus abnormalCloseStatusFor(Throwable cause)
{
int code;
if (cause instanceof ProtocolException)
code = CloseStatus.PROTOCOL;
else if (cause instanceof CloseException)
code = ((CloseException)cause).getStatusCode();
else if (cause instanceof Utf8Appendable.NotUtf8Exception)
code = CloseStatus.BAD_PAYLOAD;
else if (cause instanceof WebSocketTimeoutException || cause instanceof TimeoutException || cause instanceof SocketTimeoutException)
code = CloseStatus.SHUTDOWN;
else if (cause instanceof EofException)
code = CloseStatus.NO_CLOSE;
else if (behavior == Behavior.CLIENT)
code = CloseStatus.POLICY_VIOLATION;
else
code = CloseStatus.SERVER_ERROR;
return new AbnormalCloseStatus(code, cause);
}
/**
@ -401,14 +377,24 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
if (LOG.isDebugEnabled())
LOG.debug("processConnectionError {} {}", this, cause);
CloseStatus closeStatus = abnormalCloseStatusFor(cause);
if (closeStatus.getCode() == CloseStatus.PROTOCOL)
close(closeStatus, callback);
else if (channelState.onClosed(closeStatus))
closeConnection(cause, closeStatus, callback);
int code;
if (cause instanceof CloseException)
code = ((CloseException)cause).getStatusCode();
else if (cause instanceof Utf8Appendable.NotUtf8Exception)
code = CloseStatus.BAD_PAYLOAD;
else if (cause instanceof WebSocketTimeoutException || cause instanceof TimeoutException || cause instanceof SocketTimeoutException)
code = CloseStatus.SHUTDOWN;
else
callback.failed(cause);
code = CloseStatus.NO_CLOSE;
AbnormalCloseStatus closeStatus = new AbnormalCloseStatus(code, cause);
if (CloseStatus.isTransmittableStatusCode(code))
close(closeStatus, callback);
else
{
if (channelState.onClosed(closeStatus))
closeConnection(cause, closeStatus, callback);
}
}
/**
@ -423,7 +409,19 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
if (LOG.isDebugEnabled())
LOG.debug("processHandlerError {} {}", this, cause);
close(abnormalCloseStatusFor(cause), callback);
int code;
if (cause instanceof CloseException)
code = ((CloseException)cause).getStatusCode();
else if (cause instanceof Utf8Appendable.NotUtf8Exception)
code = CloseStatus.BAD_PAYLOAD;
else if (cause instanceof WebSocketTimeoutException || cause instanceof TimeoutException || cause instanceof SocketTimeoutException)
code = CloseStatus.SHUTDOWN;
else if (behavior == Behavior.CLIENT)
code = CloseStatus.POLICY_VIOLATION;
else
code = CloseStatus.SERVER_ERROR;
close(new AbnormalCloseStatus(code, cause), callback);
}
/**
@ -515,6 +513,9 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
}
catch (Throwable t)
{
if (LOG.isDebugEnabled())
LOG.warn("Invalid outgoing frame: {}", frame);
callback.failed(t);
return;
}
@ -523,10 +524,10 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
{
synchronized(flusher)
{
boolean closeConnection = channelState.onOutgoingFrame(frame);
if (LOG.isDebugEnabled())
LOG.debug("sendFrame({}, {}, {}) {}", frame, callback, batch, closeConnection);
LOG.debug("sendFrame({}, {}, {})", frame, callback, batch);
boolean closeConnection = channelState.onOutgoingFrame(frame);
if (closeConnection)
{
Throwable cause = AbnormalCloseStatus.getCause(CloseStatus.getCloseStatus(frame));
@ -546,6 +547,9 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
}
catch (Throwable t)
{
if (LOG.isDebugEnabled())
LOG.debug("Failed sendFrame()", t);
if (frame.getOpCode() == OpCode.CLOSE)
{
CloseStatus closeStatus = CloseStatus.getCloseStatus(frame);

View File

@ -19,6 +19,7 @@
package org.eclipse.jetty.websocket.core;
import java.net.Socket;
import java.time.Duration;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@ -52,6 +53,7 @@ import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
@ -315,21 +317,22 @@ public class WebSocketCloseTest extends WebSocketTester
client.close();
assertFalse(server.handler.closed.await(250, TimeUnit.MILLISECONDS));
while(true)
{
if (!server.isOpen())
break;
server.sendFrame(new Frame(OpCode.TEXT, BufferUtil.toBuffer("frame after close")), Callback.NOOP);
}
assertTimeoutPreemptively(Duration.ofSeconds(1), ()->{
while(true)
{
if (!server.isOpen())
break;
server.sendFrame(new Frame(OpCode.TEXT, BufferUtil.toBuffer("frame after close")), Callback.NOOP);
Thread.sleep(100);
}
});
assertTrue(server.handler.closed.await(5, TimeUnit.SECONDS));
assertNotNull(server.handler.error);
assertThat(server.handler.closeStatus.getCode(), is(CloseStatus.SERVER_ERROR));
assertThat(server.handler.closeStatus.getCode(), is(CloseStatus.NO_CLOSE));
Callback callback = server.handler.receivedCallback.poll(5, TimeUnit.SECONDS);
callback.succeeded();
assertThat(server.handler.closeStatus.getCode(), is(CloseStatus.SERVER_ERROR));
}
@ParameterizedTest
@ -433,7 +436,7 @@ public class WebSocketCloseTest extends WebSocketTester
@Override
public void onOpen(CoreSession coreSession)
{
LOG.info("onOpen {}", coreSession);
LOG.debug("onOpen {}", coreSession);
session = coreSession;
state = session.toString();
opened.countDown();
@ -442,7 +445,7 @@ public class WebSocketCloseTest extends WebSocketTester
@Override
public void onFrame(Frame frame, Callback callback)
{
LOG.info("onFrame: " + BufferUtil.toDetailString(frame.getPayload()));
LOG.debug("onFrame: " + BufferUtil.toDetailString(frame.getPayload()));
state = session.toString();
receivedCallback.offer(callback);
receivedFrames.offer(Frame.copy(frame));
@ -454,7 +457,7 @@ public class WebSocketCloseTest extends WebSocketTester
@Override
public void onClosed(CloseStatus closeStatus)
{
LOG.info("onClosed {}", closeStatus);
LOG.debug("onClosed {}", closeStatus);
state = session.toString();
this.closeStatus = closeStatus;
closed.countDown();
@ -463,7 +466,7 @@ public class WebSocketCloseTest extends WebSocketTester
@Override
public void onError(Throwable cause)
{
LOG.info("onError {} ", cause == null?null:cause.toString());
LOG.debug("onError {} ", cause);
error = cause;
state = session.toString();
}