Issue #2282 - removing EventQueue from websocket tests

This commit is contained in:
Joakim Erdfelt 2018-03-05 17:52:23 -06:00
parent 076f3a8ab2
commit 1373025dff
59 changed files with 713 additions and 725 deletions

View File

@ -24,6 +24,7 @@ import java.io.Reader;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.websocket.ClientEndpoint;
@ -36,7 +37,6 @@ import javax.websocket.OnClose;
import javax.websocket.OnMessage;
import javax.websocket.WebSocketContainer;
import org.eclipse.jetty.toolchain.test.EventQueue;
import org.eclipse.jetty.toolchain.test.TestTracker;
import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.log.Log;
@ -44,6 +44,7 @@ import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.common.frames.TextFrame;
import org.eclipse.jetty.websocket.common.test.BlockheadServer;
import org.eclipse.jetty.websocket.common.test.IBlockheadServerConnection;
import org.eclipse.jetty.websocket.common.test.Timeouts;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@ -90,7 +91,7 @@ public class DecoderReaderManySmallTest
@ClientEndpoint(decoders = { EventIdDecoder.class })
public static class EventIdSocket
{
public EventQueue<EventId> messageQueue = new EventQueue<>();
public LinkedBlockingQueue<EventId> messageQueue = new LinkedBlockingQueue<>();
private CountDownLatch closeLatch = new CountDownLatch(1);
@OnClose
@ -102,7 +103,7 @@ public class DecoderReaderManySmallTest
@OnMessage
public void onMessage(EventId msg)
{
messageQueue.add(msg);
messageQueue.offer(msg);
}
public void awaitClose() throws InterruptedException
@ -208,12 +209,12 @@ public class DecoderReaderManySmallTest
idserver.writeSequentialIds(from,to);
idserver.close();
int count = from - to;
ids.messageQueue.awaitEventCount(count,4,TimeUnit.SECONDS);
ids.awaitClose();
// collect seen ids
List<Integer> seen = new ArrayList<>();
for(EventId id: ids.messageQueue)
for(int i=0; i<count; i++)
{
EventId id = ids.messageQueue.poll(Timeouts.POLL_EVENT, Timeouts.POLL_EVENT_UNIT);
// validate that ids don't repeat.
Assert.assertFalse("Already saw ID: " + id.eventId, seen.contains(id.eventId));
seen.add(id.eventId);

View File

@ -18,7 +18,7 @@
package org.eclipse.jetty.websocket.jsr356;
import static org.hamcrest.Matchers.*;
import static org.hamcrest.Matchers.is;
import java.io.BufferedReader;
import java.io.File;
@ -28,6 +28,7 @@ import java.io.Reader;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.websocket.ClientEndpoint;
@ -40,7 +41,6 @@ import javax.websocket.OnClose;
import javax.websocket.OnMessage;
import javax.websocket.WebSocketContainer;
import org.eclipse.jetty.toolchain.test.EventQueue;
import org.eclipse.jetty.toolchain.test.MavenTestingUtils;
import org.eclipse.jetty.toolchain.test.TestTracker;
import org.eclipse.jetty.util.BufferUtil;
@ -52,6 +52,7 @@ import org.eclipse.jetty.websocket.common.frames.ContinuationFrame;
import org.eclipse.jetty.websocket.common.frames.TextFrame;
import org.eclipse.jetty.websocket.common.test.BlockheadServer;
import org.eclipse.jetty.websocket.common.test.IBlockheadServerConnection;
import org.eclipse.jetty.websocket.common.test.Timeouts;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@ -126,7 +127,7 @@ public class DecoderReaderTest
@ClientEndpoint(decoders = { QuotesDecoder.class })
public static class QuotesSocket
{
public EventQueue<Quotes> messageQueue = new EventQueue<>();
public LinkedBlockingQueue<Quotes> messageQueue = new LinkedBlockingQueue<>();
private CountDownLatch closeLatch = new CountDownLatch(1);
@OnClose
@ -139,7 +140,7 @@ public class DecoderReaderTest
public synchronized void onMessage(Quotes msg)
{
Integer h=hashCode();
messageQueue.add(msg);
messageQueue.offer(msg);
System.out.printf("%x: Quotes from: %s%n",h,msg.author);
for (String quote : msg.quotes)
{
@ -259,7 +260,7 @@ public class DecoderReaderTest
server.stop();
}
// TODO analyse and fix
// TODO analyse and fix
@Ignore
@Test
public void testSingleQuotes() throws Exception
@ -270,15 +271,14 @@ public class DecoderReaderTest
client.connectToServer(quoter,server.getWsUri());
qserver.awaitConnect();
qserver.writeQuotes("quotes-ben.txt");
quoter.messageQueue.awaitEventCount(1,1000,TimeUnit.MILLISECONDS);
qserver.close();
quoter.awaitClose();
Quotes quotes = quoter.messageQueue.poll();
Quotes quotes = quoter.messageQueue.poll(Timeouts.POLL_EVENT, Timeouts.POLL_EVENT_UNIT);
Assert.assertThat("Quotes Author",quotes.author,is("Benjamin Franklin"));
Assert.assertThat("Quotes Count",quotes.quotes.size(),is(3));
}
// TODO analyse and fix
// TODO analyse and fix
@Test
@Ignore ("Quotes appear to be able to arrive in any order?")
public void testTwoQuotes() throws Exception
@ -290,11 +290,12 @@ public class DecoderReaderTest
qserver.awaitConnect();
qserver.writeQuotes("quotes-ben.txt");
qserver.writeQuotes("quotes-twain.txt");
quoter.messageQueue.awaitEventCount(2,1000,TimeUnit.MILLISECONDS);
qserver.close();
quoter.awaitClose();
Quotes quotes = quoter.messageQueue.poll();
Quotes quotes = quoter.messageQueue.poll(Timeouts.POLL_EVENT, Timeouts.POLL_EVENT_UNIT);
Assert.assertThat("Quotes Author",quotes.author,is("Benjamin Franklin"));
Assert.assertThat("Quotes Count",quotes.quotes.size(),is(3));
quotes = quoter.messageQueue.poll(Timeouts.POLL_EVENT, Timeouts.POLL_EVENT_UNIT);
Assert.assertThat("Quotes Author",quotes.author,is("Mark Twain"));
}
}

View File

@ -26,8 +26,9 @@ import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.LinkedBlockingQueue;
import javax.websocket.ClientEndpointConfig;
import javax.websocket.ContainerProvider;
@ -39,7 +40,6 @@ import javax.websocket.MessageHandler;
import javax.websocket.Session;
import javax.websocket.WebSocketContainer;
import org.eclipse.jetty.toolchain.test.EventQueue;
import org.eclipse.jetty.toolchain.test.MavenTestingUtils;
import org.eclipse.jetty.toolchain.test.TestTracker;
import org.eclipse.jetty.util.component.LifeCycle;
@ -47,6 +47,7 @@ import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.common.test.BlockheadServer;
import org.eclipse.jetty.websocket.common.test.IBlockheadServerConnection;
import org.eclipse.jetty.websocket.common.test.Timeouts;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@ -55,9 +56,8 @@ import org.junit.Test;
public class EncoderTest
{
private static class EchoServer implements Runnable
private static class EchoServer
{
private Thread thread;
private BlockheadServer server;
private IBlockheadServerConnection sconnection;
private CountDownLatch connectLatch = new CountDownLatch(1);
@ -67,37 +67,32 @@ public class EncoderTest
this.server = server;
}
@Override
public void run()
{
try
{
sconnection = server.accept();
sconnection.setSoTimeout(60000);
sconnection.upgrade();
sconnection.startEcho();
}
catch (Exception e)
{
LOG.warn(e);
}
finally
{
connectLatch.countDown();
}
}
public void start()
{
this.thread = new Thread(this,"EchoServer");
this.thread.start();
CompletableFuture.runAsync(() -> {
try
{
sconnection = server.accept();
sconnection.setSoTimeout(10000);
sconnection.upgrade();
sconnection.enableIncomingEcho(true);
sconnection.startReadThread();
}
catch (Exception e)
{
LOG.warn(e);
}
finally
{
connectLatch.countDown();
}
});
}
public void stop()
{
if (this.sconnection != null)
{
this.sconnection.stopEcho();
try
{
this.sconnection.close();
@ -166,12 +161,12 @@ public class EncoderTest
public static class QuotesSocket extends Endpoint implements MessageHandler.Whole<String>
{
private Session session;
private EventQueue<String> messageQueue = new EventQueue<>();
private LinkedBlockingQueue<String> messageQueue = new LinkedBlockingQueue<>();
@Override
public void onMessage(String message)
{
messageQueue.add(message);
messageQueue.offer(message);
}
@Override
@ -236,6 +231,7 @@ public class EncoderTest
public void initClient()
{
client = ContainerProvider.getWebSocketContainer();
client.setDefaultMaxSessionIdleTimeout(10000);
}
@After
@ -257,7 +253,7 @@ public class EncoderTest
server.stop();
}
@Test
@Test(timeout = 10000)
public void testSingleQuotes() throws Exception
{
EchoServer eserver = new EchoServer(server);
@ -277,9 +273,7 @@ public class EncoderTest
Quotes ben = getQuotes("quotes-ben.txt");
quoter.write(ben);
quoter.messageQueue.awaitEventCount(1,1000,TimeUnit.MILLISECONDS);
String result = quoter.messageQueue.poll();
String result = quoter.messageQueue.poll(Timeouts.POLL_EVENT, Timeouts.POLL_EVENT_UNIT);
assertReceivedQuotes(result,ben);
}
finally
@ -288,7 +282,7 @@ public class EncoderTest
}
}
@Test
@Test(timeout = 10000)
public void testTwoQuotes() throws Exception
{
EchoServer eserver = new EchoServer(server);
@ -309,11 +303,9 @@ public class EncoderTest
quoter.write(ben);
quoter.write(twain);
quoter.messageQueue.awaitEventCount(2,1000,TimeUnit.MILLISECONDS);
String result = quoter.messageQueue.poll();
String result = quoter.messageQueue.poll(Timeouts.POLL_EVENT, Timeouts.POLL_EVENT_UNIT);
assertReceivedQuotes(result,ben);
result = quoter.messageQueue.poll();
result = quoter.messageQueue.poll(Timeouts.POLL_EVENT, Timeouts.POLL_EVENT_UNIT);
assertReceivedQuotes(result,twain);
}
finally

View File

@ -23,15 +23,16 @@ import static org.hamcrest.Matchers.is;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import javax.websocket.ClientEndpoint;
import javax.websocket.ClientEndpointConfig;
import org.eclipse.jetty.toolchain.test.EventQueue;
import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.common.CloseInfo;
import org.eclipse.jetty.websocket.common.events.EventDriver;
import org.eclipse.jetty.websocket.common.test.Timeouts;
import org.eclipse.jetty.websocket.jsr356.ClientContainer;
import org.eclipse.jetty.websocket.jsr356.annotations.AnnotatedEndpointScanner;
import org.eclipse.jetty.websocket.jsr356.annotations.JsrEvents;
@ -118,9 +119,8 @@ public class OnCloseTest
driver.onClose(new CloseInfo(StatusCode.NORMAL,"normal"));
// Test captured event
EventQueue<String> events = endpoint.eventQueue;
Assert.assertThat("Number of Events Captured",events.size(),is(1));
String closeEvent = events.poll();
LinkedBlockingQueue<String> events = endpoint.eventQueue;
String closeEvent = events.poll(Timeouts.POLL_EVENT, Timeouts.POLL_EVENT_UNIT);
Assert.assertThat("Close Event",closeEvent,is(testcase.expectedCloseEvent));
}
}

View File

@ -23,12 +23,12 @@ import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.websocket.CloseReason;
import javax.websocket.CloseReason.CloseCode;
import org.eclipse.jetty.toolchain.test.EventQueue;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.junit.Assert;
@ -41,8 +41,8 @@ public abstract class TrackingSocket
private static final Logger LOG = Log.getLogger(TrackingSocket.class);
public CloseReason closeReason;
public EventQueue<String> eventQueue = new EventQueue<String>();
public EventQueue<Throwable> errorQueue = new EventQueue<>();
public LinkedBlockingQueue<String> eventQueue = new LinkedBlockingQueue<>();
public LinkedBlockingQueue<Throwable> errorQueue = new LinkedBlockingQueue<>();
public CountDownLatch openLatch = new CountDownLatch(1);
public CountDownLatch closeLatch = new CountDownLatch(1);
public CountDownLatch dataLatch = new CountDownLatch(1);
@ -50,12 +50,12 @@ public abstract class TrackingSocket
protected void addError(Throwable t)
{
LOG.warn(t);
errorQueue.add(t);
errorQueue.offer(t);
}
protected void addEvent(String format, Object... args)
{
eventQueue.add(String.format(format,args));
eventQueue.offer(String.format(format,args));
}
public void assertClose(CloseCode expectedCode, String expectedReason) throws InterruptedException
@ -76,12 +76,6 @@ public abstract class TrackingSocket
Assert.assertThat("Close Reason",closeReason.getReasonPhrase(),is(expectedReason));
}
public void assertEvent(String expected)
{
String actual = eventQueue.poll();
Assert.assertEquals("Event",expected,actual);
}
public void assertIsOpen() throws InterruptedException
{
assertWasOpened();

View File

@ -1,6 +1,6 @@
org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog
org.eclipse.jetty.LEVEL=WARN
# org.eclipse.jetty.websocket.LEVEL=INFO
# org.eclipse.jetty.websocket.LEVEL=DEBUG
# org.eclipse.jetty.websocket.LEVEL=ALL
# org.eclipse.jetty.websocket.jsr356.LEVEL=DEBUG

View File

@ -23,8 +23,8 @@ import static org.hamcrest.CoreMatchers.nullValue;
import static org.junit.Assert.assertThat;
import java.net.URI;
import java.util.Queue;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.io.ByteBufferPool;
@ -34,6 +34,7 @@ import org.eclipse.jetty.toolchain.test.TestingDir;
import org.eclipse.jetty.webapp.WebAppContext;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.eclipse.jetty.websocket.common.test.Timeouts;
import org.eclipse.jetty.websocket.jsr356.server.samples.echo.BasicEchoSocket;
import org.junit.Assert;
import org.junit.Rule;
@ -81,8 +82,8 @@ public class AltFilterTest
// wait for connect
future.get(1,TimeUnit.SECONDS);
clientEcho.sendMessage("Hello Echo");
Queue<String> msgs = clientEcho.awaitMessages(1);
Assert.assertEquals("Expected message","Hello Echo",msgs.poll());
LinkedBlockingQueue<String> msgs = clientEcho.incomingMessages;
Assert.assertEquals("Expected message","Hello Echo",msgs.poll(Timeouts.POLL_EVENT, Timeouts.POLL_EVENT_UNIT));
}
finally
{

View File

@ -22,8 +22,8 @@ import static org.hamcrest.Matchers.containsString;
import java.io.File;
import java.net.URI;
import java.util.Queue;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.io.ByteBufferPool;
@ -33,6 +33,7 @@ import org.eclipse.jetty.webapp.WebAppContext;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.eclipse.jetty.websocket.common.test.Timeouts;
import org.eclipse.jetty.websocket.jsr356.server.samples.beans.DateDecoder;
import org.eclipse.jetty.websocket.jsr356.server.samples.beans.TimeEncoder;
import org.eclipse.jetty.websocket.jsr356.server.samples.echo.ConfiguredEchoSocket;
@ -40,7 +41,6 @@ import org.eclipse.jetty.websocket.jsr356.server.samples.echo.EchoSocketConfigur
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
/**
@ -90,9 +90,9 @@ public class AnnotatedServerEndpointTest
foo.get(1,TimeUnit.SECONDS);
clientEcho.sendMessage(message);
Queue<String> msgs = clientEcho.awaitMessages(1);
LinkedBlockingQueue<String> msgs = clientEcho.incomingMessages;
String response = msgs.poll();
String response = msgs.poll(Timeouts.POLL_EVENT, Timeouts.POLL_EVENT_UNIT);
for (String expected : expectedTexts)
{
Assert.assertThat("Expected message",response,containsString(expected));

View File

@ -19,8 +19,8 @@
package org.eclipse.jetty.websocket.jsr356.server;
import java.net.URI;
import java.util.Queue;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.io.ByteBufferPool;
@ -29,6 +29,7 @@ import org.eclipse.jetty.toolchain.test.TestingDir;
import org.eclipse.jetty.webapp.WebAppContext;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.eclipse.jetty.websocket.common.test.Timeouts;
import org.eclipse.jetty.websocket.jsr356.server.samples.echo.BasicEchoEndpoint;
import org.eclipse.jetty.websocket.jsr356.server.samples.echo.BasicEchoEndpointConfigContextListener;
import org.junit.Assert;
@ -73,8 +74,8 @@ public class BasicEndpointTest
// wait for connect
future.get(1,TimeUnit.SECONDS);
clientEcho.sendMessage("Hello World");
Queue<String> msgs = clientEcho.awaitMessages(1);
Assert.assertEquals("Expected message","Hello World",msgs.poll());
LinkedBlockingQueue<String> msgs = clientEcho.incomingMessages;
Assert.assertEquals("Expected message","Hello World",msgs.poll(Timeouts.POLL_EVENT, Timeouts.POLL_EVENT_UNIT));
}
finally
{

View File

@ -35,7 +35,7 @@ import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.TimeZone;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
@ -54,7 +54,6 @@ import javax.websocket.server.ServerEndpointConfig;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.toolchain.test.EventQueue;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.util.QuoteUtil;
@ -63,6 +62,7 @@ import org.eclipse.jetty.websocket.common.frames.TextFrame;
import org.eclipse.jetty.websocket.common.test.BlockheadClient;
import org.eclipse.jetty.websocket.common.test.HttpResponse;
import org.eclipse.jetty.websocket.common.test.IBlockheadClient;
import org.eclipse.jetty.websocket.common.test.Timeouts;
import org.eclipse.jetty.websocket.jsr356.server.deploy.WebSocketServerContainerInitializer;
import org.junit.AfterClass;
import org.junit.Assert;
@ -437,8 +437,8 @@ public class ConfiguratorTest
assertThat("response.extensions", response.getExtensionsHeader(), nullValue());
client.write(new TextFrame().setPayload("NegoExts"));
EventQueue<WebSocketFrame> frames = client.readFrames(1, 1, TimeUnit.SECONDS);
WebSocketFrame frame = frames.poll();
LinkedBlockingQueue<WebSocketFrame> frames = client.getFrameQueue();
WebSocketFrame frame = frames.poll(Timeouts.POLL_EVENT, Timeouts.POLL_EVENT_UNIT);
assertThat("Frame Response", frame.getPayloadAsUTF8(), is("negotiatedExtensions=[]"));
}
}
@ -456,8 +456,8 @@ public class ConfiguratorTest
client.expectUpgradeResponse();
client.write(new TextFrame().setPayload("X-Dummy"));
EventQueue<WebSocketFrame> frames = client.readFrames(1, 1, TimeUnit.SECONDS);
WebSocketFrame frame = frames.poll();
LinkedBlockingQueue<WebSocketFrame> frames = client.getFrameQueue();
WebSocketFrame frame = frames.poll(Timeouts.POLL_EVENT, Timeouts.POLL_EVENT_UNIT);
Assert.assertThat("Frame Response", frame.getPayloadAsUTF8(), is("Request Header [X-Dummy]: \"Bogus\""));
}
}
@ -475,8 +475,8 @@ public class ConfiguratorTest
client.expectUpgradeResponse();
client.write(new TextFrame().setPayload("apple"));
EventQueue<WebSocketFrame> frames = client.readFrames(1, 1, TimeUnit.SECONDS);
WebSocketFrame frame = frames.poll();
LinkedBlockingQueue<WebSocketFrame> frames = client.getFrameQueue();
WebSocketFrame frame = frames.poll(Timeouts.POLL_EVENT, Timeouts.POLL_EVENT_UNIT);
Assert.assertThat("Frame Response", frame.getPayloadAsUTF8(), is("Requested User Property: [apple] = \"fruit from tree\""));
}
@ -489,8 +489,8 @@ public class ConfiguratorTest
client.write(new TextFrame().setPayload("apple"));
client.write(new TextFrame().setPayload("blueberry"));
EventQueue<WebSocketFrame> frames = client.readFrames(2, 1, TimeUnit.SECONDS);
WebSocketFrame frame = frames.poll();
LinkedBlockingQueue<WebSocketFrame> frames = client.getFrameQueue();
WebSocketFrame frame = frames.poll(Timeouts.POLL_EVENT, Timeouts.POLL_EVENT_UNIT);
// should have no value
Assert.assertThat("Frame Response", frame.getPayloadAsUTF8(), is("Requested User Property: [apple] = <null>"));
@ -515,8 +515,8 @@ public class ConfiguratorTest
InetSocketAddress expectedRemote = client.getRemoteSocketAddress();
client.write(new TextFrame().setPayload("addr"));
EventQueue<WebSocketFrame> frames = client.readFrames(1, 1, TimeUnit.SECONDS);
WebSocketFrame frame = frames.poll();
LinkedBlockingQueue<WebSocketFrame> frames = client.getFrameQueue();
WebSocketFrame frame = frames.poll(Timeouts.POLL_EVENT, Timeouts.POLL_EVENT_UNIT);
StringWriter expected = new StringWriter();
PrintWriter out = new PrintWriter(expected);
@ -548,8 +548,8 @@ public class ConfiguratorTest
client.expectUpgradeResponse();
client.write(new TextFrame().setPayload("getProtocols"));
EventQueue<WebSocketFrame> frames = client.readFrames(1, 1, TimeUnit.SECONDS);
WebSocketFrame frame = frames.poll();
LinkedBlockingQueue<WebSocketFrame> frames = client.getFrameQueue();
WebSocketFrame frame = frames.poll(Timeouts.POLL_EVENT, Timeouts.POLL_EVENT_UNIT);
Assert.assertThat("Frame Response", frame.getPayloadAsUTF8(), is("Requested Protocols: [\"echo\"]"));
}
}
@ -572,8 +572,8 @@ public class ConfiguratorTest
client.expectUpgradeResponse();
client.write(new TextFrame().setPayload("getProtocols"));
EventQueue<WebSocketFrame> frames = client.readFrames(1, 1, TimeUnit.SECONDS);
WebSocketFrame frame = frames.poll();
LinkedBlockingQueue<WebSocketFrame> frames = client.getFrameQueue();
WebSocketFrame frame = frames.poll(Timeouts.POLL_EVENT, Timeouts.POLL_EVENT_UNIT);
Assert.assertThat("Frame Response", frame.getPayloadAsUTF8(), is("Requested Protocols: [\"echo\",\"chat\",\"status\"]"));
}
}
@ -596,8 +596,8 @@ public class ConfiguratorTest
client.expectUpgradeResponse();
client.write(new TextFrame().setPayload("getProtocols"));
EventQueue<WebSocketFrame> frames = client.readFrames(1, 1, TimeUnit.SECONDS);
WebSocketFrame frame = frames.poll();
LinkedBlockingQueue<WebSocketFrame> frames = client.getFrameQueue();
WebSocketFrame frame = frames.poll(Timeouts.POLL_EVENT, Timeouts.POLL_EVENT_UNIT);
Assert.assertThat("Frame Response", frame.getPayloadAsUTF8(), is("Requested Protocols: [\"echo\",\"chat\",\"status\"]"));
}
}
@ -620,8 +620,8 @@ public class ConfiguratorTest
client.expectUpgradeResponse();
client.write(new TextFrame().setPayload("getProtocols"));
EventQueue<WebSocketFrame> frames = client.readFrames(1, 1, TimeUnit.SECONDS);
WebSocketFrame frame = frames.poll();
LinkedBlockingQueue<WebSocketFrame> frames = client.getFrameQueue();
WebSocketFrame frame = frames.poll(Timeouts.POLL_EVENT, Timeouts.POLL_EVENT_UNIT);
Assert.assertThat("Frame Response", frame.getPayloadAsUTF8(), is("Requested Protocols: [\"echo\",\"chat\",\"status\"]"));
}
}
@ -642,8 +642,8 @@ public class ConfiguratorTest
client.expectUpgradeResponse();
client.write(new TextFrame().setPayload("2016-06-20T14:27:44"));
EventQueue<WebSocketFrame> frames = client.readFrames(1, 1, TimeUnit.SECONDS);
WebSocketFrame frame = frames.poll();
LinkedBlockingQueue<WebSocketFrame> frames = client.getFrameQueue();
WebSocketFrame frame = frames.poll(Timeouts.POLL_EVENT, Timeouts.POLL_EVENT_UNIT);
Assert.assertThat("Frame Response", frame.getPayloadAsUTF8(), is("cal=2016.06.20 AD at 14:27:44 +0000"));
}
}

View File

@ -20,8 +20,6 @@ package org.eclipse.jetty.websocket.jsr356.server;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.websocket.ClientEndpoint;
import javax.websocket.CloseReason;
@ -38,15 +36,9 @@ import org.eclipse.jetty.util.BufferUtil;
@ClientEndpoint
public class EchoClientSocket extends TrackingSocket
{
public final CountDownLatch eventCountLatch;
private Session session;
private Basic remote;
public EchoClientSocket(int expectedEventCount)
{
this.eventCountLatch = new CountDownLatch(expectedEventCount);
}
public void close() throws IOException
{
if (session != null)
@ -88,12 +80,6 @@ public class EchoClientSocket extends TrackingSocket
public void onText(String text)
{
addEvent(text);
eventCountLatch.countDown();
}
public boolean awaitAllEvents(long timeout, TimeUnit unit) throws InterruptedException
{
return eventCountLatch.await(timeout,unit);
}
public void sendObject(Object obj) throws IOException, EncodeException

View File

@ -18,22 +18,23 @@
package org.eclipse.jetty.websocket.jsr356.server;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsString;
import java.io.File;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.websocket.ContainerProvider;
import javax.websocket.WebSocketContainer;
import org.eclipse.jetty.toolchain.test.EventQueue;
import org.eclipse.jetty.toolchain.test.MavenTestingUtils;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.webapp.WebAppContext;
import org.eclipse.jetty.websocket.common.test.Timeouts;
import org.eclipse.jetty.websocket.jsr356.server.EchoCase.PartialBinary;
import org.eclipse.jetty.websocket.jsr356.server.EchoCase.PartialText;
import org.eclipse.jetty.websocket.jsr356.server.samples.binary.ByteBufferSocket;
@ -255,7 +256,7 @@ public class EchoTest
public void testEcho() throws Exception
{
int messageCount = testcase.getMessageCount();
EchoClientSocket socket = new EchoClientSocket(messageCount);
EchoClientSocket socket = new EchoClientSocket();
URI toUri = serverUri.resolve(testcase.path.substring(1));
try
@ -284,13 +285,13 @@ public class EchoTest
}
// Collect Responses
socket.awaitAllEvents(1,TimeUnit.SECONDS);
EventQueue<String> received = socket.eventQueue;
LinkedBlockingQueue<String> received = socket.eventQueue;
// Validate Responses
for (String expected : testcase.expectedStrings)
{
Assert.assertThat("Received Echo Responses",received,contains(expected));
String actual = received.poll(Timeouts.POLL_EVENT, Timeouts.POLL_EVENT_UNIT);
Assert.assertThat("Received Echo Responses",actual,containsString(expected));
}
}
finally

View File

@ -18,15 +18,14 @@
package org.eclipse.jetty.websocket.jsr356.server;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertThat;
import java.net.URI;
import java.util.Queue;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.eclipse.jetty.toolchain.test.MavenTestingUtils;
import org.eclipse.jetty.util.log.Log;
@ -35,6 +34,7 @@ import org.eclipse.jetty.util.log.StacklessLogging;
import org.eclipse.jetty.webapp.WebAppContext;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.eclipse.jetty.websocket.common.test.Timeouts;
import org.eclipse.jetty.websocket.jsr356.annotations.JsrEvents;
import org.eclipse.jetty.websocket.jsr356.server.samples.idletimeout.IdleTimeoutContextListener;
import org.eclipse.jetty.websocket.jsr356.server.samples.idletimeout.OnOpenIdleTimeoutEndpoint;
@ -100,10 +100,12 @@ public class IdleTimeoutTest
clientEcho.sendMessage("You shouldn't be there");
try
{
Queue<String> msgs = clientEcho.awaitMessages(1);
assertThat("Should not have received messages echoed back",msgs,is(empty()));
LinkedBlockingQueue<String> msgs = clientEcho.incomingMessages;
// should not have a message.
String received = msgs.poll(Timeouts.POLL_EVENT, Timeouts.POLL_EVENT_UNIT);
assertThat("Should not have received messages echoed back",received,is(nullValue()));
}
catch (TimeoutException | InterruptedException e)
catch (InterruptedException e)
{
// valid success path
}

View File

@ -19,13 +19,10 @@
package org.eclipse.jetty.websocket.jsr356.server;
import java.io.IOException;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.eclipse.jetty.toolchain.test.EventQueue;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.BatchMode;
@ -50,13 +47,7 @@ public class JettyEchoSocket
private Session session;
private Lock remoteLock = new ReentrantLock();
private RemoteEndpoint remote;
private EventQueue<String> incomingMessages = new EventQueue<>();
public Queue<String> awaitMessages(int expected) throws TimeoutException, InterruptedException
{
incomingMessages.awaitEventCount(expected,2,TimeUnit.SECONDS);
return incomingMessages;
}
public LinkedBlockingQueue<String> incomingMessages = new LinkedBlockingQueue<>();
public boolean getClosed()
{

View File

@ -21,8 +21,8 @@ package org.eclipse.jetty.websocket.jsr356.server;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Queue;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.io.ByteBufferPool;
@ -31,6 +31,7 @@ import org.eclipse.jetty.toolchain.test.TestingDir;
import org.eclipse.jetty.webapp.WebAppContext;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.eclipse.jetty.websocket.common.test.Timeouts;
import org.eclipse.jetty.websocket.jsr356.server.samples.echo.LargeEchoConfiguredSocket;
import org.junit.Assert;
import org.junit.Ignore;
@ -78,8 +79,8 @@ public class LargeAnnotatedTest
Arrays.fill(txt,(byte)'o');
String msg = new String(txt,StandardCharsets.UTF_8);
clientEcho.sendMessage(msg);
Queue<String> msgs = clientEcho.awaitMessages(1);
Assert.assertEquals("Expected message",msg,msgs.poll());
LinkedBlockingQueue<String> msgs = clientEcho.incomingMessages;
Assert.assertEquals("Expected message",msg,msgs.poll(Timeouts.POLL_EVENT, Timeouts.POLL_EVENT_UNIT));
}
finally
{

View File

@ -21,8 +21,8 @@ package org.eclipse.jetty.websocket.jsr356.server;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Queue;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.io.ByteBufferPool;
@ -31,6 +31,7 @@ import org.eclipse.jetty.toolchain.test.TestingDir;
import org.eclipse.jetty.webapp.WebAppContext;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.eclipse.jetty.websocket.common.test.Timeouts;
import org.eclipse.jetty.websocket.jsr356.server.samples.echo.LargeEchoDefaultSocket;
import org.junit.Assert;
import org.junit.Ignore;
@ -78,8 +79,8 @@ public class LargeContainerTest
Arrays.fill(txt,(byte)'o');
String msg = new String(txt,StandardCharsets.UTF_8);
clientEcho.sendMessage(msg);
Queue<String> msgs = clientEcho.awaitMessages(1);
Assert.assertEquals("Expected message",msg,msgs.poll());
LinkedBlockingQueue<String> msgs = clientEcho.incomingMessages;
Assert.assertEquals("Expected message",msg,msgs.poll(Timeouts.POLL_EVENT, Timeouts.POLL_EVENT_UNIT));
}
finally
{

View File

@ -18,7 +18,6 @@
package org.eclipse.jetty.websocket.jsr356.server;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsString;
import java.io.File;
@ -28,9 +27,9 @@ import java.util.concurrent.TimeUnit;
import javax.websocket.ContainerProvider;
import javax.websocket.WebSocketContainer;
import org.eclipse.jetty.toolchain.test.EventQueue;
import org.eclipse.jetty.toolchain.test.MavenTestingUtils;
import org.eclipse.jetty.webapp.WebAppContext;
import org.eclipse.jetty.websocket.common.test.Timeouts;
import org.eclipse.jetty.websocket.jsr356.server.samples.pong.PongContextListener;
import org.eclipse.jetty.websocket.jsr356.server.samples.pong.PongMessageEndpoint;
import org.eclipse.jetty.websocket.jsr356.server.samples.pong.PongSocket;
@ -78,7 +77,7 @@ public class PingPongTest
@Test(timeout = 2000)
public void testPingEndpoint() throws Exception
{
EchoClientSocket socket = new EchoClientSocket(1);
EchoClientSocket socket = new EchoClientSocket();
URI toUri = serverUri.resolve("ping");
try
@ -91,12 +90,8 @@ public class PingPongTest
String msg = "hello";
socket.sendPing(msg);
// Collect Responses
socket.awaitAllEvents(1,TimeUnit.SECONDS);
EventQueue<String> received = socket.eventQueue;
// Validate Responses
String actual = received.poll();
String actual = socket.eventQueue.poll(Timeouts.POLL_EVENT, Timeouts.POLL_EVENT_UNIT);
Assert.assertThat("Received Ping Response",actual,containsString("PongMessage[/ping]:" + msg));
}
finally
@ -109,7 +104,7 @@ public class PingPongTest
@Test(timeout = 2000)
public void testPongEndpoint() throws Exception
{
EchoClientSocket socket = new EchoClientSocket(1);
EchoClientSocket socket = new EchoClientSocket();
URI toUri = serverUri.resolve("pong");
try
@ -122,12 +117,9 @@ public class PingPongTest
String msg = "hello";
socket.sendPong(msg);
// Collect Responses
socket.awaitAllEvents(1,TimeUnit.SECONDS);
EventQueue<String> received = socket.eventQueue;
// Validate Responses
Assert.assertThat("Received Ping Responses",received,contains("PongMessage[/pong]:" + msg));
String received = socket.eventQueue.poll(Timeouts.POLL_EVENT, Timeouts.POLL_EVENT_UNIT);
Assert.assertThat("Received Ping Responses",received,containsString("PongMessage[/pong]:" + msg));
}
finally
{
@ -139,7 +131,7 @@ public class PingPongTest
@Test(timeout = 2000)
public void testPingSocket() throws Exception
{
EchoClientSocket socket = new EchoClientSocket(1);
EchoClientSocket socket = new EchoClientSocket();
URI toUri = serverUri.resolve("ping-socket");
try
@ -152,12 +144,8 @@ public class PingPongTest
String msg = "hello";
socket.sendPing(msg);
// Collect Responses
socket.awaitAllEvents(1,TimeUnit.SECONDS);
EventQueue<String> received = socket.eventQueue;
// Validate Responses
String actual = received.poll();
String actual = socket.eventQueue.poll(Timeouts.POLL_EVENT, Timeouts.POLL_EVENT_UNIT);
Assert.assertThat("Received Ping Response",actual,containsString("@OnMessage(PongMessage)[/ping-socket]:" + msg));
}
finally
@ -170,7 +158,7 @@ public class PingPongTest
@Test(timeout = 2000)
public void testPongSocket() throws Exception
{
EchoClientSocket socket = new EchoClientSocket(1);
EchoClientSocket socket = new EchoClientSocket();
URI toUri = serverUri.resolve("pong-socket");
try
@ -183,12 +171,9 @@ public class PingPongTest
String msg = "hello";
socket.sendPong(msg);
// Collect Responses
socket.awaitAllEvents(1,TimeUnit.SECONDS);
EventQueue<String> received = socket.eventQueue;
// Validate Responses
Assert.assertThat("Received Ping Responses",received,contains("@OnMessage(PongMessage)[/pong-socket]:" + msg));
String received = socket.eventQueue.poll(Timeouts.POLL_EVENT, Timeouts.POLL_EVENT_UNIT);
Assert.assertThat("Received Ping Responses",received,containsString("@OnMessage(PongMessage)[/pong-socket]:" + msg));
}
finally
{

View File

@ -30,8 +30,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.MappedByteBufferPool;
import org.eclipse.jetty.servlet.DefaultServlet;
import org.eclipse.jetty.toolchain.test.MavenTestingUtils;
import org.eclipse.jetty.webapp.WebAppContext;
@ -101,8 +99,6 @@ public class SessionTest
return cases;
}
public ByteBufferPool bufferPool = new MappedByteBufferPool();
private final Case testcase;
private final static AtomicInteger ID = new AtomicInteger(0);
private WSServer server;
@ -136,7 +132,7 @@ public class SessionTest
private void assertResponse(String requestPath, String requestMessage, String expectedResponse) throws Exception
{
WebSocketClient client = new WebSocketClient(bufferPool);
WebSocketClient client = new WebSocketClient();
try
{
client.start();

View File

@ -23,12 +23,12 @@ import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.websocket.CloseReason;
import javax.websocket.CloseReason.CloseCode;
import org.eclipse.jetty.toolchain.test.EventQueue;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.junit.Assert;
@ -41,8 +41,8 @@ public abstract class TrackingSocket
private static final Logger LOG = Log.getLogger(TrackingSocket.class);
public CloseReason closeReason;
public EventQueue<String> eventQueue = new EventQueue<String>();
public EventQueue<Throwable> errorQueue = new EventQueue<>();
public LinkedBlockingQueue<String> eventQueue = new LinkedBlockingQueue<>();
public LinkedBlockingQueue<Throwable> errorQueue = new LinkedBlockingQueue<>();
public CountDownLatch openLatch = new CountDownLatch(1);
public CountDownLatch closeLatch = new CountDownLatch(1);
public CountDownLatch dataLatch = new CountDownLatch(1);
@ -50,12 +50,12 @@ public abstract class TrackingSocket
protected void addError(Throwable t)
{
LOG.warn(t);
errorQueue.add(t);
errorQueue.offer(t);
}
protected void addEvent(String format, Object... args)
{
eventQueue.add(String.format(format,args));
eventQueue.offer(String.format(format,args));
}
public void assertClose(CloseCode expectedCode, String expectedReason) throws InterruptedException
@ -76,12 +76,6 @@ public abstract class TrackingSocket
Assert.assertThat("Close Reason",closeReason.getReasonPhrase(),is(expectedReason));
}
public void assertEvent(String expected)
{
String actual = eventQueue.poll();
Assert.assertEquals("Event",expected,actual);
}
public void assertIsOpen() throws InterruptedException
{
assertWasOpened();

View File

@ -19,6 +19,7 @@
package org.eclipse.jetty.websocket.jsr356.server.samples.echo;
import java.io.IOException;
import java.util.concurrent.LinkedBlockingQueue;
import javax.websocket.CloseReason;
import javax.websocket.OnMessage;
@ -26,14 +27,12 @@ import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;
import org.eclipse.jetty.toolchain.test.EventQueue;
@ServerEndpoint(value = "/echoreturn")
public class EchoReturnEndpoint
{
private Session session = null;
public CloseReason close = null;
public EventQueue<String> messageQueue = new EventQueue<>();
public LinkedBlockingQueue<String> messageQueue = new LinkedBlockingQueue<>();
public void onClose(CloseReason close)
{

View File

@ -37,6 +37,7 @@ import java.nio.channels.SocketChannel;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
@ -50,7 +51,6 @@ import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.io.ManagedSelector;
import org.eclipse.jetty.io.SelectorManager;
import org.eclipse.jetty.io.SocketChannelEndPoint;
import org.eclipse.jetty.toolchain.test.EventQueue;
import org.eclipse.jetty.toolchain.test.TestTracker;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.Log;
@ -70,8 +70,8 @@ import org.eclipse.jetty.websocket.common.frames.TextFrame;
import org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection;
import org.eclipse.jetty.websocket.common.test.BlockheadServer;
import org.eclipse.jetty.websocket.common.test.IBlockheadServerConnection;
import org.eclipse.jetty.websocket.common.test.IncomingFramesCapture;
import org.eclipse.jetty.websocket.common.test.RawFrameBuilder;
import org.eclipse.jetty.websocket.common.test.Timeouts;
import org.hamcrest.Matcher;
import org.junit.After;
import org.junit.Before;
@ -94,7 +94,7 @@ public class ClientCloseTest
public CountDownLatch openLatch = new CountDownLatch(1);
public CountDownLatch errorLatch = new CountDownLatch(1);
public EventQueue<String> messageQueue = new EventQueue<>();
public LinkedBlockingQueue<String> messageQueue = new LinkedBlockingQueue<>();
public AtomicReference<Throwable> error = new AtomicReference<>();
public void assertNoCloseEvent()
@ -198,25 +198,22 @@ public class ClientCloseTest
final String echoMsg = "echo-test";
Future<Void> testFut = clientSocket.getRemote().sendStringByFuture(echoMsg);
serverConns.startReadThread();
// Wait for send future
testFut.get(30,TimeUnit.SECONDS);
// Read Frame on server side
IncomingFramesCapture serverCapture = serverConns.readFrames(1,30,TimeUnit.SECONDS);
serverCapture.assertNoErrors();
serverCapture.assertFrameCount(1);
WebSocketFrame frame = serverCapture.getFrames().poll();
LinkedBlockingQueue<WebSocketFrame> serverCapture = serverConns.getFrameQueue();
WebSocketFrame frame = serverCapture.poll(Timeouts.POLL_EVENT, Timeouts.POLL_EVENT_UNIT);
assertThat("Server received frame",frame.getOpCode(),is(OpCode.TEXT));
assertThat("Server received frame payload",frame.getPayloadAsUTF8(),is(echoMsg));
// Server send echo reply
serverConns.write(new TextFrame().setPayload(echoMsg));
// Wait for received echo
clientSocket.messageQueue.awaitEventCount(1,1,TimeUnit.SECONDS);
// Verify received message
String recvMsg = clientSocket.messageQueue.poll();
String recvMsg = clientSocket.messageQueue.poll(Timeouts.POLL_EVENT, Timeouts.POLL_EVENT_UNIT);
assertThat("Received message",recvMsg,is(echoMsg));
// Verify that there are no errors
@ -228,14 +225,11 @@ public class ClientCloseTest
}
}
private void confirmServerReceivedCloseFrame(IBlockheadServerConnection serverConn, int expectedCloseCode, Matcher<String> closeReasonMatcher) throws IOException,
TimeoutException
private void confirmServerReceivedCloseFrame(IBlockheadServerConnection serverConn, int expectedCloseCode, Matcher<String> closeReasonMatcher) throws InterruptedException
{
IncomingFramesCapture serverCapture = serverConn.readFrames(1,30,TimeUnit.SECONDS);
serverCapture.assertNoErrors();
serverCapture.assertFrameCount(1);
serverCapture.assertHasFrame(OpCode.CLOSE,1);
WebSocketFrame frame = serverCapture.getFrames().poll();
LinkedBlockingQueue<WebSocketFrame> serverCapture = serverConn.getFrameQueue();
WebSocketFrame frame = serverCapture.poll(Timeouts.POLL_EVENT, Timeouts.POLL_EVENT_UNIT);
assertThat("Server close frame", frame, is(notNullValue()));
assertThat("Server received close frame",frame.getOpCode(),is(OpCode.CLOSE));
CloseInfo closeInfo = new CloseInfo(frame);
assertThat("Server received close code",closeInfo.getStatusCode(),is(expectedCloseCode));
@ -316,7 +310,7 @@ public class ClientCloseTest
public void testHalfClose() throws Exception
{
// Set client timeout
final int timeout = 1000;
final int timeout = 5000;
client.setMaxIdleTimeout(timeout);
// Client connects
@ -327,38 +321,42 @@ public class ClientCloseTest
IBlockheadServerConnection serverConn = server.accept();
serverConn.upgrade();
// client confirms connection via echo
confirmConnection(clientSocket,clientConnectFuture,serverConn);
try
{
// client confirms connection via echo
confirmConnection(clientSocket, clientConnectFuture, serverConn);
// client sends close frame (code 1000, normal)
final String origCloseReason = "Normal Close";
clientSocket.getSession().close(StatusCode.NORMAL,origCloseReason);
// client sends close frame (code 1000, normal)
final String origCloseReason = "Normal Close";
clientSocket.getSession().close(StatusCode.NORMAL, origCloseReason);
// server receives close frame
confirmServerReceivedCloseFrame(serverConn,StatusCode.NORMAL,is(origCloseReason));
// server receives close frame
confirmServerReceivedCloseFrame(serverConn, StatusCode.NORMAL, is(origCloseReason));
// server sends 2 messages
serverConn.write(new TextFrame().setPayload("Hello"));
serverConn.write(new TextFrame().setPayload("World"));
// server sends 2 messages
serverConn.write(new TextFrame().setPayload("Hello"));
serverConn.write(new TextFrame().setPayload("World"));
// server sends close frame (code 1000, no reason)
CloseInfo sclose = new CloseInfo(StatusCode.NORMAL,"From Server");
serverConn.write(sclose.asFrame());
// server sends close frame (code 1000, no reason)
CloseInfo sclose = new CloseInfo(StatusCode.NORMAL, "From Server");
serverConn.write(sclose.asFrame());
// client receives 2 messages
clientSocket.messageQueue.awaitEventCount(2,1,TimeUnit.SECONDS);
// Verify received messages
String recvMsg = clientSocket.messageQueue.poll(Timeouts.POLL_EVENT, Timeouts.POLL_EVENT_UNIT);
assertThat("Received message 1", recvMsg, is("Hello"));
recvMsg = clientSocket.messageQueue.poll(Timeouts.POLL_EVENT, Timeouts.POLL_EVENT_UNIT);
assertThat("Received message 2", recvMsg, is("World"));
// Verify received messages
String recvMsg = clientSocket.messageQueue.poll();
assertThat("Received message 1",recvMsg,is("Hello"));
recvMsg = clientSocket.messageQueue.poll();
assertThat("Received message 2",recvMsg,is("World"));
// Verify that there are no errors
assertThat("Error events", clientSocket.error.get(), nullValue());
// Verify that there are no errors
assertThat("Error events",clientSocket.error.get(),nullValue());
// client close event on ws-endpoint
clientSocket.assertReceivedCloseEvent(timeout,is(StatusCode.NORMAL),containsString("From Server"));
// client close event on ws-endpoint
clientSocket.assertReceivedCloseEvent(timeout, is(StatusCode.NORMAL), containsString("From Server"));
}
finally
{
serverConn.disconnect();
}
}
@Ignore("Need sbordet's help here")
@ -377,35 +375,42 @@ public class ClientCloseTest
IBlockheadServerConnection serverConn = server.accept();
serverConn.upgrade();
// client confirms connection via echo
confirmConnection(clientSocket,clientConnectFuture,serverConn);
// client sends BIG frames (until it cannot write anymore)
// server must not read (for test purpose, in order to congest connection)
// when write is congested, client enqueue close frame
// client initiate write, but write never completes
EndPoint endp = clientSocket.getEndPoint();
assertThat("EndPoint is testable",endp,instanceOf(TestEndPoint.class));
TestEndPoint testendp = (TestEndPoint)endp;
char msg[] = new char[10240];
int writeCount = 0;
long writeSize = 0;
int i = 0;
while (!testendp.congestedFlush.get())
try
{
int z = i - ((i / 26) * 26);
char c = (char)('a' + z);
Arrays.fill(msg,c);
clientSocket.getRemote().sendStringByFuture(String.valueOf(msg));
writeCount++;
writeSize += msg.length;
}
LOG.info("Wrote {} frames totalling {} bytes of payload before congestion kicked in",writeCount,writeSize);
// client confirms connection via echo
confirmConnection(clientSocket, clientConnectFuture, serverConn);
// Verify timeout error
assertThat("OnError Latch", clientSocket.errorLatch.await(2, TimeUnit.SECONDS), is(true));
assertThat("OnError", clientSocket.error.get(), instanceOf(SocketTimeoutException.class));
// client sends BIG frames (until it cannot write anymore)
// server must not read (for test purpose, in order to congest connection)
// when write is congested, client enqueue close frame
// client initiate write, but write never completes
EndPoint endp = clientSocket.getEndPoint();
assertThat("EndPoint is testable", endp, instanceOf(TestEndPoint.class));
TestEndPoint testendp = (TestEndPoint) endp;
char msg[] = new char[10240];
int writeCount = 0;
long writeSize = 0;
int i = 0;
while (!testendp.congestedFlush.get())
{
int z = i - ((i / 26) * 26);
char c = (char) ('a' + z);
Arrays.fill(msg, c);
clientSocket.getRemote().sendStringByFuture(String.valueOf(msg));
writeCount++;
writeSize += msg.length;
}
LOG.info("Wrote {} frames totalling {} bytes of payload before congestion kicked in", writeCount, writeSize);
// Verify timeout error
assertThat("OnError Latch", clientSocket.errorLatch.await(2, TimeUnit.SECONDS), is(true));
assertThat("OnError", clientSocket.error.get(), instanceOf(SocketTimeoutException.class));
}
finally
{
serverConn.disconnect();
}
}
@Test
@ -423,36 +428,41 @@ public class ClientCloseTest
IBlockheadServerConnection serverConn = server.accept();
serverConn.upgrade();
// client confirms connection via echo
confirmConnection(clientSocket,clientConnectFuture,serverConn);
// client should not have received close message (yet)
clientSocket.assertNoCloseEvent();
// server sends bad close frame (too big of a reason message)
byte msg[] = new byte[400];
Arrays.fill(msg,(byte)'x');
ByteBuffer bad = ByteBuffer.allocate(500);
RawFrameBuilder.putOpFin(bad,OpCode.CLOSE,true);
RawFrameBuilder.putLength(bad,msg.length + 2,false);
bad.putShort((short)StatusCode.NORMAL);
bad.put(msg);
BufferUtil.flipToFlush(bad,0);
try (StacklessLogging quiet = new StacklessLogging(Parser.class))
try
{
serverConn.write(bad);
// client confirms connection via echo
confirmConnection(clientSocket, clientConnectFuture, serverConn);
// client should have noticed the error
assertThat("OnError Latch", clientSocket.errorLatch.await(2, TimeUnit.SECONDS), is(true));
assertThat("OnError", clientSocket.error.get(), instanceOf(ProtocolException.class));
assertThat("OnError", clientSocket.error.get().getMessage(), containsString("Invalid control frame"));
// client should not have received close message (yet)
clientSocket.assertNoCloseEvent();
// client parse invalid frame, notifies server of close (protocol error)
confirmServerReceivedCloseFrame(serverConn,StatusCode.PROTOCOL,allOf(containsString("Invalid control frame"),containsString("length")));
// server sends bad close frame (too big of a reason message)
byte msg[] = new byte[400];
Arrays.fill(msg, (byte) 'x');
ByteBuffer bad = ByteBuffer.allocate(500);
RawFrameBuilder.putOpFin(bad, OpCode.CLOSE, true);
RawFrameBuilder.putLength(bad, msg.length + 2, false);
bad.putShort((short) StatusCode.NORMAL);
bad.put(msg);
BufferUtil.flipToFlush(bad, 0);
try (StacklessLogging quiet = new StacklessLogging(Parser.class))
{
serverConn.write(bad);
// client should have noticed the error
assertThat("OnError Latch", clientSocket.errorLatch.await(2, TimeUnit.SECONDS), is(true));
assertThat("OnError", clientSocket.error.get(), instanceOf(ProtocolException.class));
assertThat("OnError", clientSocket.error.get().getMessage(), containsString("Invalid control frame"));
// client parse invalid frame, notifies server of close (protocol error)
confirmServerReceivedCloseFrame(serverConn, StatusCode.PROTOCOL, allOf(containsString("Invalid control frame"), containsString("length")));
}
}
finally
{
// server disconnects
serverConn.disconnect();
}
// server disconnects
serverConn.disconnect();
// client triggers close event on client ws-endpoint
clientSocket.assertReceivedCloseEvent(timeout,is(StatusCode.PROTOCOL),allOf(containsString("Invalid control frame"),containsString("length")));
@ -473,29 +483,36 @@ public class ClientCloseTest
IBlockheadServerConnection serverConn = server.accept();
serverConn.upgrade();
// client confirms connection via echo
confirmConnection(clientSocket,clientConnectFuture,serverConn);
try
{
// client confirms connection via echo
confirmConnection(clientSocket, clientConnectFuture, serverConn);
// client sends close frame
final String origCloseReason = "Normal Close";
clientSocket.getSession().close(StatusCode.NORMAL,origCloseReason);
// client sends close frame
final String origCloseReason = "Normal Close";
clientSocket.getSession().close(StatusCode.NORMAL, origCloseReason);
// server receives close frame
confirmServerReceivedCloseFrame(serverConn,StatusCode.NORMAL,is(origCloseReason));
// server receives close frame
confirmServerReceivedCloseFrame(serverConn, StatusCode.NORMAL, is(origCloseReason));
// client should not have received close message (yet)
clientSocket.assertNoCloseEvent();
// client should not have received close message (yet)
clientSocket.assertNoCloseEvent();
// server shuts down connection (no frame reply)
serverConn.disconnect();
// server shuts down connection (no frame reply)
serverConn.disconnect();
// client reads -1 (EOF)
// client triggers close event on client ws-endpoint
clientSocket.assertReceivedCloseEvent(timeout,is(StatusCode.ABNORMAL),
anyOf(
containsString("EOF"),
containsString("Disconnected")
));
// client reads -1 (EOF)
// client triggers close event on client ws-endpoint
clientSocket.assertReceivedCloseEvent(timeout, is(StatusCode.ABNORMAL),
anyOf(
containsString("EOF"),
containsString("Disconnected")
));
}
finally
{
serverConn.disconnect();
}
}
@Test
@ -513,25 +530,32 @@ public class ClientCloseTest
IBlockheadServerConnection serverConn = server.accept();
serverConn.upgrade();
// client confirms connection via echo
confirmConnection(clientSocket,clientConnectFuture,serverConn);
try
{
// client confirms connection via echo
confirmConnection(clientSocket, clientConnectFuture, serverConn);
// client sends close frame
final String origCloseReason = "Normal Close";
clientSocket.getSession().close(StatusCode.NORMAL,origCloseReason);
// client sends close frame
final String origCloseReason = "Normal Close";
clientSocket.getSession().close(StatusCode.NORMAL, origCloseReason);
// server receives close frame
confirmServerReceivedCloseFrame(serverConn,StatusCode.NORMAL,is(origCloseReason));
// server receives close frame
confirmServerReceivedCloseFrame(serverConn, StatusCode.NORMAL, is(origCloseReason));
// client should not have received close message (yet)
clientSocket.assertNoCloseEvent();
// client should not have received close message (yet)
clientSocket.assertNoCloseEvent();
// server never sends close frame handshake
// server sits idle
// server never sends close frame handshake
// server sits idle
// client idle timeout triggers close event on client ws-endpoint
assertThat("OnError Latch", clientSocket.errorLatch.await(2, TimeUnit.SECONDS), is(true));
assertThat("OnError", clientSocket.error.get(), instanceOf(TimeoutException.class));
// client idle timeout triggers close event on client ws-endpoint
assertThat("OnError Latch", clientSocket.errorLatch.await(2, TimeUnit.SECONDS), is(true));
assertThat("OnError", clientSocket.error.get(), instanceOf(TimeoutException.class));
}
finally
{
serverConn.disconnect();
}
}
@Test(timeout = 5000L)
@ -545,35 +569,45 @@ public class ClientCloseTest
CloseTrackingSocket clientSockets[] = new CloseTrackingSocket[clientCount];
IBlockheadServerConnection serverConns[] = new IBlockheadServerConnection[clientCount];
// Connect Multiple Clients
for (int i = 0; i < clientCount; i++)
try
{
// Client Request Upgrade
clientSockets[i] = new CloseTrackingSocket();
Future<Session> clientConnectFuture = client.connect(clientSockets[i],server.getWsUri());
// Connect Multiple Clients
for (int i = 0; i < clientCount; i++)
{
// Client Request Upgrade
clientSockets[i] = new CloseTrackingSocket();
Future<Session> clientConnectFuture = client.connect(clientSockets[i], server.getWsUri());
// Server accepts connection
serverConns[i] = server.accept();
serverConns[i].upgrade();
// Server accepts connection
serverConns[i] = server.accept();
serverConns[i].upgrade();
// client confirms connection via echo
confirmConnection(clientSockets[i],clientConnectFuture,serverConns[i]);
// client confirms connection via echo
confirmConnection(clientSockets[i], clientConnectFuture, serverConns[i]);
}
// client lifecycle stop
client.stop();
// clients send close frames (code 1001, shutdown)
for (int i = 0; i < clientCount; i++)
{
// server receives close frame
confirmServerReceivedCloseFrame(serverConns[i], StatusCode.SHUTDOWN, containsString("Shutdown"));
}
// clients disconnect
for (int i = 0; i < clientCount; i++)
{
clientSockets[i].assertReceivedCloseEvent(timeout, is(StatusCode.SHUTDOWN), containsString("Shutdown"));
}
}
// client lifecycle stop
client.stop();
// clients send close frames (code 1001, shutdown)
for (int i = 0; i < clientCount; i++)
finally
{
// server receives close frame
confirmServerReceivedCloseFrame(serverConns[i],StatusCode.SHUTDOWN,containsString("Shutdown"));
}
// clients disconnect
for (int i = 0; i < clientCount; i++)
{
clientSockets[i].assertReceivedCloseEvent(timeout,is(StatusCode.SHUTDOWN),containsString("Shutdown"));
for(IBlockheadServerConnection serverConn: serverConns)
{
serverConn.disconnect();
}
}
}
@ -592,24 +626,31 @@ public class ClientCloseTest
IBlockheadServerConnection serverConn = server.accept();
serverConn.upgrade();
// client confirms connection via echo
confirmConnection(clientSocket,clientConnectFuture,serverConn);
try
{
// client confirms connection via echo
confirmConnection(clientSocket, clientConnectFuture, serverConn);
// setup client endpoint for write failure (test only)
EndPoint endp = clientSocket.getEndPoint();
endp.shutdownOutput();
// setup client endpoint for write failure (test only)
EndPoint endp = clientSocket.getEndPoint();
endp.shutdownOutput();
// client enqueue close frame
// client write failure
final String origCloseReason = "Normal Close";
clientSocket.getSession().close(StatusCode.NORMAL,origCloseReason);
assertThat("OnError Latch", clientSocket.errorLatch.await(2, TimeUnit.SECONDS), is(true));
assertThat("OnError", clientSocket.error.get(), instanceOf(EofException.class));
// client enqueue close frame
// client write failure
final String origCloseReason = "Normal Close";
clientSocket.getSession().close(StatusCode.NORMAL, origCloseReason);
// client triggers close event on client ws-endpoint
// assert - close code==1006 (abnormal)
// assert - close reason message contains (write failure)
clientSocket.assertReceivedCloseEvent(timeout,is(StatusCode.ABNORMAL),containsString("EOF"));
assertThat("OnError Latch", clientSocket.errorLatch.await(2, TimeUnit.SECONDS), is(true));
assertThat("OnError", clientSocket.error.get(), instanceOf(EofException.class));
// client triggers close event on client ws-endpoint
// assert - close code==1006 (abnormal)
// assert - close reason message contains (write failure)
clientSocket.assertReceivedCloseEvent(timeout, is(StatusCode.ABNORMAL), containsString("EOF"));
}
finally
{
serverConn.disconnect();
}
}
}

View File

@ -19,7 +19,6 @@
package org.eclipse.jetty.websocket.client;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
@ -29,10 +28,9 @@ import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.eclipse.jetty.toolchain.test.EventQueue;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.Session;
@ -42,6 +40,7 @@ import org.eclipse.jetty.websocket.api.util.QuoteUtil;
import org.eclipse.jetty.websocket.common.frames.TextFrame;
import org.eclipse.jetty.websocket.common.test.BlockheadServer;
import org.eclipse.jetty.websocket.common.test.IBlockheadServerConnection;
import org.eclipse.jetty.websocket.common.test.Timeouts;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@ -53,8 +52,8 @@ public class CookieTest
public static class CookieTrackingSocket extends WebSocketAdapter
{
public EventQueue<String> messageQueue = new EventQueue<>();
public EventQueue<Throwable> errorQueue = new EventQueue<>();
public LinkedBlockingQueue<String> messageQueue = new LinkedBlockingQueue<>();
public LinkedBlockingQueue<Throwable> errorQueue = new LinkedBlockingQueue<>();
private CountDownLatch openLatch = new CountDownLatch(1);
@Override
@ -189,18 +188,8 @@ public class CookieTest
clientConnectFuture.get(10,TimeUnit.SECONDS);
clientSocket.awaitOpen(2,TimeUnit.SECONDS);
try
{
// Wait for client receipt of cookie frame via client websocket
clientSocket.messageQueue.awaitEventCount(1, 3, TimeUnit.SECONDS);
}
catch (TimeoutException e)
{
e.printStackTrace(System.err);
assertThat("Message Count", clientSocket.messageQueue.size(), is(1));
}
String cookies = clientSocket.messageQueue.poll();
// Wait for client receipt of cookie frame via client websocket
String cookies = clientSocket.messageQueue.poll(Timeouts.POLL_EVENT, Timeouts.POLL_EVENT_UNIT);
LOG.debug("Cookies seen at server: {}",cookies);
// Server closes connection

View File

@ -25,15 +25,14 @@ import static org.junit.Assert.assertThat;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Exchanger;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.eclipse.jetty.toolchain.test.EventQueue;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.UpgradeRequest;
import org.eclipse.jetty.websocket.api.UpgradeResponse;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.WebSocketAdapter;
import org.junit.Assert;
@ -52,8 +51,8 @@ public class JettyTrackingSocket extends WebSocketAdapter
public CountDownLatch openLatch = new CountDownLatch(1);
public CountDownLatch closeLatch = new CountDownLatch(1);
public CountDownLatch dataLatch = new CountDownLatch(1);
public EventQueue<String> messageQueue = new EventQueue<>();
public EventQueue<Throwable> errorQueue = new EventQueue<>();
public LinkedBlockingQueue<String> messageQueue = new LinkedBlockingQueue<>();
public LinkedBlockingQueue<Throwable> errorQueue = new LinkedBlockingQueue<>();
public void assertClose(int expectedStatusCode, String expectedReason) throws InterruptedException
{
@ -78,12 +77,6 @@ public class JettyTrackingSocket extends WebSocketAdapter
assertNotClosed();
}
public void assertMessage(String expected)
{
String actual = messageQueue.poll();
Assert.assertEquals("Message",expected,actual);
}
public void assertNotClosed()
{
LOG.debug("assertNotClosed() - {}", closeLatch.getCount());
@ -102,11 +95,6 @@ public class JettyTrackingSocket extends WebSocketAdapter
Assert.assertThat("Was Opened",openLatch.await(30,TimeUnit.SECONDS),is(true));
}
public void awaitMessage(int expectedMessageCount, TimeUnit timeoutUnit, int timeoutDuration) throws TimeoutException, InterruptedException
{
messageQueue.awaitEventCount(expectedMessageCount,timeoutDuration,timeoutUnit);
}
public void clear()
{
messageQueue.clear();

View File

@ -21,9 +21,9 @@ package org.eclipse.jetty.websocket.client;
import static org.hamcrest.Matchers.is;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.toolchain.test.EventQueue;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.Session;
@ -42,8 +42,8 @@ public class MaxMessageSocket
public CountDownLatch openLatch = new CountDownLatch(1);
public CountDownLatch closeLatch = new CountDownLatch(1);
public CountDownLatch dataLatch = new CountDownLatch(1);
public EventQueue<String> messageQueue = new EventQueue<>();
public EventQueue<Throwable> errorQueue = new EventQueue<>();
public LinkedBlockingQueue<String> messageQueue = new LinkedBlockingQueue<>();
public LinkedBlockingQueue<Throwable> errorQueue = new LinkedBlockingQueue<>();
public int closeCode = -1;
public StringBuilder closeMessage = new StringBuilder();

View File

@ -18,12 +18,12 @@
package org.eclipse.jetty.websocket.client;
import static org.hamcrest.Matchers.*;
import static org.hamcrest.Matchers.is;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@ -35,6 +35,7 @@ import org.eclipse.jetty.websocket.common.CloseInfo;
import org.eclipse.jetty.websocket.common.OpCode;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.eclipse.jetty.websocket.common.test.IBlockheadServerConnection;
import org.eclipse.jetty.websocket.common.test.Timeouts;
import org.junit.Assert;
public class ServerReadThread extends Thread
@ -89,9 +90,9 @@ public class ServerReadThread extends Thread
conn.getParser().parse(buf);
}
Queue<WebSocketFrame> frames = conn.getIncomingFrames().getFrames();
LinkedBlockingQueue<WebSocketFrame> frames = conn.getFrameQueue();
WebSocketFrame frame;
while ((frame = frames.poll()) != null)
while ((frame = frames.poll(Timeouts.POLL_EVENT, Timeouts.POLL_EVENT_UNIT)) != null)
{
frameCount.incrementAndGet();
if (frame.getOpCode() == OpCode.CLOSE)

View File

@ -18,7 +18,10 @@
package org.eclipse.jetty.websocket.client;
import static org.hamcrest.Matchers.*;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.Assert.assertThat;
import java.net.URI;
import java.util.Collection;
@ -32,6 +35,7 @@ import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.common.WebSocketSession;
import org.eclipse.jetty.websocket.common.test.BlockheadServer;
import org.eclipse.jetty.websocket.common.test.IBlockheadServerConnection;
import org.eclipse.jetty.websocket.common.test.Timeouts;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@ -74,18 +78,17 @@ public class SessionTest
final IBlockheadServerConnection srvSock = server.accept();
srvSock.upgrade();
Session sess = future.get(30000,TimeUnit.MILLISECONDS);
Assert.assertThat("Session",sess,notNullValue());
Assert.assertThat("Session.open",sess.isOpen(),is(true));
Assert.assertThat("Session.upgradeRequest",sess.getUpgradeRequest(),notNullValue());
Assert.assertThat("Session.upgradeResponse",sess.getUpgradeResponse(),notNullValue());
Session sess = future.get(30000, TimeUnit.MILLISECONDS);
Assert.assertThat("Session", sess, notNullValue());
Assert.assertThat("Session.open", sess.isOpen(), is(true));
Assert.assertThat("Session.upgradeRequest", sess.getUpgradeRequest(), notNullValue());
Assert.assertThat("Session.upgradeResponse", sess.getUpgradeResponse(), notNullValue());
cliSock.assertWasOpened();
cliSock.assertNotClosed();
Collection<WebSocketSession> sessions = client.getBeans(WebSocketSession.class);
Assert.assertThat("client.connectionManager.sessions.size",sessions.size(),is(1));
Assert.assertThat("client.connectionManager.sessions.size", sessions.size(), is(1));
RemoteEndpoint remote = cliSock.getSession().getRemote();
remote.sendStringByFuture("Hello World!");
@ -93,19 +96,31 @@ public class SessionTest
{
remote.flush();
}
srvSock.echoMessage(1,30000,TimeUnit.MILLISECONDS);
// wait for response from server
cliSock.waitForMessage(30000,TimeUnit.MILLISECONDS);
Set<WebSocketSession> open = client.getOpenSessions();
Assert.assertThat("(Before Close) Open Sessions.size", open.size(), is(1));
cliSock.assertMessage("Hello World!");
cliSock.close();
srvSock.close();
cliSock.waitForClose(30000,TimeUnit.MILLISECONDS);
open = client.getOpenSessions();
try
{
srvSock.enableIncomingEcho(true);
srvSock.startReadThread();
// wait for response from server
cliSock.waitForMessage(30000, TimeUnit.MILLISECONDS);
Set<WebSocketSession> open = client.getOpenSessions();
Assert.assertThat("(Before Close) Open Sessions.size", open.size(), is(1));
String received = cliSock.messageQueue.poll(Timeouts.POLL_EVENT, Timeouts.POLL_EVENT_UNIT);
assertThat("Message", received, containsString("Hello World!"));
cliSock.close();
srvSock.close();
}
finally
{
srvSock.disconnect();
}
cliSock.waitForClose(30000, TimeUnit.MILLISECONDS);
Set<WebSocketSession> open = client.getOpenSessions();
// TODO this sometimes fails!
Assert.assertThat("(After Close) Open Sessions.size", open.size(), is(0));
}

View File

@ -18,10 +18,12 @@
package org.eclipse.jetty.websocket.client;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertThat;
import java.net.InetSocketAddress;
import java.net.URI;
@ -43,6 +45,7 @@ import org.eclipse.jetty.websocket.common.frames.TextFrame;
import org.eclipse.jetty.websocket.common.io.FutureWriteCallback;
import org.eclipse.jetty.websocket.common.test.BlockheadServer;
import org.eclipse.jetty.websocket.common.test.IBlockheadServerConnection;
import org.eclipse.jetty.websocket.common.test.Timeouts;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@ -103,26 +106,36 @@ public class WebSocketClientTest
srvSock.upgrade();
Session sess = future.get(30,TimeUnit.SECONDS);
Assert.assertThat("Session",sess,notNullValue());
Assert.assertThat("Session.open",sess.isOpen(),is(true));
Assert.assertThat("Session.upgradeRequest",sess.getUpgradeRequest(),notNullValue());
Assert.assertThat("Session.upgradeResponse",sess.getUpgradeResponse(),notNullValue());
assertThat("Session",sess,notNullValue());
assertThat("Session.open",sess.isOpen(),is(true));
assertThat("Session.upgradeRequest",sess.getUpgradeRequest(),notNullValue());
assertThat("Session.upgradeResponse",sess.getUpgradeResponse(),notNullValue());
cliSock.assertWasOpened();
cliSock.assertNotClosed();
Collection<WebSocketSession> sessions = client.getOpenSessions();
Assert.assertThat("client.connectionManager.sessions.size",sessions.size(),is(1));
assertThat("client.connectionManager.sessions.size",sessions.size(),is(1));
RemoteEndpoint remote = cliSock.getSession().getRemote();
remote.sendStringByFuture("Hello World!");
if (remote.getBatchMode() == BatchMode.ON)
remote.flush();
srvSock.echoMessage(1,30,TimeUnit.SECONDS);
// wait for response from server
cliSock.waitForMessage(30,TimeUnit.SECONDS);
cliSock.assertMessage("Hello World!");
try
{
srvSock.enableIncomingEcho(true);
srvSock.startReadThread();
// wait for response from server
String received = cliSock.messageQueue.poll(Timeouts.POLL_EVENT, Timeouts.POLL_EVENT_UNIT);
assertThat("Message", received, containsString("Hello World"));
}
finally
{
srvSock.close();
srvSock.disconnect();
}
}
@Test
@ -140,16 +153,16 @@ public class WebSocketClientTest
srvSock.upgrade();
Session sess = future.get(30,TimeUnit.SECONDS);
Assert.assertThat("Session",sess,notNullValue());
Assert.assertThat("Session.open",sess.isOpen(),is(true));
Assert.assertThat("Session.upgradeRequest",sess.getUpgradeRequest(),notNullValue());
Assert.assertThat("Session.upgradeResponse",sess.getUpgradeResponse(),notNullValue());
assertThat("Session",sess,notNullValue());
assertThat("Session.open",sess.isOpen(),is(true));
assertThat("Session.upgradeRequest",sess.getUpgradeRequest(),notNullValue());
assertThat("Session.upgradeResponse",sess.getUpgradeResponse(),notNullValue());
cliSock.assertWasOpened();
cliSock.assertNotClosed();
Collection<WebSocketSession> sessions = client.getBeans(WebSocketSession.class);
Assert.assertThat("client.connectionManager.sessions.size",sessions.size(),is(1));
assertThat("client.connectionManager.sessions.size",sessions.size(),is(1));
FutureWriteCallback callback = new FutureWriteCallback();
@ -169,10 +182,10 @@ public class WebSocketClientTest
// Validate connect
Session sess = future.get(30,TimeUnit.SECONDS);
Assert.assertThat("Session",sess,notNullValue());
Assert.assertThat("Session.open",sess.isOpen(),is(true));
Assert.assertThat("Session.upgradeRequest",sess.getUpgradeRequest(),notNullValue());
Assert.assertThat("Session.upgradeResponse",sess.getUpgradeResponse(),notNullValue());
assertThat("Session",sess,notNullValue());
assertThat("Session.open",sess.isOpen(),is(true));
assertThat("Session.upgradeRequest",sess.getUpgradeRequest(),notNullValue());
assertThat("Session.upgradeResponse",sess.getUpgradeResponse(),notNullValue());
// Have server send initial message
srvSock.write(new TextFrame().setPayload("Hello World"));
@ -180,9 +193,9 @@ public class WebSocketClientTest
// Verify connect
future.get(30,TimeUnit.SECONDS);
wsocket.assertWasOpened();
wsocket.awaitMessage(1,TimeUnit.SECONDS,2);
wsocket.assertMessage("Hello World");
String received = wsocket.messageQueue.poll(Timeouts.POLL_EVENT, Timeouts.POLL_EVENT_UNIT);
assertThat("Message", received, containsString("Hello World"));
}
@Test
@ -203,15 +216,15 @@ public class WebSocketClientTest
InetSocketAddress local = wsocket.getSession().getLocalAddress();
InetSocketAddress remote = wsocket.getSession().getRemoteAddress();
Assert.assertThat("Local Socket Address",local,notNullValue());
Assert.assertThat("Remote Socket Address",remote,notNullValue());
assertThat("Local Socket Address",local,notNullValue());
assertThat("Remote Socket Address",remote,notNullValue());
// Hard to validate (in a portable unit test) the local address that was used/bound in the low level Jetty Endpoint
Assert.assertThat("Local Socket Address / Host",local.getAddress().getHostAddress(),notNullValue());
Assert.assertThat("Local Socket Address / Port",local.getPort(),greaterThan(0));
assertThat("Local Socket Address / Host",local.getAddress().getHostAddress(),notNullValue());
assertThat("Local Socket Address / Port",local.getPort(),greaterThan(0));
Assert.assertThat("Remote Socket Address / Host",remote.getAddress().getHostAddress(),is(wsUri.getHost()));
Assert.assertThat("Remote Socket Address / Port",remote.getPort(),greaterThan(0));
assertThat("Remote Socket Address / Host",remote.getAddress().getHostAddress(),is(wsUri.getHost()));
assertThat("Remote Socket Address / Port",remote.getPort(),greaterThan(0));
}
@Test
@ -265,8 +278,8 @@ public class WebSocketClientTest
wsocket.awaitConnect(1,TimeUnit.SECONDS);
Session sess = future.get(30,TimeUnit.SECONDS);
Assert.assertThat("Session",sess,notNullValue());
Assert.assertThat("Session.open",sess.isOpen(),is(true));
assertThat("Session",sess,notNullValue());
assertThat("Session.open",sess.isOpen(),is(true));
// Create string that is larger than default size of 64k
// but smaller than maxMessageSize of 100k
@ -275,13 +288,23 @@ public class WebSocketClientTest
String msg = StringUtil.toUTF8String(buf,0,buf.length);
wsocket.getSession().getRemote().sendStringByFuture(msg);
ssocket.echoMessage(1,2,TimeUnit.SECONDS);
// wait for response from server
wsocket.waitForMessage(1,TimeUnit.SECONDS);
try
{
ssocket.enableIncomingEcho(true);
ssocket.startReadThread();
wsocket.assertMessage(msg);
// wait for response from server
wsocket.waitForMessage(1, TimeUnit.SECONDS);
Assert.assertTrue(wsocket.dataLatch.await(2,TimeUnit.SECONDS));
wsocket.assertMessage(msg);
Assert.assertTrue(wsocket.dataLatch.await(2, TimeUnit.SECONDS));
}
finally
{
ssocket.close();
ssocket.disconnect();
}
}
@Test
@ -301,15 +324,15 @@ public class WebSocketClientTest
Session session = wsocket.getSession();
UpgradeRequest req = session.getUpgradeRequest();
Assert.assertThat("Upgrade Request",req,notNullValue());
assertThat("Upgrade Request",req,notNullValue());
Map<String, List<String>> parameterMap = req.getParameterMap();
Assert.assertThat("Parameter Map",parameterMap,notNullValue());
assertThat("Parameter Map",parameterMap,notNullValue());
Assert.assertThat("Parameter[snack]",parameterMap.get("snack"),is(Arrays.asList(new String[] { "cashews" })));
Assert.assertThat("Parameter[amount]",parameterMap.get("amount"),is(Arrays.asList(new String[] { "handful" })));
Assert.assertThat("Parameter[brand]",parameterMap.get("brand"),is(Arrays.asList(new String[] { "off" })));
assertThat("Parameter[snack]",parameterMap.get("snack"),is(Arrays.asList(new String[] { "cashews" })));
assertThat("Parameter[amount]",parameterMap.get("amount"),is(Arrays.asList(new String[] { "handful" })));
assertThat("Parameter[brand]",parameterMap.get("brand"),is(Arrays.asList(new String[] { "off" })));
Assert.assertThat("Parameter[cost]",parameterMap.get("cost"),nullValue());
assertThat("Parameter[cost]",parameterMap.get("cost"),nullValue());
}
}

View File

@ -10,6 +10,7 @@ org.eclipse.jetty.LEVEL=WARN
# org.eclipse.jetty.websocket.client.ClientCloseTest.LEVEL=DEBUG
# org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection.LEVEL=DEBUG
# org.eclipse.jetty.websocket.common.io.IOState.LEVEL=DEBUG
# org.eclipse.jetty.websocket.common.test.LEVEL=DEBUG
# org.eclipse.jetty.websocket.common.Generator.LEVEL=DEBUG
# org.eclipse.jetty.websocket.common.Parser.LEVEL=DEBUG

View File

@ -22,15 +22,19 @@ import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.startsWith;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.regex.Pattern;
import org.eclipse.jetty.toolchain.test.EventQueue;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.junit.Assert;
/**
* @deprecated should refactor away.
*/
@SuppressWarnings("serial")
public class EventCapture extends EventQueue<String>
@Deprecated
public class EventCapture extends LinkedBlockingQueue<String>
{
private static final Logger LOG = Log.getLogger(EventCapture.class);
@ -74,6 +78,7 @@ public class EventCapture extends EventQueue<String>
public Assertable pop()
{
// TODO: poll should have timeout.
return new Assertable(super.poll());
}

View File

@ -23,9 +23,9 @@ import static org.hamcrest.Matchers.is;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.toolchain.test.EventQueue;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@ -43,8 +43,8 @@ public class TrackingInputStreamSocket
public int closeCode = -1;
public StringBuilder closeMessage = new StringBuilder();
public CountDownLatch closeLatch = new CountDownLatch(1);
public EventQueue<String> messageQueue = new EventQueue<>();
public EventQueue<Throwable> errorQueue = new EventQueue<>();
public LinkedBlockingQueue<String> messageQueue = new LinkedBlockingQueue<>();
public LinkedBlockingQueue<Throwable> errorQueue = new LinkedBlockingQueue<>();
public TrackingInputStreamSocket()
{

View File

@ -22,10 +22,9 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.eclipse.jetty.toolchain.test.EventQueue;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.Session;
@ -45,8 +44,8 @@ public class TrackingSocket extends WebSocketAdapter
public CountDownLatch openLatch = new CountDownLatch(1);
public CountDownLatch closeLatch = new CountDownLatch(1);
public CountDownLatch dataLatch = new CountDownLatch(1);
public EventQueue<String> messageQueue = new EventQueue<>();
public EventQueue<Throwable> errorQueue = new EventQueue<>();
public LinkedBlockingQueue<String> messageQueue = new LinkedBlockingQueue<>();
public LinkedBlockingQueue<Throwable> errorQueue = new LinkedBlockingQueue<>();
public TrackingSocket()
{
@ -102,11 +101,6 @@ public class TrackingSocket extends WebSocketAdapter
Assert.assertThat("Was Opened",openLatch.await(30,TimeUnit.SECONDS),is(true));
}
public void awaitMessage(int expectedMessageCount, TimeUnit timeoutUnit, int timeoutDuration) throws TimeoutException, InterruptedException
{
messageQueue.awaitEventCount(expectedMessageCount,timeoutDuration,timeoutUnit);
}
public void clear()
{
messageQueue.clear();

View File

@ -41,13 +41,13 @@ import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.HttpsURLConnection;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.MappedByteBufferPool;
import org.eclipse.jetty.toolchain.test.EventQueue;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.StringUtil;
@ -96,8 +96,8 @@ public class BlockheadClient implements OutgoingFrames, ConnectionStateListener,
public long totalReadOps = 0;
public long totalParseOps = 0;
public EventQueue<WebSocketFrame> frames = new EventQueue<>();
public EventQueue<Throwable> errors = new EventQueue<>();
public LinkedBlockingQueue<WebSocketFrame> frames = new LinkedBlockingQueue<>();
public LinkedBlockingQueue<Throwable> errors = new LinkedBlockingQueue<>();
@Override
public void run()
@ -630,9 +630,8 @@ public class BlockheadClient implements OutgoingFrames, ConnectionStateListener,
}
@Override
public EventQueue<WebSocketFrame> readFrames(int expectedFrameCount, int timeoutDuration, TimeUnit timeoutUnit) throws Exception
public LinkedBlockingQueue<WebSocketFrame> getFrameQueue()
{
frameReader.frames.awaitEventCount(expectedFrameCount,timeoutDuration,timeoutUnit);
return frameReader.frames;
}

View File

@ -18,7 +18,8 @@
package org.eclipse.jetty.websocket.common.test;
import static org.hamcrest.Matchers.*;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import java.io.BufferedReader;
import java.io.IOException;
@ -32,8 +33,8 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Matcher;
@ -50,9 +51,9 @@ import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.api.extensions.Frame.Type;
import org.eclipse.jetty.websocket.api.extensions.IncomingFrames;
import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
import org.eclipse.jetty.websocket.api.extensions.Frame.Type;
import org.eclipse.jetty.websocket.common.AcceptHash;
import org.eclipse.jetty.websocket.common.CloseInfo;
import org.eclipse.jetty.websocket.common.Generator;
@ -73,13 +74,15 @@ public class BlockheadServerConnection implements IncomingFrames, OutgoingFrames
private final Socket socket;
private final ByteBufferPool bufferPool;
private final WebSocketPolicy policy;
private final IncomingFramesCapture incomingFrames;
private final LinkedBlockingQueue<WebSocketFrame> incomingFrames = new LinkedBlockingQueue<>();
private final LinkedBlockingQueue<Throwable> incomingErrors = new LinkedBlockingQueue<>();
private final Parser parser;
private final Generator generator;
private final AtomicInteger parseCount;
private final WebSocketExtensionFactory extensionRegistry;
private final AtomicBoolean echoing = new AtomicBoolean(false);
private Thread echoThread;
private final AtomicBoolean reading = new AtomicBoolean(false);
private Thread readThread;
/** Set to true to disable timeouts (for debugging reasons) */
private boolean debug = false;
@ -92,7 +95,6 @@ public class BlockheadServerConnection implements IncomingFrames, OutgoingFrames
public BlockheadServerConnection(Socket socket)
{
this.socket = socket;
this.incomingFrames = new IncomingFramesCapture();
this.policy = WebSocketPolicy.newServerPolicy();
this.policy.setMaxBinaryMessageSize(100000);
this.policy.setMaxTextMessageSize(100000);
@ -140,6 +142,7 @@ public class BlockheadServerConnection implements IncomingFrames, OutgoingFrames
public void disconnect()
{
LOG.debug("disconnect");
reading.set(false);
IO.close(in);
IO.close(out);
if (socket != null)
@ -154,19 +157,6 @@ public class BlockheadServerConnection implements IncomingFrames, OutgoingFrames
}
}
}
@Override
public void echoMessage(int expectedFrames, int timeoutDuration, TimeUnit timeoutUnit) throws IOException, TimeoutException
{
LOG.debug("Echo Frames [expecting {}]",expectedFrames);
IncomingFramesCapture cap = readFrames(expectedFrames,timeoutDuration,timeoutUnit);
// now echo them back.
for (Frame frame : cap.getFrames())
{
write(WebSocketFrame.copy(frame).setMasked(false));
}
}
@Override
public void flush() throws IOException
{
@ -179,12 +169,6 @@ public class BlockheadServerConnection implements IncomingFrames, OutgoingFrames
return bufferPool;
}
@Override
public IncomingFramesCapture getIncomingFrames()
{
return incomingFrames;
}
public InputStream getInputStream() throws IOException
{
if (in == null)
@ -217,19 +201,19 @@ public class BlockheadServerConnection implements IncomingFrames, OutgoingFrames
@Override
public void incomingError(Throwable e)
{
incomingFrames.incomingError(e);
incomingErrors.offer(e);
}
@Override
public void incomingFrame(Frame frame)
{
LOG.debug("incoming({})",frame);
LOG.debug("incomingFrame({})",frame);
int count = parseCount.incrementAndGet();
if ((count % 10) == 0)
{
LOG.info("Server parsed {} frames",count);
}
incomingFrames.incomingFrame(WebSocketFrame.copy(frame));
incomingFrames.offer(WebSocketFrame.copy(frame));
if (frame.getOpCode() == OpCode.CLOSE)
{
@ -238,6 +222,7 @@ public class BlockheadServerConnection implements IncomingFrames, OutgoingFrames
}
Type type = frame.getType();
if (echoing.get() && (type.isData() || type.isContinuation()))
{
try
@ -329,52 +314,8 @@ public class BlockheadServerConnection implements IncomingFrames, OutgoingFrames
}
@Override
public IncomingFramesCapture readFrames(int expectedCount, int timeoutDuration, TimeUnit timeoutUnit) throws IOException, TimeoutException
public LinkedBlockingQueue<WebSocketFrame> getFrameQueue()
{
LOG.debug("Read: waiting for {} frame(s) from client",expectedCount);
int startCount = incomingFrames.size();
ByteBuffer buf = bufferPool.acquire(BUFFER_SIZE,false);
BufferUtil.clearToFill(buf);
try
{
long msDur = TimeUnit.MILLISECONDS.convert(timeoutDuration,timeoutUnit);
long now = System.currentTimeMillis();
long expireOn = now + msDur;
LOG.debug("Now: {} - expireOn: {} ({} ms)",now,expireOn,msDur);
int len = 0;
while (incomingFrames.size() < (startCount + expectedCount))
{
BufferUtil.clearToFill(buf);
len = read(buf);
if (len > 0)
{
LOG.debug("Read {} bytes",len);
BufferUtil.flipToFlush(buf,0);
parser.parse(buf);
}
try
{
TimeUnit.MILLISECONDS.sleep(20);
}
catch (InterruptedException gnore)
{
/* ignore */
}
if (!debug && (System.currentTimeMillis() > expireOn))
{
incomingFrames.dump();
throw new TimeoutException(String.format("Timeout reading all %d expected frames. (managed to only read %d frame(s))",expectedCount,
incomingFrames.size()));
}
}
}
finally
{
bufferPool.release(buf);
}
return incomingFrames;
}
@ -457,20 +398,19 @@ public class BlockheadServerConnection implements IncomingFrames, OutgoingFrames
{
LOG.debug("Entering echo thread");
ByteBuffer buf = bufferPool.acquire(BUFFER_SIZE,false);
BufferUtil.clearToFill(buf);
long readBytes = 0;
try
long totalReadBytes = 0;
ByteBuffer buf = bufferPool.acquire(BUFFER_SIZE, false);
while(reading.get())
{
while (echoing.get())
try
{
BufferUtil.clearToFill(buf);
long len = read(buf);
if (len > 0)
{
readBytes += len;
LOG.debug("Read {} bytes",len);
BufferUtil.flipToFlush(buf,0);
totalReadBytes += len;
LOG.debug("Read {} bytes", len);
BufferUtil.flipToFlush(buf, 0);
parser.parse(buf);
}
@ -483,16 +423,18 @@ public class BlockheadServerConnection implements IncomingFrames, OutgoingFrames
/* ignore */
}
}
catch (IOException e)
{
LOG.debug("Exception during echo loop", e);
}
catch (Throwable t)
{
LOG.warn("Exception during echo loop", t);
}
}
catch (IOException e)
{
LOG.debug("Exception during echo loop",e);
}
finally
{
LOG.debug("Read {} bytes",readBytes);
bufferPool.release(buf);
}
LOG.debug("Read {} total bytes (exiting)",totalReadBytes);
bufferPool.release(buf);
}
@Override
@ -502,21 +444,22 @@ public class BlockheadServerConnection implements IncomingFrames, OutgoingFrames
}
@Override
public void startEcho()
public void startReadThread()
{
if (echoThread != null)
if (readThread != null)
{
throw new IllegalStateException("Echo thread already declared!");
throw new IllegalStateException("Read thread already declared/started!");
}
echoThread = new Thread(this,"BlockheadServer/Echo");
echoing.set(true);
echoThread.start();
readThread = new Thread(this,"BlockheadServer/Read");
LOG.debug("Starting Read Thread: {}", readThread);
reading.set(true);
readThread.start();
}
@Override
public void stopEcho()
public void enableIncomingEcho(boolean enabled)
{
echoing.set(false);
echoing.set(enabled);
}
@Override

View File

@ -27,9 +27,9 @@ import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.toolchain.test.EventQueue;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@ -146,19 +146,27 @@ public class Fuzzer implements AutoCloseable
expect(expect,10,TimeUnit.SECONDS);
}
/**
* Read the response frames and validate them against the expected frame list
*
* @param expect the list of expected frames
* @param duration the timeout duration to wait for each read frame
* @param unit the timeout unit to wait for each read frame
* @throws Exception if unable to validate expectations
*/
public void expect(List<WebSocketFrame> expect, int duration, TimeUnit unit) throws Exception
{
int expectedCount = expect.size();
LOG.debug("expect() {} frame(s)",expect.size());
// Read frames
EventQueue<WebSocketFrame> frames = client.readFrames(expect.size(),duration,unit);
LinkedBlockingQueue<WebSocketFrame> frames = client.getFrameQueue();
String prefix = "";
for (int i = 0; i < expectedCount; i++)
{
WebSocketFrame expected = expect.get(i);
WebSocketFrame actual = frames.poll();
WebSocketFrame actual = frames.poll(duration,unit);
prefix = "Frame[" + i + "]";

View File

@ -21,9 +21,9 @@ package org.eclipse.jetty.websocket.common.test;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.toolchain.test.EventQueue;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
/**
@ -56,7 +56,7 @@ public interface IBlockheadClient extends AutoCloseable
public InetSocketAddress getRemoteSocketAddress();
public EventQueue<WebSocketFrame> readFrames(int expectedFrameCount, int timeoutDuration, TimeUnit timeoutUnit) throws Exception;
public LinkedBlockingQueue<WebSocketFrame> getFrameQueue();
public HttpResponse readResponseHeader() throws IOException;

View File

@ -22,47 +22,46 @@ import java.io.IOException;
import java.net.SocketException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.LinkedBlockingQueue;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.common.Parser;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
public interface IBlockheadServerConnection
{
public void close() throws IOException;
public void close(int statusCode) throws IOException;
public void write(Frame frame) throws IOException;
public List<String> upgrade() throws IOException;
public void disconnect();
public IncomingFramesCapture readFrames(int expectedCount, int timeoutDuration, TimeUnit timeoutUnit) throws IOException, TimeoutException;
public void write(Frame frame) throws IOException;
public void write(ByteBuffer buf) throws IOException;
public void write(int b) throws IOException;
public void flush() throws IOException;
public LinkedBlockingQueue<WebSocketFrame> getFrameQueue();
public void enableIncomingEcho(boolean enabled);
public void startReadThread();
public String readRequest() throws IOException;
public List<String> readRequestLines() throws IOException;
public String parseWebSocketKey(List<String> requestLines);
public void respond(String rawstr) throws IOException;
public String readRequest() throws IOException;
public List<String> regexFind(List<String> lines, String pattern);
public void echoMessage(int expectedFrames, int timeoutDuration, TimeUnit timeoutUnit) throws IOException, TimeoutException;
public void setSoTimeout(int ms) throws SocketException;
public ByteBufferPool getBufferPool();
public int read(ByteBuffer buf) throws IOException;
public Parser getParser();
public IncomingFramesCapture getIncomingFrames();
public void flush() throws IOException;
public void write(int b) throws IOException;
public void startEcho();
public void stopEcho();
/**
* Add an extra header for the upgrade response (from the server). No extra work is done to ensure the key and value are sane for http.
* @param rawkey the raw key
* @param rawvalue the raw value
*/
public void addResponseHeader(String rawkey, String rawvalue);
public List<String> upgrade() throws IOException;
public void setSoTimeout(int ms) throws SocketException;
public void respond(String rawstr) throws IOException;
public List<String> regexFind(List<String> lines, String pattern);
public ByteBufferPool getBufferPool();
public Parser getParser();
@Deprecated
public int read(ByteBuffer buf) throws IOException;
}

View File

@ -22,8 +22,8 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import org.eclipse.jetty.toolchain.test.EventQueue;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@ -37,8 +37,8 @@ import org.junit.Assert;
public class IncomingFramesCapture implements IncomingFrames
{
private static final Logger LOG = Log.getLogger(IncomingFramesCapture.class);
private EventQueue<WebSocketFrame> frames = new EventQueue<>();
private EventQueue<Throwable> errors = new EventQueue<>();
private LinkedBlockingQueue<WebSocketFrame> frames = new LinkedBlockingQueue<>();
private LinkedBlockingQueue<Throwable> errors = new LinkedBlockingQueue<>();
public void assertErrorCount(int expectedCount)
{

View File

@ -0,0 +1,27 @@
//
// ========================================================================
// Copyright (c) 1995-2018 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.common.test;
import java.util.concurrent.TimeUnit;
public class Timeouts
{
public static long POLL_EVENT = 2;
public static TimeUnit POLL_EVENT_UNIT = TimeUnit.SECONDS;
}

View File

@ -1,6 +1,7 @@
org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog
org.eclipse.jetty.LEVEL=WARN
# org.eclipse.jetty.websocket.LEVEL=DEBUG
# org.eclipse.jetty.websocket.common.test.LEVEL=DEBUG
# org.eclipse.jetty.websocket.protocol.Parser.LEVEL=DEBUG
# org.eclipse.jetty.websocket.protocol.LEVEL=DEBUG
# org.eclipse.jetty.websocket.io.payload.LEVEL=DEBUG

View File

@ -24,11 +24,10 @@ import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.LinkedBlockingQueue;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.toolchain.test.EventQueue;
import org.eclipse.jetty.toolchain.test.TestTracker;
import org.eclipse.jetty.util.log.StacklessLogging;
import org.eclipse.jetty.websocket.api.StatusCode;
@ -38,6 +37,7 @@ import org.eclipse.jetty.websocket.common.Parser;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.eclipse.jetty.websocket.common.frames.TextFrame;
import org.eclipse.jetty.websocket.common.test.BlockheadClient;
import org.eclipse.jetty.websocket.common.test.Timeouts;
import org.eclipse.jetty.websocket.server.examples.echo.BigEchoSocket;
import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
import org.junit.AfterClass;
@ -105,8 +105,8 @@ public class AnnotatedMaxMessageSizeTest
client.write(new TextFrame().setPayload(msg));
// Read frame (hopefully text frame)
EventQueue<WebSocketFrame> frames = client.readFrames(1,30,TimeUnit.SECONDS);
WebSocketFrame tf = frames.poll();
LinkedBlockingQueue<WebSocketFrame> frames = client.getFrameQueue();
WebSocketFrame tf = frames.poll(Timeouts.POLL_EVENT, Timeouts.POLL_EVENT_UNIT);
Assert.assertThat("Text Frame.status code",tf.getPayloadAsUTF8(),is(msg));
}
finally
@ -133,8 +133,8 @@ public class AnnotatedMaxMessageSizeTest
client.write(new TextFrame().setPayload(ByteBuffer.wrap(buf)));
// Read frame (hopefully close frame saying its too large)
EventQueue<WebSocketFrame> frames = client.readFrames(1,30,TimeUnit.SECONDS);
WebSocketFrame tf = frames.poll();
LinkedBlockingQueue<WebSocketFrame> frames = client.getFrameQueue();
WebSocketFrame tf = frames.poll(Timeouts.POLL_EVENT, Timeouts.POLL_EVENT_UNIT);
Assert.assertThat("Frame is close", tf.getOpCode(), is(OpCode.CLOSE));
CloseInfo close = new CloseInfo(tf);
Assert.assertThat("Close Code", close.getStatusCode(), is(StatusCode.MESSAGE_TOO_LARGE));

View File

@ -21,13 +21,13 @@ package org.eclipse.jetty.websocket.server;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.LinkedBlockingQueue;
import org.eclipse.jetty.toolchain.test.EventQueue;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.eclipse.jetty.websocket.common.frames.TextFrame;
import org.eclipse.jetty.websocket.common.test.BlockheadClient;
import org.eclipse.jetty.websocket.common.test.HttpResponse;
import org.eclipse.jetty.websocket.common.test.Timeouts;
import org.eclipse.jetty.websocket.server.examples.MyEchoServlet;
import org.junit.AfterClass;
import org.junit.Assert;
@ -73,8 +73,8 @@ public class ChromeTest
client.write(new TextFrame().setPayload(msg));
// Read frame (hopefully text frame)
EventQueue<WebSocketFrame> frames = client.readFrames(1,30,TimeUnit.SECONDS);
WebSocketFrame tf = frames.poll();
LinkedBlockingQueue<WebSocketFrame> frames = client.getFrameQueue();
WebSocketFrame tf = frames.poll(Timeouts.POLL_EVENT, Timeouts.POLL_EVENT_UNIT);
Assert.assertThat("Text Frame.status code",tf.getPayloadAsUTF8(),is(msg));
}
finally

View File

@ -24,18 +24,19 @@ import static org.junit.Assert.assertThat;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.servlet.ServletContext;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.toolchain.test.EventQueue;
import org.eclipse.jetty.util.DecoratedObjectFactory;
import org.eclipse.jetty.util.Decorator;
import org.eclipse.jetty.websocket.api.WebSocketAdapter;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.eclipse.jetty.websocket.common.frames.TextFrame;
import org.eclipse.jetty.websocket.common.test.BlockheadClient;
import org.eclipse.jetty.websocket.common.test.Timeouts;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
import org.eclipse.jetty.websocket.servlet.WebSocketCreator;
@ -167,8 +168,8 @@ public class DecoratorsLegacyTest
client.write(new TextFrame().setPayload("info"));
EventQueue<WebSocketFrame> frames = client.readFrames(1,1,TimeUnit.SECONDS);
WebSocketFrame resp = frames.poll();
LinkedBlockingQueue<WebSocketFrame> frames = client.getFrameQueue();
WebSocketFrame resp = frames.poll(Timeouts.POLL_EVENT, Timeouts.POLL_EVENT_UNIT);
String textMsg = resp.getPayloadAsUTF8();
assertThat("DecoratedObjectFactory", textMsg, containsString("Object is a DecoratedObjectFactory"));

View File

@ -24,18 +24,19 @@ import static org.junit.Assert.assertThat;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.servlet.ServletContext;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.toolchain.test.EventQueue;
import org.eclipse.jetty.util.DecoratedObjectFactory;
import org.eclipse.jetty.util.Decorator;
import org.eclipse.jetty.websocket.api.WebSocketAdapter;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.eclipse.jetty.websocket.common.frames.TextFrame;
import org.eclipse.jetty.websocket.common.test.BlockheadClient;
import org.eclipse.jetty.websocket.common.test.Timeouts;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
import org.eclipse.jetty.websocket.servlet.WebSocketCreator;
@ -165,8 +166,8 @@ public class DecoratorsTest
client.write(new TextFrame().setPayload("info"));
EventQueue<WebSocketFrame> frames = client.readFrames(1,1,TimeUnit.SECONDS);
WebSocketFrame resp = frames.poll();
LinkedBlockingQueue<WebSocketFrame> frames = client.getFrameQueue();
WebSocketFrame resp = frames.poll(Timeouts.POLL_EVENT, Timeouts.POLL_EVENT_UNIT);
String textMsg = resp.getPayloadAsUTF8();
assertThat("DecoratedObjectFactory", textMsg, containsString("Object is a DecoratedObjectFactory"));

View File

@ -18,11 +18,10 @@
package org.eclipse.jetty.websocket.server;
import static org.hamcrest.Matchers.*;
import static org.hamcrest.Matchers.is;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.LinkedBlockingQueue;
import org.eclipse.jetty.toolchain.test.EventQueue;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.eclipse.jetty.websocket.common.frames.TextFrame;
import org.eclipse.jetty.websocket.common.test.BlockheadClient;
@ -66,7 +65,7 @@ public class FirefoxTest
client.write(new TextFrame().setPayload(msg));
// Read frame (hopefully text frame)
EventQueue<WebSocketFrame> frames = client.readFrames(1, 30, TimeUnit.SECONDS);
LinkedBlockingQueue<WebSocketFrame> frames = client.getFrameQueue();
WebSocketFrame tf = frames.poll();
Assert.assertThat("Text Frame.status code", tf.getPayloadAsUTF8(), is(msg));
}

View File

@ -21,13 +21,14 @@ package org.eclipse.jetty.websocket.server;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.toolchain.test.EventQueue;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.eclipse.jetty.websocket.common.frames.TextFrame;
import org.eclipse.jetty.websocket.common.test.BlockheadClient;
import org.eclipse.jetty.websocket.common.test.HttpResponse;
import org.eclipse.jetty.websocket.common.test.Timeouts;
import org.eclipse.jetty.websocket.server.helper.EchoServlet;
import org.junit.AfterClass;
import org.junit.Assert;
@ -89,10 +90,10 @@ public class FragmentExtensionTest
client.write(new TextFrame().setPayload(msg));
String parts[] = split(msg,fragSize);
EventQueue<WebSocketFrame> frames = client.readFrames(parts.length,1000,TimeUnit.MILLISECONDS);
LinkedBlockingQueue<WebSocketFrame> frames = client.getFrameQueue();
for (int i = 0; i < parts.length; i++)
{
WebSocketFrame frame = frames.poll();
WebSocketFrame frame = frames.poll(Timeouts.POLL_EVENT, Timeouts.POLL_EVENT_UNIT);
Assert.assertThat("text[" + i + "].payload",frame.getPayloadAsUTF8(),is(parts[i]));
}
}

View File

@ -21,13 +21,14 @@ package org.eclipse.jetty.websocket.server;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.toolchain.test.EventQueue;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.eclipse.jetty.websocket.common.frames.TextFrame;
import org.eclipse.jetty.websocket.common.test.BlockheadClient;
import org.eclipse.jetty.websocket.common.test.HttpResponse;
import org.eclipse.jetty.websocket.common.test.Timeouts;
import org.eclipse.jetty.websocket.server.helper.EchoServlet;
import org.junit.AfterClass;
import org.junit.Assert;
@ -72,8 +73,8 @@ public class IdentityExtensionTest
client.write(new TextFrame().setPayload("Hello"));
EventQueue<WebSocketFrame> frames = client.readFrames(1,1000,TimeUnit.MILLISECONDS);
WebSocketFrame frame = frames.poll();
LinkedBlockingQueue<WebSocketFrame> frames = client.getFrameQueue();
WebSocketFrame frame = frames.poll(Timeouts.POLL_EVENT, Timeouts.POLL_EVENT_UNIT);
Assert.assertThat("TEXT.payload",frame.getPayloadAsUTF8(),is("Hello"));
}
finally

View File

@ -21,15 +21,16 @@ package org.eclipse.jetty.websocket.server;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.toolchain.test.EventQueue;
import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.common.CloseInfo;
import org.eclipse.jetty.websocket.common.OpCode;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.eclipse.jetty.websocket.common.frames.TextFrame;
import org.eclipse.jetty.websocket.common.test.BlockheadClient;
import org.eclipse.jetty.websocket.common.test.Timeouts;
import org.eclipse.jetty.websocket.server.helper.RFCSocket;
import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
@ -94,8 +95,8 @@ public class IdleTimeoutTest
client.write(new TextFrame().setPayload("Hello"));
// Expect server to have closed due to its own timeout
EventQueue<WebSocketFrame> frames = client.readFrames(1,30,TimeUnit.SECONDS);
WebSocketFrame frame = frames.poll();
LinkedBlockingQueue<WebSocketFrame> frames = client.getFrameQueue();
WebSocketFrame frame = frames.poll(Timeouts.POLL_EVENT, Timeouts.POLL_EVENT_UNIT);
Assert.assertThat("frame opcode",frame.getOpCode(),is(OpCode.CLOSE));
CloseInfo close = new CloseInfo(frame);
Assert.assertThat("close code",close.getStatusCode(),is(StatusCode.SHUTDOWN));

View File

@ -25,10 +25,10 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jetty.toolchain.test.EventQueue;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.log.StacklessLogging;
@ -43,6 +43,7 @@ import org.eclipse.jetty.websocket.common.WebSocketSession;
import org.eclipse.jetty.websocket.common.frames.TextFrame;
import org.eclipse.jetty.websocket.common.test.BlockheadClient;
import org.eclipse.jetty.websocket.common.test.IBlockheadClient;
import org.eclipse.jetty.websocket.common.test.Timeouts;
import org.eclipse.jetty.websocket.server.helper.RFCSocket;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
@ -272,21 +273,21 @@ public class ManyConnectionsCleanupTest
client.write(new TextFrame().setPayload("calls"));
client.write(new TextFrame().setPayload("openSessions"));
EventQueue<WebSocketFrame> frames = client.readFrames(3,6,TimeUnit.SECONDS);
LinkedBlockingQueue<WebSocketFrame> frames = client.getFrameQueue();
WebSocketFrame frame;
String resp;
frame = frames.poll();
frame = frames.poll(Timeouts.POLL_EVENT, Timeouts.POLL_EVENT_UNIT);
assertThat("frames[0].opcode",frame.getOpCode(),is(OpCode.TEXT));
resp = frame.getPayloadAsUTF8();
assertThat("Should only have 1 open session",resp,containsString("calls=" + ((iterationCount * 2) + 1)));
frame = frames.poll();
frame = frames.poll(Timeouts.POLL_EVENT, Timeouts.POLL_EVENT_UNIT);
assertThat("frames[1].opcode",frame.getOpCode(),is(OpCode.TEXT));
resp = frame.getPayloadAsUTF8();
assertThat("Should only have 1 open session",resp,containsString("openSessions.size=1\n"));
frame = frames.poll();
frame = frames.poll(Timeouts.POLL_EVENT, Timeouts.POLL_EVENT_UNIT);
assertThat("frames[2].opcode",frame.getOpCode(),is(OpCode.CLOSE));
CloseInfo close = new CloseInfo(frame);
assertThat("Close Status Code",close.getStatusCode(),is(StatusCode.NORMAL));
@ -310,8 +311,9 @@ public class ManyConnectionsCleanupTest
client.connect();
client.sendStandardRequest();
client.expectUpgradeResponse();
client.readFrames(1,1,TimeUnit.SECONDS);
LinkedBlockingQueue<WebSocketFrame> frames = client.getFrameQueue();
frames.poll(Timeouts.POLL_EVENT, Timeouts.POLL_EVENT_UNIT);
CloseInfo close = new CloseInfo(StatusCode.NORMAL,"Normal");
assertThat("Close Status Code",close.getStatusCode(),is(StatusCode.NORMAL));

View File

@ -34,6 +34,7 @@ import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.eclipse.jetty.websocket.common.test.Timeouts;
import org.eclipse.jetty.websocket.common.util.Sha1Sum;
import org.eclipse.jetty.websocket.server.helper.CaptureSocket;
import org.eclipse.jetty.websocket.server.helper.EchoServlet;
@ -166,8 +167,7 @@ public class PerMessageDeflateExtensionTest
// Client sends first message
session.getRemote().sendBytes(ByteBuffer.wrap(msg));
clientSocket.messages.awaitEventCount(1,5,TimeUnit.SECONDS);
String echoMsg = clientSocket.messages.poll();
String echoMsg = clientSocket.messages.poll(Timeouts.POLL_EVENT, Timeouts.POLL_EVENT_UNIT);
Assert.assertThat("Echo'd Message",echoMsg,is("binary[sha1="+sha1+"]"));
}
finally

View File

@ -21,9 +21,9 @@ package org.eclipse.jetty.websocket.server;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.toolchain.test.EventQueue;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
@ -31,6 +31,7 @@ import org.eclipse.jetty.websocket.api.annotations.WebSocket;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.eclipse.jetty.websocket.common.frames.TextFrame;
import org.eclipse.jetty.websocket.common.test.BlockheadClient;
import org.eclipse.jetty.websocket.common.test.Timeouts;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
import org.eclipse.jetty.websocket.servlet.WebSocketCreator;
@ -129,8 +130,8 @@ public class SubProtocolTest
client.expectUpgradeResponse();
client.write(new TextFrame().setPayload("showme"));
EventQueue<WebSocketFrame> frames = client.readFrames(1, 30, TimeUnit.SECONDS);
WebSocketFrame tf = frames.poll();
LinkedBlockingQueue<WebSocketFrame> frames = client.getFrameQueue();
WebSocketFrame tf = frames.poll(Timeouts.POLL_EVENT, Timeouts.POLL_EVENT_UNIT);
assertThat(ProtocolEchoSocket.class.getSimpleName() + ".onMessage()", tf.getPayloadAsUTF8(), is("acceptedSubprotocol=" + acceptedSubProtocols));
}

View File

@ -21,9 +21,9 @@ package org.eclipse.jetty.websocket.server;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.toolchain.test.EventQueue;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.SuspendToken;
import org.eclipse.jetty.websocket.api.WriteCallback;
@ -33,6 +33,7 @@ import org.eclipse.jetty.websocket.api.annotations.WebSocket;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.eclipse.jetty.websocket.common.frames.TextFrame;
import org.eclipse.jetty.websocket.common.test.BlockheadClient;
import org.eclipse.jetty.websocket.common.test.Timeouts;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
import org.eclipse.jetty.websocket.servlet.WebSocketCreator;
@ -128,10 +129,10 @@ public class SuspendResumeTest
client.write(new TextFrame().setPayload("echo1"));
client.write(new TextFrame().setPayload("echo2"));
EventQueue<WebSocketFrame> frames = client.readFrames(2, 30, TimeUnit.SECONDS);
WebSocketFrame tf = frames.poll();
LinkedBlockingQueue<WebSocketFrame> frames = client.getFrameQueue();
WebSocketFrame tf = frames.poll(Timeouts.POLL_EVENT, Timeouts.POLL_EVENT_UNIT);
assertThat(EchoSocket.class.getSimpleName() + ".onMessage()", tf.getPayloadAsUTF8(), is("echo1"));
tf = frames.poll();
tf = frames.poll(Timeouts.POLL_EVENT, Timeouts.POLL_EVENT_UNIT);
assertThat(EchoSocket.class.getSimpleName() + ".onMessage()", tf.getPayloadAsUTF8(), is("echo2"));
}
}

View File

@ -23,17 +23,17 @@ import static org.hamcrest.Matchers.is;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.LinkedBlockingQueue;
import org.eclipse.jetty.io.LeakTrackingByteBufferPool;
import org.eclipse.jetty.io.MappedByteBufferPool;
import org.eclipse.jetty.toolchain.test.EventQueue;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.common.Generator;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.eclipse.jetty.websocket.common.frames.TextFrame;
import org.eclipse.jetty.websocket.common.test.BlockheadClient;
import org.eclipse.jetty.websocket.common.test.Timeouts;
import org.eclipse.jetty.websocket.server.examples.MyEchoServlet;
import org.junit.AfterClass;
import org.junit.Assert;
@ -108,10 +108,10 @@ public class TooFastClientTest
client.expectUpgradeResponse();
// Read frames (hopefully text frames)
EventQueue<WebSocketFrame> frames = client.readFrames(2,1,TimeUnit.SECONDS);
WebSocketFrame tf = frames.poll();
LinkedBlockingQueue<WebSocketFrame> frames = client.getFrameQueue();
WebSocketFrame tf = frames.poll(Timeouts.POLL_EVENT, Timeouts.POLL_EVENT_UNIT);
Assert.assertThat("Text Frame/msg1",tf.getPayloadAsUTF8(),is(msg1));
tf = frames.poll();
tf = frames.poll(Timeouts.POLL_EVENT, Timeouts.POLL_EVENT_UNIT);
Assert.assertThat("Text Frame/msg2",tf.getPayloadAsUTF8(),is(msg2));
}
finally
@ -167,9 +167,9 @@ public class TooFastClientTest
client.expectUpgradeResponse();
// Read frames (hopefully text frames)
EventQueue<WebSocketFrame> frames = client.readFrames(1,1,TimeUnit.SECONDS);
LinkedBlockingQueue<WebSocketFrame> frames = client.getFrameQueue();
WebSocketFrame tf = frames.poll();
WebSocketFrame tf = frames.poll(Timeouts.POLL_EVENT, Timeouts.POLL_EVENT_UNIT);
Assert.assertThat("Text Frame/msg1",tf.getPayloadAsUTF8(),is(bigMsg));
}
finally

View File

@ -26,9 +26,9 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.toolchain.test.EventQueue;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.log.StacklessLogging;
@ -42,6 +42,7 @@ import org.eclipse.jetty.websocket.common.WebSocketSession;
import org.eclipse.jetty.websocket.common.frames.TextFrame;
import org.eclipse.jetty.websocket.common.test.BlockheadClient;
import org.eclipse.jetty.websocket.common.test.IBlockheadClient;
import org.eclipse.jetty.websocket.common.test.Timeouts;
import org.eclipse.jetty.websocket.server.helper.RFCSocket;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
@ -231,8 +232,8 @@ public class WebSocketCloseTest
client.expectUpgradeResponse();
// Verify that client got close frame
EventQueue<WebSocketFrame> frames = client.readFrames(1,5,TimeUnit.SECONDS);
WebSocketFrame frame = frames.poll();
LinkedBlockingQueue<WebSocketFrame> frames = client.getFrameQueue();
WebSocketFrame frame = frames.poll(Timeouts.POLL_EVENT, Timeouts.POLL_EVENT_UNIT);
assertThat("frames[0].opcode",frame.getOpCode(),is(OpCode.CLOSE));
CloseInfo close = new CloseInfo(frame);
assertThat("Close Status Code",close.getStatusCode(),is(StatusCode.NORMAL));
@ -265,8 +266,8 @@ public class WebSocketCloseTest
client.sendStandardRequest();
client.expectUpgradeResponse();
EventQueue<WebSocketFrame> frames = client.readFrames(1,5,TimeUnit.SECONDS);
WebSocketFrame frame = frames.poll();
LinkedBlockingQueue<WebSocketFrame> frames = client.getFrameQueue();
WebSocketFrame frame = frames.poll(Timeouts.POLL_EVENT, Timeouts.POLL_EVENT_UNIT);
assertThat("frames[0].opcode",frame.getOpCode(),is(OpCode.CLOSE));
CloseInfo close = new CloseInfo(frame);
assertThat("Close Status Code",close.getStatusCode(),is(StatusCode.SERVER_ERROR));
@ -306,14 +307,14 @@ public class WebSocketCloseTest
text.setPayload("openSessions");
client.write(text);
EventQueue<WebSocketFrame> frames = client.readFrames(2,1,TimeUnit.SECONDS);
WebSocketFrame frame = frames.poll();
LinkedBlockingQueue<WebSocketFrame> frames = client.getFrameQueue();
WebSocketFrame frame = frames.poll(Timeouts.POLL_EVENT, Timeouts.POLL_EVENT_UNIT);
assertThat("frames[0].opcode",frame.getOpCode(),is(OpCode.TEXT));
String resp = frame.getPayloadAsUTF8();
assertThat("Should only have 1 open session",resp,containsString("openSessions.size=1\n"));
frame = frames.poll();
frame = frames.poll(Timeouts.POLL_EVENT, Timeouts.POLL_EVENT_UNIT);
assertThat("frames[1].opcode",frame.getOpCode(),is(OpCode.CLOSE));
CloseInfo close = new CloseInfo(frame);
assertThat("Close Status Code",close.getStatusCode(),is(StatusCode.NORMAL));
@ -338,8 +339,9 @@ public class WebSocketCloseTest
client.connect();
client.sendStandardRequest();
client.expectUpgradeResponse();
client.readFrames(1,1,TimeUnit.SECONDS);
LinkedBlockingQueue<WebSocketFrame> frames = client.getFrameQueue();
WebSocketFrame received = frames.poll(Timeouts.POLL_EVENT, Timeouts.POLL_EVENT_UNIT);
CloseInfo close = new CloseInfo(StatusCode.NORMAL,"Normal");
assertThat("Close Status Code",close.getStatusCode(),is(StatusCode.NORMAL));
@ -365,8 +367,9 @@ public class WebSocketCloseTest
client.connect();
client.sendStandardRequest();
client.expectUpgradeResponse();
client.readFrames(1,1,TimeUnit.SECONDS);
LinkedBlockingQueue<WebSocketFrame> frames = client.getFrameQueue();
WebSocketFrame received = frames.poll(Timeouts.POLL_EVENT, Timeouts.POLL_EVENT_UNIT);
CloseInfo close = new CloseInfo(StatusCode.NORMAL,"Normal");
client.write(close.asFrame()); // respond with close

View File

@ -22,16 +22,17 @@ import static org.hamcrest.Matchers.is;
import java.net.URI;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.MappedByteBufferPool;
import org.eclipse.jetty.toolchain.test.EventQueue;
import org.eclipse.jetty.toolchain.test.TestTracker;
import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.RemoteEndpoint;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.eclipse.jetty.websocket.common.test.Timeouts;
import org.eclipse.jetty.websocket.server.helper.CaptureSocket;
import org.eclipse.jetty.websocket.server.helper.SessionServlet;
import org.junit.AfterClass;
@ -94,9 +95,8 @@ public class WebSocketOverSSLTest
remote.flush();
// Read frame (hopefully text frame)
clientSocket.messages.awaitEventCount(1,30,TimeUnit.SECONDS);
EventQueue<String> captured = clientSocket.messages;
Assert.assertThat("Text Message",captured.poll(),is(msg));
LinkedBlockingQueue<String> captured = clientSocket.messages;
Assert.assertThat("Text Message",captured.poll(Timeouts.POLL_EVENT, Timeouts.POLL_EVENT_UNIT),is(msg));
// Shutdown the socket
clientSocket.close();
@ -136,9 +136,8 @@ public class WebSocketOverSSLTest
remote.flush();
// Read frame (hopefully text frame)
clientSocket.messages.awaitEventCount(1,30,TimeUnit.SECONDS);
EventQueue<String> captured = clientSocket.messages;
Assert.assertThat("Server.session.isSecure",captured.poll(),is("session.isSecure=true"));
LinkedBlockingQueue<String> captured = clientSocket.messages;
Assert.assertThat("Server.session.isSecure",captured.poll(Timeouts.POLL_EVENT, Timeouts.POLL_EVENT_UNIT),is("session.isSecure=true"));
// Shutdown the socket
clientSocket.close();
@ -178,10 +177,9 @@ public class WebSocketOverSSLTest
remote.flush();
// Read frame (hopefully text frame)
clientSocket.messages.awaitEventCount(1,30,TimeUnit.SECONDS);
EventQueue<String> captured = clientSocket.messages;
LinkedBlockingQueue<String> captured = clientSocket.messages;
String expected = String.format("session.upgradeRequest.requestURI=%s",requestUri.toASCIIString());
Assert.assertThat("session.upgradeRequest.requestURI",captured.poll(),is(expected));
Assert.assertThat("session.upgradeRequest.requestURI",captured.poll(Timeouts.POLL_EVENT, Timeouts.POLL_EVENT_UNIT),is(expected));
// Shutdown the socket
clientSocket.close();

View File

@ -21,14 +21,15 @@ package org.eclipse.jetty.websocket.server;
import static org.hamcrest.Matchers.is;
import java.net.URI;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.toolchain.test.AdvancedRunner;
import org.eclipse.jetty.toolchain.test.EventQueue;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.eclipse.jetty.websocket.common.frames.TextFrame;
import org.eclipse.jetty.websocket.common.test.BlockheadClient;
import org.eclipse.jetty.websocket.common.test.IBlockheadClient;
import org.eclipse.jetty.websocket.common.test.Timeouts;
import org.eclipse.jetty.websocket.server.helper.SessionServlet;
import org.junit.AfterClass;
import org.junit.Assert;
@ -90,14 +91,14 @@ public class WebSocketServerSessionTest
client.write(new TextFrame().setPayload("getParameterMap|cost")); // intentionally invalid
// Read frame (hopefully text frame)
EventQueue<WebSocketFrame> frames = client.readFrames(4,5,TimeUnit.SECONDS);
WebSocketFrame tf = frames.poll();
LinkedBlockingQueue<WebSocketFrame> frames = client.getFrameQueue();
WebSocketFrame tf = frames.poll(Timeouts.POLL_EVENT, Timeouts.POLL_EVENT_UNIT);
Assert.assertThat("Parameter Map[snack]", tf.getPayloadAsUTF8(), is("[cashews]"));
tf = frames.poll();
tf = frames.poll(Timeouts.POLL_EVENT, Timeouts.POLL_EVENT_UNIT);
Assert.assertThat("Parameter Map[amount]", tf.getPayloadAsUTF8(), is("[handful]"));
tf = frames.poll();
tf = frames.poll(Timeouts.POLL_EVENT, Timeouts.POLL_EVENT_UNIT);
Assert.assertThat("Parameter Map[brand]", tf.getPayloadAsUTF8(), is("[off]"));
tf = frames.poll();
tf = frames.poll(Timeouts.POLL_EVENT, Timeouts.POLL_EVENT_UNIT);
Assert.assertThat("Parameter Map[cost]", tf.getPayloadAsUTF8(), is("<null>"));
}
}

View File

@ -22,10 +22,9 @@ import static org.hamcrest.Matchers.is;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.LinkedBlockingQueue;
import org.eclipse.jetty.toolchain.test.AdvancedRunner;
import org.eclipse.jetty.toolchain.test.EventQueue;
import org.eclipse.jetty.util.Utf8Appendable.NotUtf8Exception;
import org.eclipse.jetty.util.Utf8StringBuilder;
import org.eclipse.jetty.util.log.StacklessLogging;
@ -42,6 +41,7 @@ import org.eclipse.jetty.websocket.common.frames.BinaryFrame;
import org.eclipse.jetty.websocket.common.frames.ContinuationFrame;
import org.eclipse.jetty.websocket.common.frames.TextFrame;
import org.eclipse.jetty.websocket.common.test.BlockheadClient;
import org.eclipse.jetty.websocket.common.test.Timeouts;
import org.eclipse.jetty.websocket.common.test.UnitGenerator;
import org.eclipse.jetty.websocket.common.util.Hex;
import org.eclipse.jetty.websocket.server.helper.RFCServlet;
@ -124,8 +124,8 @@ public class WebSocketServletRFCTest
client.write(bin); // write buf3 (fin=true)
// Read frame echo'd back (hopefully a single binary frame)
EventQueue<WebSocketFrame> frames = client.readFrames(1,30,TimeUnit.SECONDS);
Frame binmsg = frames.poll();
LinkedBlockingQueue<WebSocketFrame> frames = client.getFrameQueue();
Frame binmsg = frames.poll(Timeouts.POLL_EVENT, Timeouts.POLL_EVENT_UNIT);
int expectedSize = buf1.length + buf2.length + buf3.length;
Assert.assertThat("BinaryFrame.payloadLength",binmsg.getPayloadLength(),is(expectedSize));
@ -191,8 +191,8 @@ public class WebSocketServletRFCTest
client.write(new TextFrame().setPayload(msg));
// Read frame (hopefully text frame)
EventQueue<WebSocketFrame> frames = client.readFrames(1,30,TimeUnit.SECONDS);
WebSocketFrame tf = frames.poll();
LinkedBlockingQueue<WebSocketFrame> frames = client.getFrameQueue();
WebSocketFrame tf = frames.poll(Timeouts.POLL_EVENT, Timeouts.POLL_EVENT_UNIT);
Assert.assertThat("Text Frame.status code",tf.getPayloadAsUTF8(),is(msg));
}
finally
@ -222,8 +222,8 @@ public class WebSocketServletRFCTest
client.write(new TextFrame().setPayload("CRASH"));
// Read frame (hopefully close frame)
EventQueue<WebSocketFrame> frames = client.readFrames(1,30,TimeUnit.SECONDS);
Frame cf = frames.poll();
LinkedBlockingQueue<WebSocketFrame> frames = client.getFrameQueue();
Frame cf = frames.poll(Timeouts.POLL_EVENT, Timeouts.POLL_EVENT_UNIT);
CloseInfo close = new CloseInfo(cf);
Assert.assertThat("Close Frame.status code",close.getStatusCode(),is(StatusCode.SERVER_ERROR));
}
@ -263,8 +263,8 @@ public class WebSocketServletRFCTest
client.write(new TextFrame().setPayload(msg));
// Read frame (hopefully text frame)
EventQueue<WebSocketFrame> frames = client.readFrames(1,30,TimeUnit.SECONDS);
WebSocketFrame tf = frames.poll();
LinkedBlockingQueue<WebSocketFrame> frames = client.getFrameQueue();
WebSocketFrame tf = frames.poll(Timeouts.POLL_EVENT, Timeouts.POLL_EVENT_UNIT);
Assert.assertThat("Text Frame.status code",tf.getPayloadAsUTF8(),is(msg));
}
finally
@ -293,8 +293,8 @@ public class WebSocketServletRFCTest
client.writeRaw(bbHeader);
client.writeRaw(txt.getPayload());
EventQueue<WebSocketFrame> frames = client.readFrames(1,30,TimeUnit.SECONDS);
WebSocketFrame frame = frames.poll();
LinkedBlockingQueue<WebSocketFrame> frames = client.getFrameQueue();
WebSocketFrame frame = frames.poll(Timeouts.POLL_EVENT, Timeouts.POLL_EVENT_UNIT);
Assert.assertThat("frames[0].opcode",frame.getOpCode(),is(OpCode.CLOSE));
CloseInfo close = new CloseInfo(frame);
Assert.assertThat("Close Status Code",close.getStatusCode(),is(StatusCode.BAD_PAYLOAD));
@ -334,8 +334,8 @@ public class WebSocketServletRFCTest
client.write(new TextFrame().setPayload(msg));
// Read frame (hopefully text frame)
EventQueue<WebSocketFrame> frames = client.readFrames(1,30,TimeUnit.SECONDS);
WebSocketFrame tf = frames.poll();
LinkedBlockingQueue<WebSocketFrame> frames = client.getFrameQueue();
WebSocketFrame tf = frames.poll(Timeouts.POLL_EVENT, Timeouts.POLL_EVENT_UNIT);
Assert.assertThat("Text Frame.status code",tf.getPayloadAsUTF8(),is(msg));
}
finally

View File

@ -27,7 +27,7 @@ import java.net.URI;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import javax.servlet.DispatcherType;
@ -36,12 +36,12 @@ import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.servlet.FilterHolder;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.toolchain.test.EventQueue;
import org.eclipse.jetty.toolchain.test.MavenTestingUtils;
import org.eclipse.jetty.webapp.WebAppContext;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.eclipse.jetty.websocket.common.frames.TextFrame;
import org.eclipse.jetty.websocket.common.test.BlockheadClient;
import org.eclipse.jetty.websocket.common.test.Timeouts;
import org.eclipse.jetty.websocket.servlet.WebSocketCreator;
import org.junit.Ignore;
import org.junit.Test;
@ -314,8 +314,9 @@ public class WebSocketUpgradeFilterTest
client.write(new TextFrame().setPayload("hello"));
EventQueue<WebSocketFrame> frames = client.readFrames(1, 1000, TimeUnit.MILLISECONDS);
String payload = frames.poll().getPayloadAsUTF8();
LinkedBlockingQueue<WebSocketFrame> frames = client.getFrameQueue();
WebSocketFrame received = frames.poll(Timeouts.POLL_EVENT, Timeouts.POLL_EVENT_UNIT);
String payload = received.getPayloadAsUTF8();
// If we can connect and send a text message, we know that the endpoint was
// added properly, and the response will help us verify the policy configuration too
@ -336,9 +337,11 @@ public class WebSocketUpgradeFilterTest
client.write(new TextFrame().setPayload("hello 1"));
EventQueue<WebSocketFrame> frames = client.readFrames(1, 1000, TimeUnit.MILLISECONDS);
String payload = frames.poll().getPayloadAsUTF8();
LinkedBlockingQueue<WebSocketFrame> frames = client.getFrameQueue();
WebSocketFrame received = frames.poll(Timeouts.POLL_EVENT, Timeouts.POLL_EVENT_UNIT);
String payload = received.getPayloadAsUTF8();
// If we can connect and send a text message, we know that the endpoint was
// added properly, and the response will help us verify the policy configuration too
assertThat("payload", payload, containsString("session.maxTextMessageSize=" + (10 * 1024 * 1024)));
@ -355,9 +358,11 @@ public class WebSocketUpgradeFilterTest
client.write(new TextFrame().setPayload("hello 2"));
EventQueue<WebSocketFrame> frames = client.readFrames(1, 1000, TimeUnit.MILLISECONDS);
String payload = frames.poll().getPayloadAsUTF8();
LinkedBlockingQueue<WebSocketFrame> frames = client.getFrameQueue();
WebSocketFrame received = frames.poll(Timeouts.POLL_EVENT, Timeouts.POLL_EVENT_UNIT);
String payload = received.getPayloadAsUTF8();
// If we can connect and send a text message, we know that the endpoint was
// added properly, and the response will help us verify the policy configuration too
assertThat("payload", payload, containsString("session.maxTextMessageSize=" + (10 * 1024 * 1024)));

View File

@ -20,9 +20,9 @@ package org.eclipse.jetty.websocket.server.helper;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.toolchain.test.EventQueue;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.WebSocketAdapter;
import org.eclipse.jetty.websocket.common.util.Sha1Sum;
@ -30,12 +30,7 @@ import org.eclipse.jetty.websocket.common.util.Sha1Sum;
public class CaptureSocket extends WebSocketAdapter
{
private final CountDownLatch latch = new CountDownLatch(1);
public EventQueue<String> messages;
public CaptureSocket()
{
messages = new EventQueue<String>();
}
public LinkedBlockingQueue<String> messages = new LinkedBlockingQueue<>();
public boolean awaitConnected(long timeout) throws InterruptedException
{

View File

@ -21,9 +21,10 @@ package org.eclipse.jetty.websocket.server.misbehaving;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.toolchain.test.EventQueue;
import org.eclipse.jetty.util.log.StacklessLogging;
import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.common.CloseInfo;
@ -32,15 +33,12 @@ import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.eclipse.jetty.websocket.common.WebSocketSession;
import org.eclipse.jetty.websocket.common.test.BlockheadClient;
import org.eclipse.jetty.websocket.common.test.IBlockheadClient;
import org.eclipse.jetty.websocket.common.test.Timeouts;
import org.eclipse.jetty.websocket.server.SimpleServletServer;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
/**
* Testing badly behaving Socket class implementations to get the best
* error messages and state out of the websocket implementation.
@ -80,8 +78,8 @@ public class MisbehavingClassTest
client.sendStandardRequest();
client.expectUpgradeResponse();
EventQueue<WebSocketFrame> frames = client.readFrames(1,1,TimeUnit.SECONDS);
WebSocketFrame frame = frames.poll();
LinkedBlockingQueue<WebSocketFrame> frames = client.getFrameQueue();
WebSocketFrame frame = frames.poll(Timeouts.POLL_EVENT, Timeouts.POLL_EVENT_UNIT);
assertThat("frames[0].opcode",frame.getOpCode(),is(OpCode.CLOSE));
CloseInfo close = new CloseInfo(frame);
assertThat("Close Status Code",close.getStatusCode(),is(StatusCode.SERVER_ERROR));
@ -115,8 +113,8 @@ public class MisbehavingClassTest
client.sendStandardRequest();
client.expectUpgradeResponse();
EventQueue<WebSocketFrame> frames = client.readFrames(1,1,TimeUnit.SECONDS);
WebSocketFrame frame = frames.poll();
LinkedBlockingQueue<WebSocketFrame> frames = client.getFrameQueue();
WebSocketFrame frame = frames.poll(Timeouts.POLL_EVENT, Timeouts.POLL_EVENT_UNIT);
assertThat("frames[0].opcode",frame.getOpCode(),is(OpCode.CLOSE));
CloseInfo close = new CloseInfo(frame);
assertThat("Close Status Code",close.getStatusCode(),is(StatusCode.SERVER_ERROR));