Merge pull request #3521 from lachlan-roberts/jetty-10.0.x-3494-clientclosetest

Issue #3494 - flaky tests in jetty-websocket-tests ClientCloseTest
This commit is contained in:
Joakim Erdfelt 2019-04-10 14:14:24 -07:00 committed by GitHub
commit 578e5d5539
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 152 additions and 93 deletions

View File

@ -59,8 +59,7 @@ public class WebSocketAdapter implements WebSocketListener
@Override
public void onWebSocketClose(int statusCode, String reason)
{
this.session = null;
this.remote = null;
/* do nothing */
}
@Override

View File

@ -211,7 +211,7 @@ public class JettyWebSocketFrameHandler implements FrameHandler
switch (frame.getOpCode())
{
case OpCode.CLOSE:
onCloseFrame(frame, demandingCallback);
onCloseFrame(frame, callback);
break;
case OpCode.PING:
onPingFrame(frame, demandingCallback);

View File

@ -179,7 +179,7 @@ public class WebSocketStatsTest
ClientSocket socket = new ClientSocket();
CompletableFuture<Session> connect = client.connect(socket, uri);
final long numMessages = 10000;
final long numMessages = 1000;
final String msgText = "hello world";
long upgradeSentBytes;

View File

@ -21,21 +21,23 @@ package org.eclipse.jetty.websocket.tests.client;
import java.io.IOException;
import java.net.URI;
import java.nio.channels.ClosedChannelException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.handler.DefaultHandler;
import org.eclipse.jetty.server.handler.HandlerList;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.util.BlockingArrayQueue;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.Frame;
@ -66,11 +68,13 @@ import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class ClientCloseTest
{
private Server server;
private WebSocketClient client;
private BlockingArrayQueue<ServerEndpoint> serverEndpoints = new BlockingArrayQueue<>();
private Session confirmConnection(CloseTrackingEndpoint clientSocket, Future<Session> clientFuture) throws Exception
{
@ -127,7 +131,12 @@ public class ClientCloseTest
{
factory.setIdleTimeout(Duration.ofSeconds(10));
factory.setMaxTextMessageSize(1024 * 1024 * 2);
factory.register(ServerEndpoint.class);
factory.setCreator((req,resp)->
{
ServerEndpoint endpoint = new ServerEndpoint();
serverEndpoints.offer(endpoint);
return endpoint;
});
}
});
context.addServlet(holder, "/ws");
@ -228,7 +237,7 @@ public class ClientCloseTest
public void testRemoteDisconnect() throws Exception
{
// Set client timeout
final int clientTimeout = 1000;
final int clientTimeout = 3000;
client.setIdleTimeout(Duration.ofMillis(clientTimeout));
ClientOpenSessionTracker clientSessionTracker = new ClientOpenSessionTracker(1);
@ -239,20 +248,18 @@ public class ClientCloseTest
CloseTrackingEndpoint clientSocket = new CloseTrackingEndpoint();
Future<Session> clientConnectFuture = client.connect(clientSocket, wsUri);
try (Session ignored = confirmConnection(clientSocket, clientConnectFuture))
{
// client confirms connection via echo
// client confirms connection via echo
confirmConnection(clientSocket, clientConnectFuture);
// client sends close frame (triggering server connection abort)
final String origCloseReason = "abort";
clientSocket.getSession().close(StatusCode.NORMAL, origCloseReason);
// client sends close frame (triggering server connection abort)
final String origCloseReason = "abort";
clientSocket.getSession().close(StatusCode.NORMAL, origCloseReason);
// client reads -1 (EOF)
// client triggers close event on client ws-endpoint
clientSocket.assertReceivedCloseEvent(clientTimeout * 2,
is(StatusCode.ABNORMAL),
containsString("Channel Closed"));
}
// client reads -1 (EOF)
// client triggers close event on client ws-endpoint
clientSocket.assertReceivedCloseEvent(2000,
is(StatusCode.ABNORMAL),
containsString("Channel Closed"));
clientSessionTracker.assertClosedProperly(client);
}
@ -277,7 +284,7 @@ public class ClientCloseTest
// client confirms connection via echo
// client sends close frame
final String origCloseReason = "sleep|5000";
final String origCloseReason = "sleep|2500";
clientSocket.getSession().close(StatusCode.NORMAL, origCloseReason);
// client close should occur
@ -297,7 +304,7 @@ public class ClientCloseTest
public void testStopLifecycle() throws Exception
{
// Set client timeout
final int timeout = 1000;
final int timeout = 3000;
client.setIdleTimeout(Duration.ofMillis(timeout));
int sessionCount = 3;
@ -319,20 +326,35 @@ public class ClientCloseTest
confirmConnection(clientSocket, clientConnectFuture);
}
assertTimeoutPreemptively(ofSeconds(5), () -> {
// client lifecycle stop (the meat of this test)
client.stop();
});
assertThat(serverEndpoints.size(), is(sessionCount));
// clients disconnect
for (int i = 0; i < sessionCount; i++)
try
{
clientSockets.get(i).assertReceivedCloseEvent(timeout, is(StatusCode.ABNORMAL), containsString("Channel Closed"));
}
// block all the server threads
for (int i = 0; i < sessionCount; i++)
clientSockets.get(i).getSession().getRemote().sendString("block");
// ensure all Sessions are gone. connections are gone. etc. (client and server)
// ensure ConnectionListener onClose is called 3 times
clientSessionTracker.assertClosedProperly(client);
assertTimeoutPreemptively(ofSeconds(5), () ->
{
// client lifecycle stop (the meat of this test)
client.stop();
});
// clients disconnect
for (int i = 0; i < sessionCount; i++)
clientSockets.get(i).assertReceivedCloseEvent(2000, is(StatusCode.ABNORMAL), containsString("Channel Closed"));
// ensure all Sessions are gone. connections are gone. etc. (client and server)
// ensure ConnectionListener onClose is called 3 times
clientSessionTracker.assertClosedProperly(client);
assertThat(serverEndpoints.size(), is(sessionCount));
}
finally
{
for (int i = 0; i < sessionCount; i++)
serverEndpoints.get(i).block.countDown();
}
}
@Test
@ -353,29 +375,42 @@ public class ClientCloseTest
// client confirms connection via echo
confirmConnection(clientSocket, clientConnectFuture);
// setup client endpoint for write failure (test only)
EndPoint endp = clientSocket.getEndPoint();
endp.shutdownOutput();
try
{
// Block on the server so that the server does not detect a read failure
clientSocket.getSession().getRemote().sendString("block");
// client enqueue close frame
// should result in a client write failure
final String origCloseReason = "Normal Close from Client";
clientSocket.getSession().close(StatusCode.NORMAL, origCloseReason);
// setup client endpoint for write failure (test only)
EndPoint endp = clientSocket.getEndPoint();
endp.shutdownOutput();
assertThat("OnError Latch", clientSocket.errorLatch.await(2, SECONDS), is(true));
assertThat("OnError", clientSocket.error.get(), instanceOf(ClosedChannelException.class));
// client enqueue close frame
// should result in a client write failure
final String origCloseReason = "Normal Close from Client";
clientSocket.getSession().close(StatusCode.NORMAL, origCloseReason);
// client triggers close event on client ws-endpoint
// assert - close code==1006 (abnormal)
clientSocket.assertReceivedCloseEvent(timeout, is(StatusCode.ABNORMAL), containsString("Channel Closed"));
assertThat("OnError Latch", clientSocket.errorLatch.await(2, SECONDS), is(true));
assertThat("OnError", clientSocket.error.get(), instanceOf(EofException.class));
clientSessionTracker.assertClosedProperly(client);
// client triggers close event on client ws-endpoint
// assert - close code==1006 (abnormal)
clientSocket.assertReceivedCloseEvent(timeout, is(StatusCode.ABNORMAL), null);
clientSessionTracker.assertClosedProperly(client);
assertThat(serverEndpoints.size(), is(1));
}
finally
{
for (ServerEndpoint endpoint : serverEndpoints)
endpoint.block.countDown();
}
}
public static class ServerEndpoint implements WebSocketFrameListener, WebSocketListener
{
private static final Logger LOG = Log.getLogger(ServerEndpoint.class);
private Session session;
CountDownLatch block = new CountDownLatch(1);
@Override
public void onWebSocketBinary(byte[] payload, int offset, int len)
@ -395,12 +430,23 @@ public class ClientCloseTest
String bigmsg = new String(buf, UTF_8);
session.getRemote().sendString(bigmsg);
}
else if (message.equals("block"))
{
LOG.debug("blocking");
assertTrue(block.await(5, TimeUnit.MINUTES));
LOG.debug("unblocked");
}
else
{
// simple echo
session.getRemote().sendString(message);
}
}
catch (InterruptedException e)
{
LOG.debug("unblocked");
throw new IllegalStateException(e);
}
catch (IOException ignore)
{
LOG.debug(ignore);
@ -422,9 +468,7 @@ public class ClientCloseTest
public void onWebSocketError(Throwable cause)
{
if (LOG.isDebugEnabled())
{
LOG.debug(cause);
}
LOG.debug("onWebSocketError(): ", cause);
}
@Override

View File

@ -306,12 +306,18 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
public void onEof()
{
if (LOG.isDebugEnabled())
LOG.debug("onEof() {}", this);
if (channelState.onEof())
closeConnection(new ClosedChannelException(), channelState.getCloseStatus(), Callback.NOOP);
}
public void closeConnection(Throwable cause, CloseStatus closeStatus, Callback callback)
{
if (LOG.isDebugEnabled())
LOG.debug("closeConnection() {} {} {}", closeStatus, this, cause);
connection.cancelDemand();
if (connection.getEndPoint().isOpen())
connection.close();
@ -356,27 +362,6 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
callback.failed(e);
}
}
}
AbnormalCloseStatus abnormalCloseStatusFor(Throwable cause)
{
int code;
if (cause instanceof ProtocolException)
code = CloseStatus.PROTOCOL;
else if (cause instanceof CloseException)
code = ((CloseException)cause).getStatusCode();
else if (cause instanceof Utf8Appendable.NotUtf8Exception)
code = CloseStatus.BAD_PAYLOAD;
else if (cause instanceof WebSocketTimeoutException || cause instanceof TimeoutException || cause instanceof SocketTimeoutException)
code = CloseStatus.SHUTDOWN;
else if (behavior == Behavior.CLIENT)
code = CloseStatus.POLICY_VIOLATION;
else
code = CloseStatus.SERVER_ERROR;
return new AbnormalCloseStatus(code, cause);
}
/**
@ -392,14 +377,24 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
if (LOG.isDebugEnabled())
LOG.debug("processConnectionError {} {}", this, cause);
CloseStatus closeStatus = abnormalCloseStatusFor(cause);
if (closeStatus.getCode() == CloseStatus.PROTOCOL)
close(closeStatus, callback);
else if (channelState.onClosed(closeStatus))
closeConnection(cause, closeStatus, callback);
int code;
if (cause instanceof CloseException)
code = ((CloseException)cause).getStatusCode();
else if (cause instanceof Utf8Appendable.NotUtf8Exception)
code = CloseStatus.BAD_PAYLOAD;
else if (cause instanceof WebSocketTimeoutException || cause instanceof TimeoutException || cause instanceof SocketTimeoutException)
code = CloseStatus.SHUTDOWN;
else
callback.failed(cause);
code = CloseStatus.NO_CLOSE;
AbnormalCloseStatus closeStatus = new AbnormalCloseStatus(code, cause);
if (CloseStatus.isTransmittableStatusCode(code))
close(closeStatus, callback);
else
{
if (channelState.onClosed(closeStatus))
closeConnection(cause, closeStatus, callback);
}
}
/**
@ -414,7 +409,19 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
if (LOG.isDebugEnabled())
LOG.debug("processHandlerError {} {}", this, cause);
close(abnormalCloseStatusFor(cause), callback);
int code;
if (cause instanceof CloseException)
code = ((CloseException)cause).getStatusCode();
else if (cause instanceof Utf8Appendable.NotUtf8Exception)
code = CloseStatus.BAD_PAYLOAD;
else if (cause instanceof WebSocketTimeoutException || cause instanceof TimeoutException || cause instanceof SocketTimeoutException)
code = CloseStatus.SHUTDOWN;
else if (behavior == Behavior.CLIENT)
code = CloseStatus.POLICY_VIOLATION;
else
code = CloseStatus.SERVER_ERROR;
close(new AbnormalCloseStatus(code, cause), callback);
}
/**
@ -506,6 +513,9 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
}
catch (Throwable t)
{
if (LOG.isDebugEnabled())
LOG.warn("Invalid outgoing frame: {}", frame);
callback.failed(t);
return;
}
@ -514,10 +524,10 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
{
synchronized(flusher)
{
boolean closeConnection = channelState.onOutgoingFrame(frame);
if (LOG.isDebugEnabled())
LOG.debug("sendFrame({}, {}, {}) {}", frame, callback, batch, closeConnection);
LOG.debug("sendFrame({}, {}, {})", frame, callback, batch);
boolean closeConnection = channelState.onOutgoingFrame(frame);
if (closeConnection)
{
Throwable cause = AbnormalCloseStatus.getCause(CloseStatus.getCloseStatus(frame));
@ -537,6 +547,9 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
}
catch (Throwable t)
{
if (LOG.isDebugEnabled())
LOG.debug("Failed sendFrame()", t);
if (frame.getOpCode() == OpCode.CLOSE)
{
CloseStatus closeStatus = CloseStatus.getCloseStatus(frame);

View File

@ -599,8 +599,8 @@ public class WebSocketConnection extends AbstractConnection implements Connectio
@Override
public void onCompleteFailure(Throwable x)
{
super.onCompleteFailure(x);
channel.processConnectionError(x, NOOP);
super.onCompleteFailure(x);
}
}
}

View File

@ -19,6 +19,7 @@
package org.eclipse.jetty.websocket.core;
import java.net.Socket;
import java.time.Duration;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@ -52,6 +53,7 @@ import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
@ -315,21 +317,22 @@ public class WebSocketCloseTest extends WebSocketTester
client.close();
assertFalse(server.handler.closed.await(250, TimeUnit.MILLISECONDS));
while(true)
{
if (!server.isOpen())
break;
server.sendFrame(new Frame(OpCode.TEXT, BufferUtil.toBuffer("frame after close")), Callback.NOOP);
}
assertTimeoutPreemptively(Duration.ofSeconds(1), ()->{
while(true)
{
if (!server.isOpen())
break;
server.sendFrame(new Frame(OpCode.TEXT, BufferUtil.toBuffer("frame after close")), Callback.NOOP);
Thread.sleep(100);
}
});
assertTrue(server.handler.closed.await(5, TimeUnit.SECONDS));
assertNotNull(server.handler.error);
assertThat(server.handler.closeStatus.getCode(), is(CloseStatus.SERVER_ERROR));
assertThat(server.handler.closeStatus.getCode(), is(CloseStatus.NO_CLOSE));
Callback callback = server.handler.receivedCallback.poll(5, TimeUnit.SECONDS);
callback.succeeded();
assertThat(server.handler.closeStatus.getCode(), is(CloseStatus.SERVER_ERROR));
}
@ParameterizedTest
@ -433,7 +436,7 @@ public class WebSocketCloseTest extends WebSocketTester
@Override
public void onOpen(CoreSession coreSession)
{
LOG.info("onOpen {}", coreSession);
LOG.debug("onOpen {}", coreSession);
session = coreSession;
state = session.toString();
opened.countDown();
@ -442,7 +445,7 @@ public class WebSocketCloseTest extends WebSocketTester
@Override
public void onFrame(Frame frame, Callback callback)
{
LOG.info("onFrame: " + BufferUtil.toDetailString(frame.getPayload()));
LOG.debug("onFrame: " + BufferUtil.toDetailString(frame.getPayload()));
state = session.toString();
receivedCallback.offer(callback);
receivedFrames.offer(Frame.copy(frame));
@ -454,7 +457,7 @@ public class WebSocketCloseTest extends WebSocketTester
@Override
public void onClosed(CloseStatus closeStatus)
{
LOG.info("onClosed {}", closeStatus);
LOG.debug("onClosed {}", closeStatus);
state = session.toString();
this.closeStatus = closeStatus;
closed.countDown();
@ -463,7 +466,7 @@ public class WebSocketCloseTest extends WebSocketTester
@Override
public void onError(Throwable cause)
{
LOG.info("onError {} ", cause == null?null:cause.toString());
LOG.debug("onError {} ", cause);
error = cause;
state = session.toString();
}