474936 - WebSocketSessions are not always cleaned out from openSessions
+ Adding additional testcase to verify behavior
This commit is contained in:
parent
da3699dc55
commit
3901fcbb60
|
@ -0,0 +1,366 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
//
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
//
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
//
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.websocket.server;
|
||||
|
||||
import static org.hamcrest.Matchers.*;
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.eclipse.jetty.toolchain.test.EventQueue;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
import org.eclipse.jetty.util.log.StacklessLogging;
|
||||
import org.eclipse.jetty.util.log.StdErrLog;
|
||||
import org.eclipse.jetty.websocket.api.Session;
|
||||
import org.eclipse.jetty.websocket.api.StatusCode;
|
||||
import org.eclipse.jetty.websocket.api.WebSocketAdapter;
|
||||
import org.eclipse.jetty.websocket.common.CloseInfo;
|
||||
import org.eclipse.jetty.websocket.common.OpCode;
|
||||
import org.eclipse.jetty.websocket.common.WebSocketFrame;
|
||||
import org.eclipse.jetty.websocket.common.WebSocketSession;
|
||||
import org.eclipse.jetty.websocket.common.frames.TextFrame;
|
||||
import org.eclipse.jetty.websocket.common.test.BlockheadClient;
|
||||
import org.eclipse.jetty.websocket.server.helper.RFCSocket;
|
||||
import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest;
|
||||
import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
|
||||
import org.eclipse.jetty.websocket.servlet.WebSocketCreator;
|
||||
import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
|
||||
import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
* Tests various close scenarios that should result in Open Session cleanup
|
||||
*/
|
||||
public class ManyConnectionsCleanupTest
|
||||
{
|
||||
static class AbstractCloseSocket extends WebSocketAdapter
|
||||
{
|
||||
public CountDownLatch closeLatch = new CountDownLatch(1);
|
||||
public String closeReason = null;
|
||||
public int closeStatusCode = -1;
|
||||
public List<Throwable> errors = new ArrayList<>();
|
||||
|
||||
@Override
|
||||
public void onWebSocketClose(int statusCode, String reason)
|
||||
{
|
||||
LOG.debug("onWebSocketClose({}, {})",statusCode,reason);
|
||||
this.closeStatusCode = statusCode;
|
||||
this.closeReason = reason;
|
||||
closeLatch.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onWebSocketError(Throwable cause)
|
||||
{
|
||||
errors.add(cause);
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("serial")
|
||||
public static class CloseServlet extends WebSocketServlet implements WebSocketCreator
|
||||
{
|
||||
private WebSocketServerFactory serverFactory;
|
||||
private AtomicInteger calls = new AtomicInteger(0);
|
||||
|
||||
@Override
|
||||
public void configure(WebSocketServletFactory factory)
|
||||
{
|
||||
factory.setCreator(this);
|
||||
if (factory instanceof WebSocketServerFactory)
|
||||
{
|
||||
this.serverFactory = (WebSocketServerFactory)factory;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object createWebSocket(ServletUpgradeRequest req, ServletUpgradeResponse resp)
|
||||
{
|
||||
if (req.hasSubProtocol("fastclose"))
|
||||
{
|
||||
closeSocket = new FastCloseSocket(calls);
|
||||
return closeSocket;
|
||||
}
|
||||
|
||||
if (req.hasSubProtocol("fastfail"))
|
||||
{
|
||||
closeSocket = new FastFailSocket(calls);
|
||||
return closeSocket;
|
||||
}
|
||||
|
||||
if (req.hasSubProtocol("container"))
|
||||
{
|
||||
closeSocket = new ContainerSocket(serverFactory,calls);
|
||||
return closeSocket;
|
||||
}
|
||||
return new RFCSocket();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* On Message, return container information
|
||||
*/
|
||||
public static class ContainerSocket extends AbstractCloseSocket
|
||||
{
|
||||
private static final Logger LOG = Log.getLogger(ManyConnectionsCleanupTest.ContainerSocket.class);
|
||||
private final WebSocketServerFactory container;
|
||||
private final AtomicInteger calls;
|
||||
private Session session;
|
||||
|
||||
public ContainerSocket(WebSocketServerFactory container, AtomicInteger calls)
|
||||
{
|
||||
this.container = container;
|
||||
this.calls = calls;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onWebSocketText(String message)
|
||||
{
|
||||
LOG.debug("onWebSocketText({})",message);
|
||||
calls.incrementAndGet();
|
||||
if (message.equalsIgnoreCase("openSessions"))
|
||||
{
|
||||
Set<WebSocketSession> sessions = container.getOpenSessions();
|
||||
|
||||
StringBuilder ret = new StringBuilder();
|
||||
ret.append("openSessions.size=").append(sessions.size()).append('\n');
|
||||
int idx = 0;
|
||||
for (WebSocketSession sess : sessions)
|
||||
{
|
||||
ret.append('[').append(idx++).append("] ").append(sess.toString()).append('\n');
|
||||
}
|
||||
session.getRemote().sendStringByFuture(ret.toString());
|
||||
session.close(StatusCode.NORMAL,"ContainerSocket");
|
||||
} else if(message.equalsIgnoreCase("calls"))
|
||||
{
|
||||
session.getRemote().sendStringByFuture(String.format("calls=%,d",calls.get()));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onWebSocketConnect(Session sess)
|
||||
{
|
||||
LOG.debug("onWebSocketConnect({})",sess);
|
||||
this.session = sess;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* On Connect, close socket
|
||||
*/
|
||||
public static class FastCloseSocket extends AbstractCloseSocket
|
||||
{
|
||||
private static final Logger LOG = Log.getLogger(ManyConnectionsCleanupTest.FastCloseSocket.class);
|
||||
private final AtomicInteger calls;
|
||||
|
||||
public FastCloseSocket(AtomicInteger calls)
|
||||
{
|
||||
this.calls = calls;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onWebSocketConnect(Session sess)
|
||||
{
|
||||
LOG.debug("onWebSocketConnect({})",sess);
|
||||
calls.incrementAndGet();
|
||||
sess.close(StatusCode.NORMAL,"FastCloseServer");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* On Connect, throw unhandled exception
|
||||
*/
|
||||
public static class FastFailSocket extends AbstractCloseSocket
|
||||
{
|
||||
private static final Logger LOG = Log.getLogger(ManyConnectionsCleanupTest.FastFailSocket.class);
|
||||
private final AtomicInteger calls;
|
||||
|
||||
public FastFailSocket(AtomicInteger calls)
|
||||
{
|
||||
this.calls = calls;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onWebSocketConnect(Session sess)
|
||||
{
|
||||
LOG.debug("onWebSocketConnect({})",sess);
|
||||
calls.incrementAndGet();
|
||||
// Test failure due to unhandled exception
|
||||
// this should trigger a fast-fail closure during open/connect
|
||||
throw new RuntimeException("Intentional FastFail");
|
||||
}
|
||||
}
|
||||
|
||||
private static final Logger LOG = Log.getLogger(ManyConnectionsCleanupTest.class);
|
||||
|
||||
private static SimpleServletServer server;
|
||||
private static AbstractCloseSocket closeSocket;
|
||||
|
||||
@BeforeClass
|
||||
public static void startServer() throws Exception
|
||||
{
|
||||
server = new SimpleServletServer(new CloseServlet());
|
||||
server.start();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void stopServer()
|
||||
{
|
||||
server.stop();
|
||||
}
|
||||
|
||||
/**
|
||||
* Test session open session cleanup (bug #474936)
|
||||
*
|
||||
* @throws Exception
|
||||
* on test failure
|
||||
*/
|
||||
@Test
|
||||
public void testOpenSessionCleanup() throws Exception
|
||||
{
|
||||
int iterationCount = 100;
|
||||
|
||||
StdErrLog.getLogger(FastFailSocket.class).setLevel(StdErrLog.LEVEL_OFF);
|
||||
|
||||
StdErrLog sessLog = StdErrLog.getLogger(WebSocketSession.class);
|
||||
int oldLevel = sessLog.getLevel();
|
||||
sessLog.setLevel(StdErrLog.LEVEL_OFF);
|
||||
|
||||
for (int requests = 0; requests < iterationCount; requests++)
|
||||
{
|
||||
fastFail();
|
||||
fastClose();
|
||||
dropConnection();
|
||||
}
|
||||
|
||||
sessLog.setLevel(oldLevel);
|
||||
|
||||
try (BlockheadClient client = new BlockheadClient(server.getServerUri()))
|
||||
{
|
||||
client.setProtocols("container");
|
||||
client.setTimeout(1,TimeUnit.SECONDS);
|
||||
client.connect();
|
||||
client.sendStandardRequest();
|
||||
client.expectUpgradeResponse();
|
||||
|
||||
client.write(new TextFrame().setPayload("calls"));
|
||||
client.write(new TextFrame().setPayload("openSessions"));
|
||||
|
||||
EventQueue<WebSocketFrame> frames = client.readFrames(3,6,TimeUnit.SECONDS);
|
||||
WebSocketFrame frame;
|
||||
String resp;
|
||||
|
||||
frame = frames.poll();
|
||||
assertThat("frames[0].opcode",frame.getOpCode(),is(OpCode.TEXT));
|
||||
resp = frame.getPayloadAsUTF8();
|
||||
assertThat("Should only have 1 open session",resp,containsString("calls=" + ((iterationCount * 2) + 1)));
|
||||
|
||||
frame = frames.poll();
|
||||
assertThat("frames[1].opcode",frame.getOpCode(),is(OpCode.TEXT));
|
||||
resp = frame.getPayloadAsUTF8();
|
||||
assertThat("Should only have 1 open session",resp,containsString("openSessions.size=1\n"));
|
||||
|
||||
frame = frames.poll();
|
||||
assertThat("frames[2].opcode",frame.getOpCode(),is(OpCode.CLOSE));
|
||||
CloseInfo close = new CloseInfo(frame);
|
||||
assertThat("Close Status Code",close.getStatusCode(),is(StatusCode.NORMAL));
|
||||
client.write(close.asFrame()); // respond with close
|
||||
|
||||
// ensure server socket got close event
|
||||
assertThat("Open Sessions Latch",closeSocket.closeLatch.await(1,TimeUnit.SECONDS),is(true));
|
||||
assertThat("Open Sessions.statusCode",closeSocket.closeStatusCode,is(StatusCode.NORMAL));
|
||||
assertThat("Open Sessions.errors",closeSocket.errors.size(),is(0));
|
||||
}
|
||||
}
|
||||
|
||||
private void fastClose() throws Exception
|
||||
{
|
||||
try (BlockheadClient client = new BlockheadClient(server.getServerUri()))
|
||||
{
|
||||
client.setProtocols("fastclose");
|
||||
client.setTimeout(1,TimeUnit.SECONDS);
|
||||
try (StacklessLogging scope = new StacklessLogging(WebSocketSession.class))
|
||||
{
|
||||
client.connect();
|
||||
client.sendStandardRequest();
|
||||
client.expectUpgradeResponse();
|
||||
|
||||
client.readFrames(1,1,TimeUnit.SECONDS);
|
||||
|
||||
CloseInfo close = new CloseInfo(StatusCode.NORMAL,"Normal");
|
||||
assertThat("Close Status Code",close.getStatusCode(),is(StatusCode.NORMAL));
|
||||
|
||||
// Notify server of close handshake
|
||||
client.write(close.asFrame()); // respond with close
|
||||
|
||||
// ensure server socket got close event
|
||||
assertThat("Fast Close Latch",closeSocket.closeLatch.await(1,TimeUnit.SECONDS),is(true));
|
||||
assertThat("Fast Close.statusCode",closeSocket.closeStatusCode,is(StatusCode.NORMAL));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void fastFail() throws Exception
|
||||
{
|
||||
try (BlockheadClient client = new BlockheadClient(server.getServerUri()))
|
||||
{
|
||||
client.setProtocols("fastfail");
|
||||
client.setTimeout(1,TimeUnit.SECONDS);
|
||||
try (StacklessLogging scope = new StacklessLogging(WebSocketSession.class))
|
||||
{
|
||||
client.connect();
|
||||
client.sendStandardRequest();
|
||||
client.expectUpgradeResponse();
|
||||
|
||||
client.readFrames(1,1,TimeUnit.SECONDS);
|
||||
|
||||
CloseInfo close = new CloseInfo(StatusCode.NORMAL,"Normal");
|
||||
client.write(close.asFrame()); // respond with close
|
||||
|
||||
// ensure server socket got close event
|
||||
assertThat("Fast Fail Latch",closeSocket.closeLatch.await(1,TimeUnit.SECONDS),is(true));
|
||||
assertThat("Fast Fail.statusCode",closeSocket.closeStatusCode,is(StatusCode.SERVER_ERROR));
|
||||
assertThat("Fast Fail.errors",closeSocket.errors.size(),is(1));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void dropConnection() throws Exception
|
||||
{
|
||||
try (BlockheadClient client = new BlockheadClient(server.getServerUri()))
|
||||
{
|
||||
client.setProtocols("container");
|
||||
client.setTimeout(1,TimeUnit.SECONDS);
|
||||
try (StacklessLogging scope = new StacklessLogging(WebSocketSession.class))
|
||||
{
|
||||
client.connect();
|
||||
client.sendStandardRequest();
|
||||
client.expectUpgradeResponse();
|
||||
client.disconnect();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -50,7 +50,6 @@ import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
|
|||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.junit.Ignore;
|
||||
|
||||
/**
|
||||
* Tests various close scenarios
|
||||
|
@ -218,7 +217,6 @@ public class WebSocketCloseTest
|
|||
* on test failure
|
||||
*/
|
||||
@Test
|
||||
@Ignore("RELEASE")
|
||||
public void testFastClose() throws Exception
|
||||
{
|
||||
try (BlockheadClient client = new BlockheadClient(server.getServerUri()))
|
||||
|
@ -252,7 +250,6 @@ public class WebSocketCloseTest
|
|||
* on test failure
|
||||
*/
|
||||
@Test
|
||||
@Ignore("RELEASE")
|
||||
public void testFastFail() throws Exception
|
||||
{
|
||||
try (BlockheadClient client = new BlockheadClient(server.getServerUri()))
|
||||
|
|
Loading…
Reference in New Issue