Issue #1599 - WebSocketClient early close scenarios

+ Connection timeout results in:
  endpoint.onError(WebSocketTimeoutException)
  localSession.close(SHUTDOWN)
This commit is contained in:
Joakim Erdfelt 2017-06-13 09:47:44 -07:00
parent c115b4a229
commit bd751b6ce2
6 changed files with 361 additions and 373 deletions

View File

@ -59,6 +59,7 @@ import org.eclipse.jetty.websocket.api.UpgradeResponse;
import org.eclipse.jetty.websocket.api.WebSocketBehavior;
import org.eclipse.jetty.websocket.api.WebSocketException;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.WebSocketTimeoutException;
import org.eclipse.jetty.websocket.api.extensions.ExtensionFactory;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.api.extensions.IncomingFrames;
@ -697,6 +698,10 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem
close(ce.getStatusCode(), ce.getMessage(), callback);
}
else if (cause instanceof WebSocketTimeoutException)
{
close(StatusCode.SHUTDOWN, cause.getMessage(), onDisconnectCallback);
}
else
{
LOG.warn("Unhandled Error (closing connection)", cause);

View File

@ -42,6 +42,7 @@ import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.FrameCallback;
import org.eclipse.jetty.websocket.api.SuspendToken;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.WebSocketTimeoutException;
import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.common.Generator;
@ -221,6 +222,16 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
super.onClose();
}
@Override
public boolean onIdleExpired()
{
if (LOG.isDebugEnabled())
LOG.debug("onIdleExpired()");
notifyError(new WebSocketTimeoutException("Connection Idle Timeout"));
return true;
}
@Override
public boolean onFrame(Frame frame)
{

View File

@ -41,6 +41,7 @@ public abstract class AbstractTrackingEndpoint<T>
public CountDownLatch openLatch = new CountDownLatch(1);
public CountDownLatch closeLatch = new CountDownLatch(1);
public CountDownLatch errorLatch = new CountDownLatch(1);
public AtomicReference<CloseInfo> closeInfo = new AtomicReference<>();
public AtomicReference<Throwable> error = new AtomicReference<>();
@ -89,6 +90,11 @@ public abstract class AbstractTrackingEndpoint<T>
assertTrue(prefix + " onOpen event", openLatch.await(Defaults.OPEN_EVENT_TIMEOUT_MS, TimeUnit.MILLISECONDS));
}
public void awaitErrorEvent(String prefix) throws InterruptedException
{
assertTrue(prefix + " onError event", errorLatch.await(Defaults.CLOSE_EVENT_TIMEOUT_MS, TimeUnit.MILLISECONDS));
}
protected void onWSOpen(T session)
{
this.session = session;
@ -124,5 +130,6 @@ public abstract class AbstractTrackingEndpoint<T>
LOG.warn("onError should only happen once - Extra/Excess Cause", cause);
fail("onError should only happen once!");
}
this.errorLatch.countDown();
}
}

View File

@ -0,0 +1,291 @@
//
// ========================================================================
// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.tests.client;
import static org.hamcrest.CoreMatchers.anything;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.Assert.assertThat;
import java.io.IOException;
import java.net.URI;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.client.HttpResponseException;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.api.WebSocketAdapter;
import org.eclipse.jetty.websocket.api.WebSocketTimeoutException;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
import org.eclipse.jetty.websocket.servlet.WebSocketCreator;
import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
import org.eclipse.jetty.websocket.tests.Defaults;
import org.eclipse.jetty.websocket.tests.SimpleServletServer;
import org.eclipse.jetty.websocket.tests.TrackingEndpoint;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TestName;
/**
* Tests various early drop/close scenarios
*/
public class ClientEarlyCloseTest
{
/**
* On Open, close socket
*/
@WebSocket
public static class OpenDropSocket
{
private static final Logger LOG = Log.getLogger(OpenDropSocket.class);
@OnWebSocketConnect
public void onOpen(Session sess)
{
LOG.debug("onOpen({})", sess);
try
{
sess.disconnect();
}
catch (IOException ignore)
{
}
}
}
/**
* On Open, throw unhandled exception
*/
@WebSocket
public static class OpenFailSocket
{
private static final Logger LOG = Log.getLogger(OpenFailSocket.class);
@OnWebSocketConnect
public void onOpen(Session sess)
{
LOG.debug("onOpen({})", sess);
// Test failure due to unhandled exception
// this should trigger a fast-fail closure during open/connect
throw new RuntimeException("Intentional FastFail");
}
}
/**
* On Message, drop connection
*/
public static class MessageDropSocket extends WebSocketAdapter
{
private static final Logger LOG = Log.getLogger(MessageDropSocket.class);
@Override
public void onWebSocketText(String message)
{
LOG.debug("onWebSocketText({})", message);
try
{
getSession().disconnect();
}
catch (IOException ignore)
{
}
}
}
public static class EarlyCloseServlet extends WebSocketServlet implements WebSocketCreator
{
@Override
public void configure(WebSocketServletFactory factory)
{
factory.setCreator(this);
}
@Override
public Object createWebSocket(ServletUpgradeRequest req, ServletUpgradeResponse resp)
{
if (req.hasSubProtocol("opendrop"))
{
resp.setAcceptedSubProtocol("opendrop");
return new OpenDropSocket();
}
if (req.hasSubProtocol("openfail"))
{
resp.setAcceptedSubProtocol("openfail");
return new OpenFailSocket();
}
if (req.hasSubProtocol("msgdrop"))
{
resp.setAcceptedSubProtocol("msgdrop");
return new MessageDropSocket();
}
return null;
}
}
@Rule
public TestName testname = new TestName();
@Rule
public ExpectedException expectedException = ExpectedException.none();
private SimpleServletServer server;
private WebSocketClient client;
@Before
public void startServer() throws Exception
{
server = new SimpleServletServer(new EarlyCloseServlet());
server.start();
}
@After
public void stopServer() throws Exception
{
server.stop();
}
@Before
public void startClient() throws Exception
{
client = new WebSocketClient();
client.start();
}
@After
public void stopClient() throws Exception
{
client.stop();
}
/**
* The remote endpoint sends a close frame immediately.
*
* @throws Exception on test failure
*/
@Test
public void immediateDrop() throws Exception
{
ClientUpgradeRequest upgradeRequest = new ClientUpgradeRequest();
upgradeRequest.setSubProtocols("openclose");
TrackingEndpoint clientSocket = new TrackingEndpoint(testname.getMethodName());
URI wsUri = server.getServerUri().resolve("/");
Future<Session> clientConnectFuture = client.connect(clientSocket, wsUri, upgradeRequest);
expectedException.expect(ExecutionException.class);
expectedException.expectCause(instanceOf(HttpResponseException.class));
expectedException.expectMessage(containsString("503 Endpoint Creation Failed"));
clientConnectFuture.get(Defaults.CONNECT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
}
/**
* The remote endpoint performed upgrade handshake ok, but failed its onOpen.
*
* @throws Exception on test failure
*/
@Test
public void remoteOpenFailure() throws Exception
{
ClientUpgradeRequest upgradeRequest = new ClientUpgradeRequest();
upgradeRequest.setSubProtocols("openfail");
TrackingEndpoint clientSocket = new TrackingEndpoint(testname.getMethodName());
URI wsUri = server.getServerUri().resolve("/");
Future<Session> clientConnectFuture = client.connect(clientSocket, wsUri, upgradeRequest);
Session session = clientConnectFuture.get(Defaults.CONNECT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
try
{
clientSocket.openLatch.await(Defaults.OPEN_EVENT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
assertThat("OnOpen.UpgradeRequest", clientSocket.openUpgradeRequest, notNullValue());
assertThat("OnOpen.UpgradeResponse", clientSocket.openUpgradeResponse, notNullValue());
assertThat("Negotiated SubProtocol", clientSocket.openUpgradeResponse.getAcceptedSubProtocol(), is("openfail"));
clientSocket.awaitCloseEvent("Client");
clientSocket.assertCloseInfo("Client", StatusCode.SERVER_ERROR, anything());
}
finally
{
session.close();
}
}
/**
* The connection has performed handshake successfully.
* <p>
* Send of message to remote results in dropped connection on server side.
* </p>
*
* @throws Exception on test failure
*/
@Test
public void messageDrop() throws Exception
{
ClientUpgradeRequest upgradeRequest = new ClientUpgradeRequest();
upgradeRequest.setSubProtocols("msgdrop");
TrackingEndpoint clientSocket = new TrackingEndpoint(testname.getMethodName());
URI wsUri = server.getServerUri().resolve("/");
client.setMaxIdleTimeout(3000);
Future<Session> clientConnectFuture = client.connect(clientSocket, wsUri, upgradeRequest);
Session session = clientConnectFuture.get(Defaults.CONNECT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
try
{
clientSocket.openLatch.await(Defaults.OPEN_EVENT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
assertThat("OnOpen.UpgradeRequest", clientSocket.openUpgradeRequest, notNullValue());
assertThat("OnOpen.UpgradeResponse", clientSocket.openUpgradeResponse, notNullValue());
assertThat("Negotiated SubProtocol", clientSocket.openUpgradeResponse.getAcceptedSubProtocol(), is("msgdrop"));
session.getRemote().sendString("drop-me");
clientSocket.awaitErrorEvent("Client");
clientSocket.assertErrorEvent("Client", instanceOf(WebSocketTimeoutException.class), containsString("Connection Idle Timeout"));
}
finally
{
session.close();
}
}
}

View File

@ -1,328 +0,0 @@
//
// ========================================================================
// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.tests.server;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.Matchers.containsString;
import static org.junit.Assert.assertThat;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.log.StacklessLogging;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.api.WebSocketAdapter;
import org.eclipse.jetty.websocket.api.WebSocketConstants;
import org.eclipse.jetty.websocket.common.CloseInfo;
import org.eclipse.jetty.websocket.common.OpCode;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.eclipse.jetty.websocket.common.WebSocketSession;
import org.eclipse.jetty.websocket.common.frames.TextFrame;
import org.eclipse.jetty.websocket.server.WebSocketServerFactory;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
import org.eclipse.jetty.websocket.servlet.WebSocketCreator;
import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
import org.eclipse.jetty.websocket.tests.LocalFuzzer;
import org.eclipse.jetty.websocket.tests.SimpleServletServer;
import org.eclipse.jetty.websocket.tests.UpgradeUtils;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
/**
* Tests various close scenarios
*/
@Ignore("Need to fix")
public class WebSocketCloseTest
{
/**
* On Message, return container information
*/
public static class ContainerSocket extends WebSocketAdapter
{
private static final Logger LOG = Log.getLogger(WebSocketCloseTest.ContainerSocket.class);
private final WebSocketServerFactory container;
private Session session;
public ContainerSocket(WebSocketServerFactory container)
{
this.container = container;
}
@Override
public void onWebSocketText(String message)
{
LOG.debug("onWebSocketText({})", message);
if (message.equalsIgnoreCase("openSessions"))
{
try
{
Collection<WebSocketSession> sessions = container.getOpenSessions();
StringBuilder ret = new StringBuilder();
ret.append("openSessions.size=").append(sessions.size()).append('\n');
int idx = 0;
for (WebSocketSession sess : sessions)
{
ret.append('[').append(idx++).append("] ").append(sess.toString()).append('\n');
}
session.getRemote().sendString(ret.toString());
}
catch (IOException e)
{
LOG.warn(e);
}
}
session.close(StatusCode.NORMAL, "ContainerSocket");
}
@Override
public void onWebSocketConnect(Session sess)
{
LOG.debug("onWebSocketConnect({})", sess);
this.session = sess;
}
}
/**
* On Connect, close socket
*/
public static class FastCloseSocket extends WebSocketAdapter
{
private static final Logger LOG = Log.getLogger(WebSocketCloseTest.FastCloseSocket.class);
@Override
public void onWebSocketConnect(Session sess)
{
LOG.debug("onWebSocketConnect({})", sess);
sess.close(StatusCode.NORMAL, "FastCloseServer");
}
}
/**
* On Connect, throw unhandled exception
*/
public static class FastFailSocket extends WebSocketAdapter
{
private static final Logger LOG = Log.getLogger(WebSocketCloseTest.FastFailSocket.class);
@Override
public void onWebSocketConnect(Session sess)
{
LOG.debug("onWebSocketConnect({})", sess);
// Test failure due to unhandled exception
// this should trigger a fast-fail closure during open/connect
throw new RuntimeException("Intentional FastFail");
}
}
/**
* On Message, drop connection
*/
public static class DropServerConnectionSocket extends WebSocketAdapter
{
@Override
public void onWebSocketText(String message)
{
try
{
getSession().disconnect();
}
catch (IOException ignore)
{
}
}
}
public static class CloseServlet extends WebSocketServlet implements WebSocketCreator
{
private WebSocketServerFactory serverFactory;
@Override
public void configure(WebSocketServletFactory factory)
{
factory.setCreator(this);
if (factory instanceof WebSocketServerFactory)
{
this.serverFactory = (WebSocketServerFactory) factory;
}
}
@Override
public Object createWebSocket(ServletUpgradeRequest req, ServletUpgradeResponse resp)
{
if (req.hasSubProtocol("fastclose"))
{
return new FastCloseSocket();
}
if (req.hasSubProtocol("fastfail"))
{
return new FastFailSocket();
}
if (req.hasSubProtocol("drop"))
{
return new DropServerConnectionSocket();
}
if (req.hasSubProtocol("container"))
{
return new ContainerSocket(serverFactory);
}
return new RFC6455Socket();
}
}
private SimpleServletServer server;
@Before
public void startServer() throws Exception
{
server = new SimpleServletServer(new CloseServlet());
server.start();
}
@After
public void stopServer() throws Exception
{
server.stop();
}
/**
* Test fast close (bug #403817)
*
* @throws Exception on test failure
*/
@Test
public void fastClose() throws Exception
{
Map<String, String> upgradeHeaders = UpgradeUtils.newDefaultUpgradeRequestHeaders();
upgradeHeaders.put(WebSocketConstants.SEC_WEBSOCKET_PROTOCOL, "fastclose");
List<WebSocketFrame> expect = new ArrayList<>();
expect.add(new CloseInfo(StatusCode.NORMAL, "FastCloseServer").asFrame());
try (LocalFuzzer session = server.newLocalFuzzer("/", upgradeHeaders))
{
session.sendFrames(new CloseInfo(StatusCode.NORMAL).asFrame());
session.expect(expect);
}
}
/**
* Test fast fail (bug #410537)
*
* @throws Exception on test failure
*/
@Test
public void fastFail() throws Exception
{
Map<String, String> upgradeHeaders = UpgradeUtils.newDefaultUpgradeRequestHeaders();
upgradeHeaders.put(WebSocketConstants.SEC_WEBSOCKET_PROTOCOL, "fastfail");
List<WebSocketFrame> expect = new ArrayList<>();
expect.add(new CloseInfo(StatusCode.SERVER_ERROR).asFrame());
try (StacklessLogging ignore = new StacklessLogging(FastFailSocket.class);
LocalFuzzer session = server.newLocalFuzzer("/", upgradeHeaders))
{
session.expect(expect);
}
}
@Test
public void dropServerConnection() throws Exception
{
Map<String, String> upgradeHeaders = UpgradeUtils.newDefaultUpgradeRequestHeaders();
upgradeHeaders.put(WebSocketConstants.SEC_WEBSOCKET_PROTOCOL, "drop");
try (LocalFuzzer session = server.newLocalFuzzer("/", upgradeHeaders))
{
session.sendFrames(new TextFrame().setPayload("drop"));
BlockingQueue<WebSocketFrame> framesQueue = session.getOutputFrames();
assertThat("No frames as output", framesQueue.size(), Matchers.is(0));
}
}
/**
*
* @throws Exception on test failure
*/
@Test
public void testFastFailFastClose() throws Exception
{
fastFail();
fastClose();
}
/**
* Test session open session cleanup (bug #474936)
*
* @throws Exception on test failure
*/
@Test
public void testOpenSessionCleanup() throws Exception
{
fastFail();
fastClose();
dropClientConnection();
Map<String, String> upgradeHeaders = UpgradeUtils.newDefaultUpgradeRequestHeaders();
upgradeHeaders.put(WebSocketConstants.SEC_WEBSOCKET_PROTOCOL, "container");
try (LocalFuzzer session = server.newLocalFuzzer("/?openSessions", upgradeHeaders))
{
session.sendFrames(
new TextFrame().setPayload("openSessions"),
new CloseInfo(StatusCode.NORMAL).asFrame()
);
BlockingQueue<WebSocketFrame> framesQueue = session.getOutputFrames();
WebSocketFrame frame = framesQueue.poll(3, TimeUnit.SECONDS);
assertThat("Frame.opCode", frame.getOpCode(), is(OpCode.TEXT));
assertThat("Frame.text-payload", frame.getPayloadAsUTF8(), containsString("openSessions.size=1\n"));
}
}
private void dropClientConnection() throws Exception
{
Map<String, String> upgradeHeaders = UpgradeUtils.newDefaultUpgradeRequestHeaders();
upgradeHeaders.put(WebSocketConstants.SEC_WEBSOCKET_PROTOCOL, "container");
try (LocalFuzzer ignored = server.newLocalFuzzer("/", upgradeHeaders))
{
// do nothing, just let endpoint close
}
}
}

View File

@ -26,6 +26,8 @@ org.eclipse.jetty.LEVEL=WARN
# org.eclipse.jetty.io.WriteFlusher.LEVEL=DEBUG
# org.eclipse.jetty.io.FillInterest.LEVEL=DEBUG
# org.eclipse.jetty.client.LEVEL=DEBUG
# org.eclipse.jetty.io.LEVEL=DEBUG
# org.eclipse.jetty.io.ManagedSelector.LEVEL=INFO
# org.eclipse.jetty.websocket.LEVEL=DEBUG
# org.eclipse.jetty.websocket.LEVEL=INFO
# org.eclipse.jetty.websocket.jsr356.messages.LEVEL=DEBUG