Issue #1599 - WebSocketClient early close scenarios

+ Connection timeout results in:
  endpoint.onError(WebSocketTimeoutException)
  localSession.close(SHUTDOWN)
This commit is contained in:
Joakim Erdfelt 2017-06-13 09:47:44 -07:00
parent 1ac16dd19b
commit 61fc95aa36
6 changed files with 361 additions and 373 deletions

View File

@ -59,6 +59,7 @@ import org.eclipse.jetty.websocket.api.UpgradeResponse;
import org.eclipse.jetty.websocket.api.WebSocketBehavior;
import org.eclipse.jetty.websocket.api.WebSocketException;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.WebSocketTimeoutException;
import org.eclipse.jetty.websocket.api.extensions.ExtensionFactory;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.api.extensions.IncomingFrames;
@ -697,6 +698,10 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem
close(ce.getStatusCode(), ce.getMessage(), callback);
}
else if (cause instanceof WebSocketTimeoutException)
{
close(StatusCode.SHUTDOWN, cause.getMessage(), onDisconnectCallback);
}
else
{
LOG.warn("Unhandled Error (closing connection)", cause);

View File

@ -42,6 +42,7 @@ import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.FrameCallback;
import org.eclipse.jetty.websocket.api.SuspendToken;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.WebSocketTimeoutException;
import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.common.Generator;
@ -72,7 +73,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
* Minimum size of a buffer is the determined to be what would be the maximum framing header size (not including payload)
*/
private static final int MIN_BUFFER_SIZE = Generator.MAX_HEADER_LENGTH;
private final Logger LOG;
private final ByteBufferPool bufferPool;
private final Generator generator;
@ -86,18 +87,18 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
private final List<LogicalConnection.Listener> listeners = new CopyOnWriteArrayList<>();
private List<ExtensionConfig> extensions;
private ByteBuffer networkBuffer;
public AbstractWebSocketConnection(EndPoint endp, Executor executor, WebSocketPolicy policy, ByteBufferPool bufferPool, ExtensionStack extensionStack)
{
super(endp,executor);
Objects.requireNonNull(endp, "EndPoint");
Objects.requireNonNull(executor, "Executor");
Objects.requireNonNull(policy, "WebSocketPolicy");
Objects.requireNonNull(bufferPool, "ByteBufferPool");
LOG = Log.getLogger(AbstractWebSocketConnection.class.getName() + "." + policy.getBehavior());
this.id = String.format("%s:%d->%s:%d",
endp.getLocalAddress().getAddress().getHostAddress(),
endp.getLocalAddress().getPort(),
@ -106,7 +107,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
this.policy = policy;
this.bufferPool = bufferPool;
this.extensionStack = extensionStack;
this.generator = new Generator(policy,bufferPool);
this.parser = new Parser(policy,bufferPool,this);
this.extensions = new ArrayList<>();
@ -114,12 +115,12 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
this.flusher = new Flusher(policy.getOutputBufferSize(),generator,endp);
this.setInputBufferSize(policy.getInputBufferSize());
this.setMaxIdleTimeout(policy.getIdleTimeout());
this.extensionStack.setPolicy(this.policy);
this.extensionStack.configure(this.parser);
this.extensionStack.configure(this.generator);
}
@Override
public Executor getExecutor()
{
@ -131,14 +132,14 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
{
if (LOG.isDebugEnabled())
LOG.debug("disconnect()");
// close FrameFlusher, we cannot write anymore at this point.
flusher.close();
closed.set(true);
close();
}
@Override
public ByteBufferPool getBufferPool()
{
@ -184,12 +185,12 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
{
return parser;
}
public WebSocketPolicy getPolicy()
{
return policy;
}
@Override
public InetSocketAddress getRemoteAddress()
{
@ -214,21 +215,31 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
{
if (LOG.isDebugEnabled())
LOG.debug("onClose()");
closed.set(true);
flusher.close();
super.onClose();
}
@Override
public boolean onIdleExpired()
{
if (LOG.isDebugEnabled())
LOG.debug("onIdleExpired()");
notifyError(new WebSocketTimeoutException("Connection Idle Timeout"));
return true;
}
@Override
public boolean onFrame(Frame frame)
{
AtomicBoolean result = new AtomicBoolean(false);
if(LOG.isDebugEnabled())
LOG.debug("onFrame({})", frame);
extensionStack.incomingFrame(frame, new FrameCallback()
{
@Override
@ -236,7 +247,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
{
if(LOG.isDebugEnabled())
LOG.debug("onFrame({}).succeed()", frame);
parser.release(frame);
if(!result.compareAndSet(false,true))
{
@ -244,28 +255,28 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
fillAndParse();
}
}
@Override
public void fail(Throwable cause)
{
if(LOG.isDebugEnabled())
LOG.debug("onFrame("+ frame + ").fail()", cause);
parser.release(frame);
// notify session & endpoint
notifyError(cause);
}
});
if(result.compareAndSet(false, true))
{
// callback hasn't been notified yet
return false;
}
return true;
}
private ByteBuffer getNetworkBuffer()
{
synchronized (this)
@ -277,7 +288,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
return networkBuffer;
}
}
private void releaseNetworkBuffer(ByteBuffer buffer)
{
synchronized (this)
@ -287,14 +298,14 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
networkBuffer = null;
}
}
@Override
public void onFillable()
{
getNetworkBuffer();
fillAndParse();
}
private void fillAndParse()
{
try
@ -305,25 +316,25 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
{
return;
}
ByteBuffer nBuffer = getNetworkBuffer();
if (!parser.parse(nBuffer)) return;
// Shouldn't reach this point if buffer has un-parsed bytes
assert(!nBuffer.hasRemaining());
int filled = getEndPoint().fill(nBuffer);
if(LOG.isDebugEnabled())
LOG.debug("endpointFill() filled={}: {}", filled, BufferUtil.toDetailString(nBuffer));
if (filled < 0)
{
releaseNetworkBuffer(nBuffer);
return;
}
if (filled == 0)
{
releaseNetworkBuffer(nBuffer);
@ -337,8 +348,8 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
notifyError(t);
}
}
/**
* Extra bytes from the initial HTTP upgrade that need to
* be processed by the websocket parser before starting
@ -351,7 +362,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
{
LOG.debug("set Initial Buffer - {}",BufferUtil.toDetailString(prefilled));
}
if ((prefilled != null) && (prefilled.hasRemaining()))
{
networkBuffer = bufferPool.acquire(prefilled.remaining(), true);
@ -367,7 +378,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
{
LOG.warn("Unhandled Connection Error", cause);
}
for (LogicalConnection.Listener listener : listeners)
{
try
@ -381,7 +392,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
}
}
}
/**
* Physical connection Open.
*/
@ -416,7 +427,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
flusher.enqueue(frame,callback,batchMode);
}
@Override
public void resume()
{
@ -425,19 +436,19 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
fillAndParse();
}
}
public boolean addListener(LogicalConnection.Listener listener)
{
super.addListener(listener);
return this.listeners.add(listener);
}
public boolean removeListener(LogicalConnection.Listener listener)
{
super.removeListener(listener);
return this.listeners.remove(listener);
}
/**
* Get the list of extensions in use.
* <p>
@ -469,7 +480,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
getEndPoint().setIdleTimeout(ms);
}
}
@Override
public SuspendToken suspend()
{
@ -499,7 +510,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
generator,
parser);
}
@Override
public int hashCode()
{
@ -549,7 +560,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
{
LOG.debug("onUpgradeTo({})", BufferUtil.toDetailString(prefilled));
}
setInitialBuffer(prefilled);
}
}

View File

@ -41,6 +41,7 @@ public abstract class AbstractTrackingEndpoint<T>
public CountDownLatch openLatch = new CountDownLatch(1);
public CountDownLatch closeLatch = new CountDownLatch(1);
public CountDownLatch errorLatch = new CountDownLatch(1);
public AtomicReference<CloseInfo> closeInfo = new AtomicReference<>();
public AtomicReference<Throwable> error = new AtomicReference<>();
@ -88,6 +89,11 @@ public abstract class AbstractTrackingEndpoint<T>
{
assertTrue(prefix + " onOpen event", openLatch.await(Defaults.OPEN_EVENT_TIMEOUT_MS, TimeUnit.MILLISECONDS));
}
public void awaitErrorEvent(String prefix) throws InterruptedException
{
assertTrue(prefix + " onError event", errorLatch.await(Defaults.CLOSE_EVENT_TIMEOUT_MS, TimeUnit.MILLISECONDS));
}
protected void onWSOpen(T session)
{
@ -124,5 +130,6 @@ public abstract class AbstractTrackingEndpoint<T>
LOG.warn("onError should only happen once - Extra/Excess Cause", cause);
fail("onError should only happen once!");
}
this.errorLatch.countDown();
}
}

View File

@ -0,0 +1,291 @@
//
// ========================================================================
// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.tests.client;
import static org.hamcrest.CoreMatchers.anything;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.Assert.assertThat;
import java.io.IOException;
import java.net.URI;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.client.HttpResponseException;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.api.WebSocketAdapter;
import org.eclipse.jetty.websocket.api.WebSocketTimeoutException;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
import org.eclipse.jetty.websocket.servlet.WebSocketCreator;
import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
import org.eclipse.jetty.websocket.tests.Defaults;
import org.eclipse.jetty.websocket.tests.SimpleServletServer;
import org.eclipse.jetty.websocket.tests.TrackingEndpoint;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TestName;
/**
* Tests various early drop/close scenarios
*/
public class ClientEarlyCloseTest
{
/**
* On Open, close socket
*/
@WebSocket
public static class OpenDropSocket
{
private static final Logger LOG = Log.getLogger(OpenDropSocket.class);
@OnWebSocketConnect
public void onOpen(Session sess)
{
LOG.debug("onOpen({})", sess);
try
{
sess.disconnect();
}
catch (IOException ignore)
{
}
}
}
/**
* On Open, throw unhandled exception
*/
@WebSocket
public static class OpenFailSocket
{
private static final Logger LOG = Log.getLogger(OpenFailSocket.class);
@OnWebSocketConnect
public void onOpen(Session sess)
{
LOG.debug("onOpen({})", sess);
// Test failure due to unhandled exception
// this should trigger a fast-fail closure during open/connect
throw new RuntimeException("Intentional FastFail");
}
}
/**
* On Message, drop connection
*/
public static class MessageDropSocket extends WebSocketAdapter
{
private static final Logger LOG = Log.getLogger(MessageDropSocket.class);
@Override
public void onWebSocketText(String message)
{
LOG.debug("onWebSocketText({})", message);
try
{
getSession().disconnect();
}
catch (IOException ignore)
{
}
}
}
public static class EarlyCloseServlet extends WebSocketServlet implements WebSocketCreator
{
@Override
public void configure(WebSocketServletFactory factory)
{
factory.setCreator(this);
}
@Override
public Object createWebSocket(ServletUpgradeRequest req, ServletUpgradeResponse resp)
{
if (req.hasSubProtocol("opendrop"))
{
resp.setAcceptedSubProtocol("opendrop");
return new OpenDropSocket();
}
if (req.hasSubProtocol("openfail"))
{
resp.setAcceptedSubProtocol("openfail");
return new OpenFailSocket();
}
if (req.hasSubProtocol("msgdrop"))
{
resp.setAcceptedSubProtocol("msgdrop");
return new MessageDropSocket();
}
return null;
}
}
@Rule
public TestName testname = new TestName();
@Rule
public ExpectedException expectedException = ExpectedException.none();
private SimpleServletServer server;
private WebSocketClient client;
@Before
public void startServer() throws Exception
{
server = new SimpleServletServer(new EarlyCloseServlet());
server.start();
}
@After
public void stopServer() throws Exception
{
server.stop();
}
@Before
public void startClient() throws Exception
{
client = new WebSocketClient();
client.start();
}
@After
public void stopClient() throws Exception
{
client.stop();
}
/**
* The remote endpoint sends a close frame immediately.
*
* @throws Exception on test failure
*/
@Test
public void immediateDrop() throws Exception
{
ClientUpgradeRequest upgradeRequest = new ClientUpgradeRequest();
upgradeRequest.setSubProtocols("openclose");
TrackingEndpoint clientSocket = new TrackingEndpoint(testname.getMethodName());
URI wsUri = server.getServerUri().resolve("/");
Future<Session> clientConnectFuture = client.connect(clientSocket, wsUri, upgradeRequest);
expectedException.expect(ExecutionException.class);
expectedException.expectCause(instanceOf(HttpResponseException.class));
expectedException.expectMessage(containsString("503 Endpoint Creation Failed"));
clientConnectFuture.get(Defaults.CONNECT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
}
/**
* The remote endpoint performed upgrade handshake ok, but failed its onOpen.
*
* @throws Exception on test failure
*/
@Test
public void remoteOpenFailure() throws Exception
{
ClientUpgradeRequest upgradeRequest = new ClientUpgradeRequest();
upgradeRequest.setSubProtocols("openfail");
TrackingEndpoint clientSocket = new TrackingEndpoint(testname.getMethodName());
URI wsUri = server.getServerUri().resolve("/");
Future<Session> clientConnectFuture = client.connect(clientSocket, wsUri, upgradeRequest);
Session session = clientConnectFuture.get(Defaults.CONNECT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
try
{
clientSocket.openLatch.await(Defaults.OPEN_EVENT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
assertThat("OnOpen.UpgradeRequest", clientSocket.openUpgradeRequest, notNullValue());
assertThat("OnOpen.UpgradeResponse", clientSocket.openUpgradeResponse, notNullValue());
assertThat("Negotiated SubProtocol", clientSocket.openUpgradeResponse.getAcceptedSubProtocol(), is("openfail"));
clientSocket.awaitCloseEvent("Client");
clientSocket.assertCloseInfo("Client", StatusCode.SERVER_ERROR, anything());
}
finally
{
session.close();
}
}
/**
* The connection has performed handshake successfully.
* <p>
* Send of message to remote results in dropped connection on server side.
* </p>
*
* @throws Exception on test failure
*/
@Test
public void messageDrop() throws Exception
{
ClientUpgradeRequest upgradeRequest = new ClientUpgradeRequest();
upgradeRequest.setSubProtocols("msgdrop");
TrackingEndpoint clientSocket = new TrackingEndpoint(testname.getMethodName());
URI wsUri = server.getServerUri().resolve("/");
client.setMaxIdleTimeout(3000);
Future<Session> clientConnectFuture = client.connect(clientSocket, wsUri, upgradeRequest);
Session session = clientConnectFuture.get(Defaults.CONNECT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
try
{
clientSocket.openLatch.await(Defaults.OPEN_EVENT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
assertThat("OnOpen.UpgradeRequest", clientSocket.openUpgradeRequest, notNullValue());
assertThat("OnOpen.UpgradeResponse", clientSocket.openUpgradeResponse, notNullValue());
assertThat("Negotiated SubProtocol", clientSocket.openUpgradeResponse.getAcceptedSubProtocol(), is("msgdrop"));
session.getRemote().sendString("drop-me");
clientSocket.awaitErrorEvent("Client");
clientSocket.assertErrorEvent("Client", instanceOf(WebSocketTimeoutException.class), containsString("Connection Idle Timeout"));
}
finally
{
session.close();
}
}
}

View File

@ -1,328 +0,0 @@
//
// ========================================================================
// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.tests.server;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.Matchers.containsString;
import static org.junit.Assert.assertThat;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.log.StacklessLogging;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.api.WebSocketAdapter;
import org.eclipse.jetty.websocket.api.WebSocketConstants;
import org.eclipse.jetty.websocket.common.CloseInfo;
import org.eclipse.jetty.websocket.common.OpCode;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.eclipse.jetty.websocket.common.WebSocketSession;
import org.eclipse.jetty.websocket.common.frames.TextFrame;
import org.eclipse.jetty.websocket.server.WebSocketServerFactory;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
import org.eclipse.jetty.websocket.servlet.WebSocketCreator;
import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
import org.eclipse.jetty.websocket.tests.LocalFuzzer;
import org.eclipse.jetty.websocket.tests.SimpleServletServer;
import org.eclipse.jetty.websocket.tests.UpgradeUtils;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
/**
* Tests various close scenarios
*/
@Ignore("Need to fix")
public class WebSocketCloseTest
{
/**
* On Message, return container information
*/
public static class ContainerSocket extends WebSocketAdapter
{
private static final Logger LOG = Log.getLogger(WebSocketCloseTest.ContainerSocket.class);
private final WebSocketServerFactory container;
private Session session;
public ContainerSocket(WebSocketServerFactory container)
{
this.container = container;
}
@Override
public void onWebSocketText(String message)
{
LOG.debug("onWebSocketText({})", message);
if (message.equalsIgnoreCase("openSessions"))
{
try
{
Collection<WebSocketSession> sessions = container.getOpenSessions();
StringBuilder ret = new StringBuilder();
ret.append("openSessions.size=").append(sessions.size()).append('\n');
int idx = 0;
for (WebSocketSession sess : sessions)
{
ret.append('[').append(idx++).append("] ").append(sess.toString()).append('\n');
}
session.getRemote().sendString(ret.toString());
}
catch (IOException e)
{
LOG.warn(e);
}
}
session.close(StatusCode.NORMAL, "ContainerSocket");
}
@Override
public void onWebSocketConnect(Session sess)
{
LOG.debug("onWebSocketConnect({})", sess);
this.session = sess;
}
}
/**
* On Connect, close socket
*/
public static class FastCloseSocket extends WebSocketAdapter
{
private static final Logger LOG = Log.getLogger(WebSocketCloseTest.FastCloseSocket.class);
@Override
public void onWebSocketConnect(Session sess)
{
LOG.debug("onWebSocketConnect({})", sess);
sess.close(StatusCode.NORMAL, "FastCloseServer");
}
}
/**
* On Connect, throw unhandled exception
*/
public static class FastFailSocket extends WebSocketAdapter
{
private static final Logger LOG = Log.getLogger(WebSocketCloseTest.FastFailSocket.class);
@Override
public void onWebSocketConnect(Session sess)
{
LOG.debug("onWebSocketConnect({})", sess);
// Test failure due to unhandled exception
// this should trigger a fast-fail closure during open/connect
throw new RuntimeException("Intentional FastFail");
}
}
/**
* On Message, drop connection
*/
public static class DropServerConnectionSocket extends WebSocketAdapter
{
@Override
public void onWebSocketText(String message)
{
try
{
getSession().disconnect();
}
catch (IOException ignore)
{
}
}
}
public static class CloseServlet extends WebSocketServlet implements WebSocketCreator
{
private WebSocketServerFactory serverFactory;
@Override
public void configure(WebSocketServletFactory factory)
{
factory.setCreator(this);
if (factory instanceof WebSocketServerFactory)
{
this.serverFactory = (WebSocketServerFactory) factory;
}
}
@Override
public Object createWebSocket(ServletUpgradeRequest req, ServletUpgradeResponse resp)
{
if (req.hasSubProtocol("fastclose"))
{
return new FastCloseSocket();
}
if (req.hasSubProtocol("fastfail"))
{
return new FastFailSocket();
}
if (req.hasSubProtocol("drop"))
{
return new DropServerConnectionSocket();
}
if (req.hasSubProtocol("container"))
{
return new ContainerSocket(serverFactory);
}
return new RFC6455Socket();
}
}
private SimpleServletServer server;
@Before
public void startServer() throws Exception
{
server = new SimpleServletServer(new CloseServlet());
server.start();
}
@After
public void stopServer() throws Exception
{
server.stop();
}
/**
* Test fast close (bug #403817)
*
* @throws Exception on test failure
*/
@Test
public void fastClose() throws Exception
{
Map<String, String> upgradeHeaders = UpgradeUtils.newDefaultUpgradeRequestHeaders();
upgradeHeaders.put(WebSocketConstants.SEC_WEBSOCKET_PROTOCOL, "fastclose");
List<WebSocketFrame> expect = new ArrayList<>();
expect.add(new CloseInfo(StatusCode.NORMAL, "FastCloseServer").asFrame());
try (LocalFuzzer session = server.newLocalFuzzer("/", upgradeHeaders))
{
session.sendFrames(new CloseInfo(StatusCode.NORMAL).asFrame());
session.expect(expect);
}
}
/**
* Test fast fail (bug #410537)
*
* @throws Exception on test failure
*/
@Test
public void fastFail() throws Exception
{
Map<String, String> upgradeHeaders = UpgradeUtils.newDefaultUpgradeRequestHeaders();
upgradeHeaders.put(WebSocketConstants.SEC_WEBSOCKET_PROTOCOL, "fastfail");
List<WebSocketFrame> expect = new ArrayList<>();
expect.add(new CloseInfo(StatusCode.SERVER_ERROR).asFrame());
try (StacklessLogging ignore = new StacklessLogging(FastFailSocket.class);
LocalFuzzer session = server.newLocalFuzzer("/", upgradeHeaders))
{
session.expect(expect);
}
}
@Test
public void dropServerConnection() throws Exception
{
Map<String, String> upgradeHeaders = UpgradeUtils.newDefaultUpgradeRequestHeaders();
upgradeHeaders.put(WebSocketConstants.SEC_WEBSOCKET_PROTOCOL, "drop");
try (LocalFuzzer session = server.newLocalFuzzer("/", upgradeHeaders))
{
session.sendFrames(new TextFrame().setPayload("drop"));
BlockingQueue<WebSocketFrame> framesQueue = session.getOutputFrames();
assertThat("No frames as output", framesQueue.size(), Matchers.is(0));
}
}
/**
*
* @throws Exception on test failure
*/
@Test
public void testFastFailFastClose() throws Exception
{
fastFail();
fastClose();
}
/**
* Test session open session cleanup (bug #474936)
*
* @throws Exception on test failure
*/
@Test
public void testOpenSessionCleanup() throws Exception
{
fastFail();
fastClose();
dropClientConnection();
Map<String, String> upgradeHeaders = UpgradeUtils.newDefaultUpgradeRequestHeaders();
upgradeHeaders.put(WebSocketConstants.SEC_WEBSOCKET_PROTOCOL, "container");
try (LocalFuzzer session = server.newLocalFuzzer("/?openSessions", upgradeHeaders))
{
session.sendFrames(
new TextFrame().setPayload("openSessions"),
new CloseInfo(StatusCode.NORMAL).asFrame()
);
BlockingQueue<WebSocketFrame> framesQueue = session.getOutputFrames();
WebSocketFrame frame = framesQueue.poll(3, TimeUnit.SECONDS);
assertThat("Frame.opCode", frame.getOpCode(), is(OpCode.TEXT));
assertThat("Frame.text-payload", frame.getPayloadAsUTF8(), containsString("openSessions.size=1\n"));
}
}
private void dropClientConnection() throws Exception
{
Map<String, String> upgradeHeaders = UpgradeUtils.newDefaultUpgradeRequestHeaders();
upgradeHeaders.put(WebSocketConstants.SEC_WEBSOCKET_PROTOCOL, "container");
try (LocalFuzzer ignored = server.newLocalFuzzer("/", upgradeHeaders))
{
// do nothing, just let endpoint close
}
}
}

View File

@ -26,6 +26,8 @@ org.eclipse.jetty.LEVEL=WARN
# org.eclipse.jetty.io.WriteFlusher.LEVEL=DEBUG
# org.eclipse.jetty.io.FillInterest.LEVEL=DEBUG
# org.eclipse.jetty.client.LEVEL=DEBUG
# org.eclipse.jetty.io.LEVEL=DEBUG
# org.eclipse.jetty.io.ManagedSelector.LEVEL=INFO
# org.eclipse.jetty.websocket.LEVEL=DEBUG
# org.eclipse.jetty.websocket.LEVEL=INFO
# org.eclipse.jetty.websocket.jsr356.messages.LEVEL=DEBUG