Enabling more websocket tests

This commit is contained in:
Joakim Erdfelt 2017-06-21 07:56:48 -07:00
parent 21b1ecef7b
commit 31705767f0
3 changed files with 213 additions and 224 deletions

View File

@ -179,7 +179,7 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem
private void close(CloseInfo closeInfo, FrameCallback callback)
{
connectionState.onClosing(); // move to CLOSING state (always)
connectionState.onClosing(); // always move to (at least) the CLOSING state (might already be past it, which is ok)
if (closeSent.compareAndSet(false, true))
{
@ -192,7 +192,7 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem
{
if (LOG.isDebugEnabled())
LOG.debug("Close Frame Previously Sent: ignoring: {} [{}]", closeInfo, callback);
callback.succeed();
callback.fail(new WebSocketException("Already closed"));
}
}

View File

@ -1,139 +1,141 @@
//
// ========================================================================
// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.tests;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.common.CloseInfo;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
public abstract class AbstractTrackingEndpoint<T>
{
public final Logger LOG;
public T session;
public CountDownLatch openLatch = new CountDownLatch(1);
public CountDownLatch closeLatch = new CountDownLatch(1);
public CountDownLatch errorLatch = new CountDownLatch(1);
public AtomicReference<CloseInfo> closeInfo = new AtomicReference<>();
public AtomicReference<Throwable> error = new AtomicReference<>();
public AbstractTrackingEndpoint(String id)
{
LOG = Log.getLogger(this.getClass().getName() + "." + id);
LOG.debug("init");
}
public void assertCloseInfo(String prefix, int expectedCloseStatusCode, Matcher<? super String> reasonMatcher) throws InterruptedException
{
CloseInfo close = closeInfo.get();
assertThat(prefix + " close info", close, Matchers.notNullValue());
assertThat(prefix + " received close code", close.getStatusCode(), Matchers.is(expectedCloseStatusCode));
assertThat(prefix + " received close reason", close.getReason(), reasonMatcher);
}
public void assertErrorEvent(String prefix, Matcher<Throwable> throwableMatcher, Matcher<? super String> messageMatcher)
{
assertThat(prefix + " error event type", error.get(), throwableMatcher);
assertThat(prefix + " error event message", error.get().getMessage(), messageMatcher);
}
public void assertNoErrorEvents(String prefix)
{
assertTrue(prefix + " error event should not have occurred", error.get() == null);
}
public void assertNotClosed(String prefix)
{
assertTrue(prefix + " close event should not have occurred: got " + closeInfo.get(), closeLatch.getCount() > 0);
}
public void assertNotOpened(String prefix)
{
assertTrue(prefix + " open event should not have occurred", openLatch.getCount() > 0);
}
public void awaitCloseEvent(String prefix) throws InterruptedException
{
assertTrue(prefix + " onClose event", closeLatch.await(Defaults.CLOSE_EVENT_TIMEOUT_MS, TimeUnit.MILLISECONDS));
}
public void awaitOpenEvent(String prefix) throws InterruptedException
{
assertTrue(prefix + " onOpen event", openLatch.await(Defaults.OPEN_EVENT_TIMEOUT_MS, TimeUnit.MILLISECONDS));
}
public void awaitErrorEvent(String prefix) throws InterruptedException
{
assertTrue(prefix + " onError event", errorLatch.await(Defaults.CLOSE_EVENT_TIMEOUT_MS, TimeUnit.MILLISECONDS));
}
protected void onWSOpen(T session)
{
this.session = session;
if (LOG.isDebugEnabled())
{
LOG.debug("onWSOpen()");
}
this.openLatch.countDown();
}
protected void onWSClose(int statusCode, String reason)
{
if(LOG.isDebugEnabled())
{
LOG.debug("onWSClose({}, {})", statusCode, reason);
}
CloseInfo close = new CloseInfo(statusCode, reason);
if (closeInfo.compareAndSet(null, close) == false)
{
LOG.warn("onClose should only happen once - Original Close: " + closeInfo.get());
LOG.warn("onClose should only happen once - Extra/Excess Close: " + close);
fail("onClose should only happen once!");
}
this.closeLatch.countDown();
}
protected void onWSError(Throwable cause)
{
if(LOG.isDebugEnabled())
{
LOG.debug("onWSError()", cause);
}
assertThat("Error must have value", cause, notNullValue());
if (error.compareAndSet(null, cause) == false)
{
LOG.warn("onError should only happen once - Original Cause", error.get());
LOG.warn("onError should only happen once - Extra/Excess Cause", cause);
fail("onError should only happen once!");
}
this.errorLatch.countDown();
}
}
//
// ========================================================================
// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.tests;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.common.CloseInfo;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
public abstract class AbstractTrackingEndpoint<T>
{
public final Logger LOG;
public T session;
public CountDownLatch openLatch = new CountDownLatch(1);
public CountDownLatch closeLatch = new CountDownLatch(1);
public CountDownLatch errorLatch = new CountDownLatch(1);
public AtomicReference<CloseInfo> closeInfo = new AtomicReference<>();
public AtomicReference<Throwable> closeStack = new AtomicReference<>();
public AtomicReference<Throwable> error = new AtomicReference<>();
public AbstractTrackingEndpoint(String id)
{
LOG = Log.getLogger(this.getClass().getName() + "." + id);
LOG.debug("init");
}
public void assertCloseInfo(String prefix, int expectedCloseStatusCode, Matcher<? super String> reasonMatcher) throws InterruptedException
{
CloseInfo close = closeInfo.get();
assertThat(prefix + " close info", close, Matchers.notNullValue());
assertThat(prefix + " received close code", close.getStatusCode(), Matchers.is(expectedCloseStatusCode));
assertThat(prefix + " received close reason", close.getReason(), reasonMatcher);
}
public void assertErrorEvent(String prefix, Matcher<Throwable> throwableMatcher, Matcher<? super String> messageMatcher)
{
assertThat(prefix + " error event type", error.get(), throwableMatcher);
assertThat(prefix + " error event message", error.get().getMessage(), messageMatcher);
}
public void assertNoErrorEvents(String prefix)
{
assertTrue(prefix + " error event should not have occurred", error.get() == null);
}
public void assertNotClosed(String prefix)
{
assertTrue(prefix + " close event should not have occurred: got " + closeInfo.get(), closeLatch.getCount() > 0);
}
public void assertNotOpened(String prefix)
{
assertTrue(prefix + " open event should not have occurred", openLatch.getCount() > 0);
}
public void awaitCloseEvent(String prefix) throws InterruptedException
{
assertTrue(prefix + " onClose event", closeLatch.await(Defaults.CLOSE_EVENT_TIMEOUT_MS, TimeUnit.MILLISECONDS));
}
public void awaitOpenEvent(String prefix) throws InterruptedException
{
assertTrue(prefix + " onOpen event", openLatch.await(Defaults.OPEN_EVENT_TIMEOUT_MS, TimeUnit.MILLISECONDS));
}
public void awaitErrorEvent(String prefix) throws InterruptedException
{
assertTrue(prefix + " onError event", errorLatch.await(Defaults.CLOSE_EVENT_TIMEOUT_MS, TimeUnit.MILLISECONDS));
}
protected void onWSOpen(T session)
{
this.session = session;
if (LOG.isDebugEnabled())
{
LOG.debug("onWSOpen()");
}
this.openLatch.countDown();
}
protected void onWSClose(int statusCode, String reason)
{
if(LOG.isDebugEnabled())
{
LOG.debug("onWSClose({}, {})", statusCode, reason);
}
CloseInfo close = new CloseInfo(statusCode, reason);
if (closeInfo.compareAndSet(null, close) == false)
{
LOG.warn("onClose should only happen once - Original Close: " + closeInfo.get(), closeStack.get());
LOG.warn("onClose should only happen once - Extra/Excess Close: " + close, new Throwable("extra/excess"));
fail("onClose should only happen once!");
}
closeStack.compareAndSet(null, new Throwable("original"));
this.closeLatch.countDown();
}
protected void onWSError(Throwable cause)
{
if(LOG.isDebugEnabled())
{
LOG.debug("onWSError()", cause);
}
assertThat("Error must have value", cause, notNullValue());
if (error.compareAndSet(null, cause) == false)
{
LOG.warn("onError should only happen once - Original Cause", error.get());
LOG.warn("onError should only happen once - Extra/Excess Cause", cause);
fail("onError should only happen once!");
}
this.errorLatch.countDown();
}
}

View File

@ -18,7 +18,6 @@
package org.eclipse.jetty.websocket.tests.client;
import static org.hamcrest.Matchers.anything;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
@ -26,7 +25,6 @@ import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertThat;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.channels.SelectableChannel;
@ -51,7 +49,7 @@ import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.api.WebSocketAdapter;
import org.eclipse.jetty.websocket.api.WebSocketTimeoutException;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.eclipse.jetty.websocket.common.OpCode;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
@ -62,7 +60,6 @@ import org.eclipse.jetty.websocket.tests.UntrustedWSServer;
import org.eclipse.jetty.websocket.tests.UntrustedWSSession;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
@ -70,52 +67,52 @@ import org.junit.rules.TestName;
public class ClientCloseTest
{
private static final Logger LOG = Log.getLogger(ClientCloseTest.class);
@Rule
public TestName testname = new TestName();
@Rule
public TestTracker tt = new TestTracker();
private UntrustedWSServer server;
private WebSocketClient client;
private void confirmConnection(TrackingEndpoint clientSocket, Future<Session> clientFuture, UntrustedWSSession serverSession) throws Exception
{
// Wait for client connect on via future
Session clientSession = clientFuture.get(Defaults.CONNECT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
clientSession.getRemote().setBatchMode(BatchMode.OFF);
// Wait for client connect via client websocket
assertThat("Client WebSocket is Open", clientSocket.openLatch.await(Defaults.OPEN_EVENT_TIMEOUT_MS, TimeUnit.MILLISECONDS), is(true));
UntrustedWSEndpoint serverEndpoint = serverSession.getUntrustedEndpoint();
// Send message from client to server
final String echoMsg = "echo-test";
Future<Void> testFut = clientSocket.getRemote().sendStringByFuture(echoMsg);
// Wait for send future
testFut.get(30, TimeUnit.SECONDS);
// Read Frame on server side
WebSocketFrame frame = serverEndpoint.framesQueue.poll(10, TimeUnit.SECONDS);
assertThat("Server received frame", frame.getOpCode(), is(OpCode.TEXT));
assertThat("Server received frame payload", frame.getPayloadAsUTF8(), is(echoMsg));
// Server send echo reply
serverEndpoint.getRemote().sendString(echoMsg);
// Wait for received echo
String incomingMessage = clientSocket.messageQueue.poll(1, TimeUnit.SECONDS);
// Verify received message
assertThat("Received message", incomingMessage, is(echoMsg));
// Verify that there are no errors
assertThat("Error events", clientSocket.error.get(), nullValue());
}
public static class TestClientTransportOverHTTP extends HttpClientTransportOverHTTP
{
@Override
@ -133,16 +130,16 @@ public class ClientCloseTest
};
}
}
public static class TestEndPoint extends SocketChannelEndPoint
{
public AtomicBoolean congestedFlush = new AtomicBoolean(false);
public TestEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey key, Scheduler scheduler)
{
super((SocketChannel) channel, selector, key, scheduler);
}
@Override
public boolean flush(ByteBuffer... buffers) throws IOException
{
@ -152,7 +149,7 @@ public class ClientCloseTest
return flushed;
}
}
@Before
public void startClient() throws Exception
{
@ -161,123 +158,113 @@ public class ClientCloseTest
client.addBean(httpClient);
client.start();
}
@Before
public void startServer() throws Exception
{
server = new UntrustedWSServer();
server.registerWebSocket("/noclose", (req, resp) -> new WebSocketAdapter()
{
@Override
public void onWebSocketClose(int statusCode, String reason)
{
try
{
getSession().disconnect();
}
catch (IOException ignore)
{
}
}
});
server.start();
}
@After
public void stopClient() throws Exception
{
client.stop();
}
@After
public void stopServer() throws Exception
{
server.stop();
}
@Test
@Ignore("Not working yet")
public void testNetworkCongestion() throws Exception
{
// Set client timeout
final int timeout = 1000;
client.setMaxIdleTimeout(timeout);
URI wsUri = server.getUntrustedWsUri(this.getClass(), testname);
CompletableFuture<UntrustedWSSession> serverSessionFut = new CompletableFuture<>();
server.registerOnOpenFuture(wsUri, serverSessionFut);
// Client connects
TrackingEndpoint clientSocket = new TrackingEndpoint(testname.getMethodName());
Future<Session> clientConnectFuture = client.connect(clientSocket, wsUri);
// Server accepts connect
UntrustedWSSession serverSession = serverSessionFut.get(10, TimeUnit.SECONDS);
// client confirms connection via echo
confirmConnection(clientSocket, clientConnectFuture, serverSession);
// client sends BIG frames (until it cannot write anymore)
// server must not read (for test purpose, in order to congest connection)
// when write is congested, client enqueue close frame
// client initiate write, but write never completes
EndPoint endp = clientSocket.getJettyEndPoint();
assertThat("EndPoint is testable", endp, instanceOf(TestEndPoint.class));
TestEndPoint testendp = (TestEndPoint) endp;
char msg[] = new char[10240];
int writeCount = 0;
long writeSize = 0;
int i = 0;
while (!testendp.congestedFlush.get())
Session clientSession = clientConnectFuture.get(Defaults.CONNECT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
try
{
int z = i - ((i / 26) * 26);
char c = (char) ('a' + z);
Arrays.fill(msg, c);
clientSocket.getRemote().sendStringByFuture(String.valueOf(msg));
writeCount++;
writeSize += msg.length;
// client sends frames big enough to break through network layer caching, until it cannot write anymore.
// server must not read (for test purpose, in order to congest connection)
// when write is congested, client enqueue close frame
// client initiate write, but write never completes
EndPoint endp = clientSocket.getJettyEndPoint();
assertThat("EndPoint is testable", endp, instanceOf(TestEndPoint.class));
TestEndPoint testendp = (TestEndPoint) endp;
char msg[] = new char[10240];
int writeCount = 0;
long writeSize = 0;
int i = 0;
while (!testendp.congestedFlush.get())
{
int z = i - ((i / 26) * 26);
char c = (char) ('a' + z);
Arrays.fill(msg, c);
clientSocket.getRemote().sendStringByFuture(String.valueOf(msg));
writeCount++;
writeSize += msg.length;
}
LOG.info("Wrote {} frames totalling {} bytes of payload before congestion kicked in", writeCount, writeSize);
// Verify timeout error
clientSocket.awaitErrorEvent("Client");
clientSocket.assertErrorEvent("Client", instanceOf(WebSocketTimeoutException.class), containsString("Idle Timeout"));
}
finally
{
clientSession.close();
}
LOG.info("Wrote {} frames totalling {} bytes of payload before congestion kicked in", writeCount, writeSize);
// Verify timeout error
clientSocket.assertErrorEvent("Client", instanceOf(SocketTimeoutException.class), anything());
}
@Test(timeout = 5000L)
public void testStopLifecycle() throws Exception
{
// Set client timeout
final int timeout = 1000;
client.setMaxIdleTimeout(timeout);
int clientCount = 3;
TrackingEndpoint clientSockets[] = new TrackingEndpoint[clientCount];
UntrustedWSSession serverSessions[] = new UntrustedWSSession[clientCount];
// Connect Multiple Clients
for (int i = 0; i < clientCount; i++)
{
URI wsUri = server.getUntrustedWsUri(this.getClass(), testname).resolve(Integer.toString(i));
CompletableFuture<UntrustedWSSession> serverSessionFut = new CompletableFuture<>();
server.registerOnOpenFuture(wsUri, serverSessionFut);
// Client Request Upgrade
clientSockets[i] = new TrackingEndpoint(testname.getMethodName() + "[" + i + "]");
Future<Session> clientConnectFuture = client.connect(clientSockets[i], wsUri);
// Server accepts connection
serverSessions[i] = serverSessionFut.get(10, TimeUnit.SECONDS);
// client confirms connection via echo
confirmConnection(clientSockets[i], clientConnectFuture, serverSessions[i]);
}
// client lifecycle stop
// every open client should send a close frame
client.stop();
// clients send close frames (code 1001, shutdown)
for (int i = 0; i < clientCount; i++)
{
@ -286,7 +273,7 @@ public class ClientCloseTest
serverEndpoint.awaitCloseEvent("Server");
serverEndpoint.assertCloseInfo("Server", StatusCode.SHUTDOWN, containsString("Shutdown"));
}
// clients disconnect
for (int i = 0; i < clientCount; i++)
{