Issue #3494 - fix ClientCloseTest.testWriteException()

Nulling out values in WebSocketAdapter causes race conditions when
trying to access session and endpoint externally

Race condition in WebSocketChannel.Flusher.onCompleteFailure(),
processConnectionError should be called first to ensure that the
correct close reason is processed, super.onCompleteFailure() was closing
the connection causing a read failure.

race condition between the server detecting a read failure and sending
a response and the client detecting the write failure, now blocking
on the server so it is not reading and will not detect the failure

Signed-off-by: lachan-roberts <lachlan@webtide.com>
This commit is contained in:
lachan-roberts 2019-04-01 11:19:23 +11:00
parent 717d7300ac
commit 50c193c23b
4 changed files with 51 additions and 24 deletions

View File

@ -59,8 +59,7 @@ public class WebSocketAdapter implements WebSocketListener
@Override @Override
public void onWebSocketClose(int statusCode, String reason) public void onWebSocketClose(int statusCode, String reason)
{ {
this.session = null; /* do nothing */
this.remote = null;
} }
@Override @Override

View File

@ -21,15 +21,16 @@ package org.eclipse.jetty.websocket.tests.client;
import java.io.IOException; import java.io.IOException;
import java.net.URI; import java.net.URI;
import java.nio.channels.ClosedChannelException;
import java.time.Duration; import java.time.Duration;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector; import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.handler.DefaultHandler; import org.eclipse.jetty.server.handler.DefaultHandler;
@ -66,10 +67,12 @@ import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.nullValue;
import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively; import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class ClientCloseTest public class ClientCloseTest
{ {
private Server server; private Server server;
private ServerEndpoint serverEndpoint = new ServerEndpoint();
private WebSocketClient client; private WebSocketClient client;
private Session confirmConnection(CloseTrackingEndpoint clientSocket, Future<Session> clientFuture) throws Exception private Session confirmConnection(CloseTrackingEndpoint clientSocket, Future<Session> clientFuture) throws Exception
@ -125,9 +128,9 @@ public class ClientCloseTest
@Override @Override
public void configure(JettyWebSocketServletFactory factory) public void configure(JettyWebSocketServletFactory factory)
{ {
factory.setIdleTimeout(Duration.ofSeconds(10)); factory.setIdleTimeout(Duration.ofSeconds(0));
factory.setMaxTextMessageSize(1024 * 1024 * 2); factory.setMaxTextMessageSize(1024 * 1024 * 2);
factory.register(ServerEndpoint.class); factory.setCreator((req,resp)->serverEndpoint);
} }
}); });
context.addServlet(holder, "/ws"); context.addServlet(holder, "/ws");
@ -339,8 +342,7 @@ public class ClientCloseTest
public void testWriteException() throws Exception public void testWriteException() throws Exception
{ {
// Set client timeout // Set client timeout
final int timeout = 2000; client.setIdleTimeout(Duration.ZERO);
client.setIdleTimeout(Duration.ofMillis(timeout));
ClientOpenSessionTracker clientSessionTracker = new ClientOpenSessionTracker(1); ClientOpenSessionTracker clientSessionTracker = new ClientOpenSessionTracker(1);
clientSessionTracker.addTo(client); clientSessionTracker.addTo(client);
@ -353,29 +355,40 @@ public class ClientCloseTest
// client confirms connection via echo // client confirms connection via echo
confirmConnection(clientSocket, clientConnectFuture); confirmConnection(clientSocket, clientConnectFuture);
// setup client endpoint for write failure (test only) try
EndPoint endp = clientSocket.getEndPoint(); {
endp.shutdownOutput(); // Block on the server so that the server does not detect a read failure
clientSocket.getSession().getRemote().sendString("block");
// client enqueue close frame // setup client endpoint for write failure (test only)
// should result in a client write failure EndPoint endp = clientSocket.getEndPoint();
final String origCloseReason = "Normal Close from Client"; endp.shutdownOutput();
clientSocket.getSession().close(StatusCode.NORMAL, origCloseReason);
assertThat("OnError Latch", clientSocket.errorLatch.await(2, SECONDS), is(true)); // client enqueue close frame
assertThat("OnError", clientSocket.error.get(), instanceOf(ClosedChannelException.class)); // should result in a client write failure
final String origCloseReason = "Normal Close from Client";
clientSocket.getSession().close(StatusCode.NORMAL, origCloseReason);
// client triggers close event on client ws-endpoint assertThat("OnError Latch", clientSocket.errorLatch.await(2, SECONDS), is(true));
// assert - close code==1006 (abnormal) assertThat("OnError", clientSocket.error.get(), instanceOf(EofException.class));
clientSocket.assertReceivedCloseEvent(timeout, is(StatusCode.ABNORMAL), containsString("Channel Closed"));
clientSessionTracker.assertClosedProperly(client); // client triggers close event on client ws-endpoint
// assert - close code==1006 (abnormal)
clientSocket.assertReceivedCloseEvent(2000, is(StatusCode.ABNORMAL), null);
clientSessionTracker.assertClosedProperly(client);
}
finally
{
serverEndpoint.block.countDown();
}
} }
public static class ServerEndpoint implements WebSocketFrameListener, WebSocketListener public static class ServerEndpoint implements WebSocketFrameListener, WebSocketListener
{ {
private static final Logger LOG = Log.getLogger(ServerEndpoint.class); private static final Logger LOG = Log.getLogger(ServerEndpoint.class);
private Session session; private Session session;
CountDownLatch block = new CountDownLatch(1);
@Override @Override
public void onWebSocketBinary(byte[] payload, int offset, int len) public void onWebSocketBinary(byte[] payload, int offset, int len)
@ -395,12 +408,20 @@ public class ClientCloseTest
String bigmsg = new String(buf, UTF_8); String bigmsg = new String(buf, UTF_8);
session.getRemote().sendString(bigmsg); session.getRemote().sendString(bigmsg);
} }
else if (message.equals("block"))
{
assertTrue(block.await(5, TimeUnit.MINUTES));
}
else else
{ {
// simple echo // simple echo
session.getRemote().sendString(message); session.getRemote().sendString(message);
} }
} }
catch (InterruptedException e)
{
throw new IllegalStateException(e);
}
catch (IOException ignore) catch (IOException ignore)
{ {
LOG.debug(ignore); LOG.debug(ignore);
@ -422,9 +443,7 @@ public class ClientCloseTest
public void onWebSocketError(Throwable cause) public void onWebSocketError(Throwable cause)
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
{ LOG.debug("onWebSocketError(): ", cause);
LOG.debug(cause);
}
} }
@Override @Override

View File

@ -32,6 +32,7 @@ import java.util.concurrent.Executor;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IteratingCallback; import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.Utf8Appendable; import org.eclipse.jetty.util.Utf8Appendable;
@ -306,12 +307,18 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
public void onEof() public void onEof()
{ {
if (LOG.isDebugEnabled())
LOG.debug("onEof() {}", this);
if (channelState.onEof()) if (channelState.onEof())
closeConnection(new ClosedChannelException(), channelState.getCloseStatus(), Callback.NOOP); closeConnection(new ClosedChannelException(), channelState.getCloseStatus(), Callback.NOOP);
} }
public void closeConnection(Throwable cause, CloseStatus closeStatus, Callback callback) public void closeConnection(Throwable cause, CloseStatus closeStatus, Callback callback)
{ {
if (LOG.isDebugEnabled())
LOG.debug("closeConnection() {} {} {}", closeStatus, this, cause);
connection.cancelDemand(); connection.cancelDemand();
if (connection.getEndPoint().isOpen()) if (connection.getEndPoint().isOpen())
connection.close(); connection.close();
@ -371,6 +378,8 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
code = CloseStatus.BAD_PAYLOAD; code = CloseStatus.BAD_PAYLOAD;
else if (cause instanceof WebSocketTimeoutException || cause instanceof TimeoutException || cause instanceof SocketTimeoutException) else if (cause instanceof WebSocketTimeoutException || cause instanceof TimeoutException || cause instanceof SocketTimeoutException)
code = CloseStatus.SHUTDOWN; code = CloseStatus.SHUTDOWN;
else if (cause instanceof EofException)
code = CloseStatus.NO_CLOSE;
else if (behavior == Behavior.CLIENT) else if (behavior == Behavior.CLIENT)
code = CloseStatus.POLICY_VIOLATION; code = CloseStatus.POLICY_VIOLATION;
else else

View File

@ -599,8 +599,8 @@ public class WebSocketConnection extends AbstractConnection implements Connectio
@Override @Override
public void onCompleteFailure(Throwable x) public void onCompleteFailure(Throwable x)
{ {
super.onCompleteFailure(x);
channel.processConnectionError(x, NOOP); channel.processConnectionError(x, NOOP);
super.onCompleteFailure(x);
} }
} }
} }