433262 - WebSocket / Advanced close use cases
+ Improved test case handling of frame reading (to prevent future false failures) + Improved AbstractWebSocketConnection handling of close -> disconnect to only trigger on successful write (or failure) of close frame flush attempt. + Flusher close on all forms of disconnect, even half-closed.
This commit is contained in:
parent
bdecc7bd89
commit
1fb578165f
|
@ -37,6 +37,7 @@ import javax.websocket.server.ServerEndpointConfig;
|
|||
import org.eclipse.jetty.server.Server;
|
||||
import org.eclipse.jetty.server.ServerConnector;
|
||||
import org.eclipse.jetty.servlet.ServletContextHandler;
|
||||
import org.eclipse.jetty.toolchain.test.EventQueue;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
import org.eclipse.jetty.websocket.api.util.QuoteUtil;
|
||||
|
@ -44,7 +45,6 @@ import org.eclipse.jetty.websocket.common.WebSocketFrame;
|
|||
import org.eclipse.jetty.websocket.common.frames.TextFrame;
|
||||
import org.eclipse.jetty.websocket.common.test.BlockheadClient;
|
||||
import org.eclipse.jetty.websocket.common.test.HttpResponse;
|
||||
import org.eclipse.jetty.websocket.common.test.IncomingFramesCapture;
|
||||
import org.eclipse.jetty.websocket.jsr356.server.deploy.WebSocketServerContainerInitializer;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
|
@ -210,8 +210,8 @@ public class ConfiguratorTest
|
|||
client.expectUpgradeResponse();
|
||||
|
||||
client.write(new TextFrame().setPayload("X-Dummy"));
|
||||
IncomingFramesCapture capture = client.readFrames(1,TimeUnit.SECONDS,1);
|
||||
WebSocketFrame frame = capture.getFrames().poll();
|
||||
EventQueue<WebSocketFrame> frames = client.readFrames(1,1,TimeUnit.SECONDS);
|
||||
WebSocketFrame frame = frames.poll();
|
||||
Assert.assertThat("Frame Response", frame.getPayloadAsUTF8(), is("Request Header [X-Dummy]: \"Bogus\""));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -425,7 +425,7 @@ public class ClientCloseTest
|
|||
|
||||
// client idle timeout triggers close event on client ws-endpoint
|
||||
// client close event on ws-endpoint
|
||||
clientSocket.assertReceivedCloseEvent(timeout,is(StatusCode.ABNORMAL),containsString("Timeout"));
|
||||
clientSocket.assertReceivedCloseEvent(timeout,is(StatusCode.SHUTDOWN),containsString("Timeout"));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -541,9 +541,7 @@ public class ClientCloseTest
|
|||
// server sits idle
|
||||
|
||||
// client idle timeout triggers close event on client ws-endpoint
|
||||
// assert - close code==1006 (abnormal)
|
||||
// assert - close reason message contains (timeout)
|
||||
clientSocket.assertReceivedCloseEvent(timeout,is(StatusCode.ABNORMAL),containsString("Timeout"));
|
||||
clientSocket.assertReceivedCloseEvent(timeout,is(StatusCode.SHUTDOWN),containsString("Timeout"));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -370,10 +370,13 @@ public class ExtensionStack extends ContainerLifeCycle implements IncomingFrames
|
|||
protected Action process() throws Exception
|
||||
{
|
||||
current = entries.poll();
|
||||
LOG.debug("Processing {}", current);
|
||||
if (current == null)
|
||||
{
|
||||
LOG.debug("Entering IDLE");
|
||||
return Action.IDLE;
|
||||
nextOutgoing.outgoingFrame(current.frame, this, current.batchMode);
|
||||
}
|
||||
LOG.debug("Processing {}",current);
|
||||
nextOutgoing.outgoingFrame(current.frame,this,current.batchMode);
|
||||
return Action.SCHEDULED;
|
||||
}
|
||||
|
||||
|
|
|
@ -53,7 +53,6 @@ import org.eclipse.jetty.websocket.common.CloseInfo;
|
|||
import org.eclipse.jetty.websocket.common.ConnectionState;
|
||||
import org.eclipse.jetty.websocket.common.Generator;
|
||||
import org.eclipse.jetty.websocket.common.LogicalConnection;
|
||||
import org.eclipse.jetty.websocket.common.OpCode;
|
||||
import org.eclipse.jetty.websocket.common.Parser;
|
||||
import org.eclipse.jetty.websocket.common.WebSocketSession;
|
||||
import org.eclipse.jetty.websocket.common.io.IOState.ConnectionStateListener;
|
||||
|
@ -133,6 +132,68 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
|
|||
}
|
||||
}
|
||||
|
||||
public class OnCloseLocalCallback implements WriteCallback
|
||||
{
|
||||
private final WriteCallback callback;
|
||||
private final CloseInfo close;
|
||||
|
||||
public OnCloseLocalCallback(WriteCallback callback, CloseInfo close)
|
||||
{
|
||||
this.callback = callback;
|
||||
this.close = close;
|
||||
}
|
||||
|
||||
public OnCloseLocalCallback(CloseInfo close)
|
||||
{
|
||||
this(null,close);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeFailed(Throwable x)
|
||||
{
|
||||
try
|
||||
{
|
||||
if (callback != null)
|
||||
{
|
||||
callback.writeFailed(x);
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
onLocalClose();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeSuccess()
|
||||
{
|
||||
try
|
||||
{
|
||||
if (callback != null)
|
||||
{
|
||||
callback.writeSuccess();
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
onLocalClose();
|
||||
}
|
||||
}
|
||||
|
||||
private void onLocalClose()
|
||||
{
|
||||
LOG.debug("Local Close Confirmed {}",close);
|
||||
if (close.isAbnormal())
|
||||
{
|
||||
ioState.onAbnormalClose(close);
|
||||
}
|
||||
else
|
||||
{
|
||||
ioState.onCloseLocal(close);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static class Stats
|
||||
{
|
||||
private AtomicLong countFillInterestedEvents = new AtomicLong(0);
|
||||
|
@ -220,26 +281,20 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
|
|||
{
|
||||
LOG.debug("close({},{})",statusCode,reason);
|
||||
CloseInfo close = new CloseInfo(statusCode,reason);
|
||||
if (statusCode == StatusCode.ABNORMAL)
|
||||
{
|
||||
ioState.onAbnormalClose(close);
|
||||
}
|
||||
else
|
||||
{
|
||||
ioState.onCloseLocal(close);
|
||||
}
|
||||
this.outgoingFrame(close.asFrame(),new OnCloseLocalCallback(close),BatchMode.OFF);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void disconnect()
|
||||
{
|
||||
LOG.debug("{} disconnect()",policy.getBehavior());
|
||||
flusher.close();
|
||||
disconnect(false);
|
||||
}
|
||||
|
||||
private void disconnect(boolean onlyOutput)
|
||||
{
|
||||
LOG.debug("{} disconnect({})",policy.getBehavior(),onlyOutput?"outputOnly":"both");
|
||||
// close FrameFlusher, we cannot write anymore at this point.
|
||||
flusher.close();
|
||||
EndPoint endPoint = getEndPoint();
|
||||
// We need to gently close first, to allow
|
||||
// SSL close alerts to be sent by Jetty
|
||||
|
@ -388,13 +443,20 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
|
|||
CloseInfo abnormal = new CloseInfo(StatusCode.SHUTDOWN,"Abnormal Close - " + ioState.getCloseInfo().getReason());
|
||||
outgoingFrame(abnormal.asFrame(),new OnDisconnectCallback(false),BatchMode.OFF);
|
||||
}
|
||||
// Just disconnect
|
||||
this.disconnect(false);
|
||||
else
|
||||
{
|
||||
// Just disconnect
|
||||
this.disconnect(false);
|
||||
}
|
||||
break;
|
||||
case CLOSING:
|
||||
CloseInfo close = ioState.getCloseInfo();
|
||||
// reply to close handshake from remote
|
||||
outgoingFrame(close.asFrame(),new OnDisconnectCallback(true),BatchMode.OFF);
|
||||
// First occurrence of .onCloseLocal or .onCloseRemote use
|
||||
if (ioState.wasRemoteCloseInitiated())
|
||||
{
|
||||
CloseInfo close = ioState.getCloseInfo();
|
||||
// reply to close handshake from remote
|
||||
outgoingFrame(close.asFrame(),new OnCloseLocalCallback(new OnDisconnectCallback(true),close),BatchMode.OFF);
|
||||
}
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
@ -463,7 +525,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
|
|||
finally
|
||||
{
|
||||
// This is an Abnormal Close condition
|
||||
close(StatusCode.ABNORMAL,"Idle Timeout");
|
||||
close(StatusCode.SHUTDOWN,"Idle Timeout");
|
||||
}
|
||||
|
||||
return false;
|
||||
|
@ -480,21 +542,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
|
|||
LOG.debug("outgoingFrame({}, {})",frame,callback);
|
||||
}
|
||||
|
||||
CloseInfo close = null;
|
||||
// grab a copy of the frame details before masking and whatnot
|
||||
if (frame.getOpCode() == OpCode.CLOSE)
|
||||
{
|
||||
close = new CloseInfo(frame);
|
||||
}
|
||||
|
||||
flusher.enqueue(frame,callback,batchMode);
|
||||
|
||||
// now trigger local close
|
||||
if (close != null)
|
||||
{
|
||||
LOG.debug("outgoing CLOSE frame - {}: {}",frame,close);
|
||||
ioState.onCloseLocal(close);
|
||||
}
|
||||
}
|
||||
|
||||
private int read(ByteBuffer buffer)
|
||||
|
|
|
@ -36,12 +36,13 @@ import java.nio.charset.StandardCharsets;
|
|||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.eclipse.jetty.io.ByteBufferPool;
|
||||
import org.eclipse.jetty.io.MappedByteBufferPool;
|
||||
import org.eclipse.jetty.toolchain.test.EventQueue;
|
||||
import org.eclipse.jetty.util.BufferUtil;
|
||||
import org.eclipse.jetty.util.IO;
|
||||
import org.eclipse.jetty.util.StringUtil;
|
||||
|
@ -81,21 +82,102 @@ import org.junit.Assert;
|
|||
* with regards to basic IO behavior, a write should work as expected, a read should work as expected, but <u>what</u> byte it sends or reads is not within its
|
||||
* scope.
|
||||
*/
|
||||
public class BlockheadClient implements IncomingFrames, OutgoingFrames, ConnectionStateListener, AutoCloseable
|
||||
public class BlockheadClient implements OutgoingFrames, ConnectionStateListener, AutoCloseable
|
||||
{
|
||||
private class FrameReadingThread extends Thread implements Runnable, IncomingFrames
|
||||
{
|
||||
public long totalBytes = 0;
|
||||
public long totalReadOps = 0;
|
||||
public long totalParseOps = 0;
|
||||
|
||||
public EventQueue<WebSocketFrame> frames = new EventQueue<>();
|
||||
public EventQueue<Throwable> errors = new EventQueue<>();
|
||||
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
LOG.debug("Reading frames from server");
|
||||
|
||||
byte buf[] = new byte[BUFFER_SIZE];
|
||||
try
|
||||
{
|
||||
int len = 0;
|
||||
int available = 0;
|
||||
while (!eof)
|
||||
{
|
||||
available = in.available();
|
||||
len = in.read(buf,0,Math.min(available,buf.length));
|
||||
totalReadOps++;
|
||||
if (len < 0)
|
||||
{
|
||||
eof = true;
|
||||
break;
|
||||
}
|
||||
else if (len > 0)
|
||||
{
|
||||
totalBytes += len;
|
||||
ByteBuffer bbuf = ByteBuffer.wrap(buf,0,len);
|
||||
if (LOG.isDebugEnabled())
|
||||
{
|
||||
LOG.debug("Read {} bytes: {}",len,BufferUtil.toDetailString(bbuf));
|
||||
}
|
||||
totalParseOps++;
|
||||
parser.parse(bbuf);
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (IOException e)
|
||||
{
|
||||
LOG.debug(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
StringBuilder str = new StringBuilder();
|
||||
str.append("FrameReadingThread[");
|
||||
str.append(",frames=" + frames.size());
|
||||
str.append(",errors=" + errors.size());
|
||||
str.append(String.format(",totalBytes=%,d",totalBytes));
|
||||
str.append(String.format(",totalReadOps=%,d",totalReadOps));
|
||||
str.append(String.format(",totalParseOps=%,d",totalParseOps));
|
||||
str.append("]");
|
||||
return str.toString();
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void incomingError(Throwable t)
|
||||
{
|
||||
this.errors.add(t);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void incomingFrame(Frame frame)
|
||||
{
|
||||
this.frames.add(WebSocketFrame.copy(frame));
|
||||
}
|
||||
|
||||
public synchronized void clear()
|
||||
{
|
||||
this.frames.clear();
|
||||
this.errors.clear();
|
||||
}
|
||||
}
|
||||
|
||||
private static final String REQUEST_HASH_KEY = "dGhlIHNhbXBsZSBub25jZQ==";
|
||||
private static final int BUFFER_SIZE = 8192;
|
||||
private static final int BUFFER_SIZE = 64 * 1024;
|
||||
private static final Logger LOG = Log.getLogger(BlockheadClient.class);
|
||||
/** Set to true to disable timeouts (for debugging reasons) */
|
||||
private boolean debug = false;
|
||||
private final URI destHttpURI;
|
||||
private final URI destWebsocketURI;
|
||||
private final ByteBufferPool bufferPool;
|
||||
private final Generator generator;
|
||||
private final Parser parser;
|
||||
private final IncomingFramesCapture incomingFrames;
|
||||
private final WebSocketExtensionFactory extensionFactory;
|
||||
|
||||
private final WebSocketExtensionFactory extensionFactory;
|
||||
private FrameReadingThread frameReader;
|
||||
|
||||
private ExecutorService executor;
|
||||
private Socket socket;
|
||||
private OutputStream out;
|
||||
private InputStream in;
|
||||
|
@ -103,16 +185,15 @@ public class BlockheadClient implements IncomingFrames, OutgoingFrames, Connecti
|
|||
private String protocols;
|
||||
private List<String> extensions = new ArrayList<>();
|
||||
private List<String> headers = new ArrayList<>();
|
||||
private byte[] clientmask = new byte[]
|
||||
{ (byte)0xFF, (byte)0xFF, (byte)0xFF, (byte)0xFF };
|
||||
private byte[] clientmask = new byte[] { (byte)0xFF, (byte)0xFF, (byte)0xFF, (byte)0xFF };
|
||||
private int timeout = 1000;
|
||||
private AtomicInteger parseCount;
|
||||
private OutgoingFrames outgoing = this;
|
||||
private boolean eof = false;
|
||||
private ExtensionStack extensionStack;
|
||||
private IOState ioState;
|
||||
private CountDownLatch disconnectedLatch = new CountDownLatch(1);
|
||||
private ByteBuffer remainingBuffer;
|
||||
|
||||
private String connectionValue = "Upgrade";
|
||||
|
||||
public BlockheadClient(URI destWebsocketURI) throws URISyntaxException
|
||||
|
@ -137,9 +218,6 @@ public class BlockheadClient implements IncomingFrames, OutgoingFrames, Connecti
|
|||
this.bufferPool = new MappedByteBufferPool(8192);
|
||||
this.generator = new Generator(policy,bufferPool);
|
||||
this.parser = new Parser(policy,bufferPool);
|
||||
this.parseCount = new AtomicInteger(0);
|
||||
|
||||
this.incomingFrames = new IncomingFramesCapture();
|
||||
|
||||
this.extensionFactory = new WebSocketExtensionFactory(policy,bufferPool);
|
||||
this.ioState = new IOState();
|
||||
|
@ -161,9 +239,38 @@ public class BlockheadClient implements IncomingFrames, OutgoingFrames, Connecti
|
|||
return disconnectedLatch.await(timeout,unit);
|
||||
}
|
||||
|
||||
protected int blockingRead(ByteBuffer buf) throws IOException
|
||||
{
|
||||
if (eof)
|
||||
{
|
||||
return -1;
|
||||
}
|
||||
|
||||
if ((remainingBuffer != null) && (remainingBuffer.remaining() > 0))
|
||||
{
|
||||
return BufferUtil.put(remainingBuffer,buf);
|
||||
}
|
||||
|
||||
int len = -1;
|
||||
int b;
|
||||
while (buf.remaining() > 0)
|
||||
{
|
||||
b = in.read();
|
||||
if (b == (-1))
|
||||
{
|
||||
eof = true;
|
||||
break;
|
||||
}
|
||||
buf.put((byte)b);
|
||||
len++;
|
||||
}
|
||||
|
||||
return len;
|
||||
}
|
||||
|
||||
public void clearCaptured()
|
||||
{
|
||||
this.incomingFrames.clear();
|
||||
frameReader.clear();
|
||||
}
|
||||
|
||||
public void clearExtensions()
|
||||
|
@ -171,6 +278,7 @@ public class BlockheadClient implements IncomingFrames, OutgoingFrames, Connecti
|
|||
extensions.clear();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close()
|
||||
{
|
||||
LOG.debug("close()");
|
||||
|
@ -185,7 +293,9 @@ public class BlockheadClient implements IncomingFrames, OutgoingFrames, Connecti
|
|||
if (!ioState.isClosed())
|
||||
{
|
||||
ioState.onCloseLocal(close);
|
||||
} else {
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG.debug("Not issuing close. ioState = {}",ioState);
|
||||
}
|
||||
}
|
||||
|
@ -211,6 +321,10 @@ public class BlockheadClient implements IncomingFrames, OutgoingFrames, Connecti
|
|||
IO.close(in);
|
||||
IO.close(out);
|
||||
disconnectedLatch.countDown();
|
||||
if (frameReader != null)
|
||||
{
|
||||
frameReader.interrupt();
|
||||
}
|
||||
if (socket != null)
|
||||
{
|
||||
try
|
||||
|
@ -282,8 +396,12 @@ public class BlockheadClient implements IncomingFrames, OutgoingFrames, Connecti
|
|||
extensionStack = new ExtensionStack(this.extensionFactory);
|
||||
extensionStack.negotiate(configs);
|
||||
|
||||
// Setup Frame Reader
|
||||
this.frameReader = new FrameReadingThread();
|
||||
this.frameReader.start();
|
||||
|
||||
// Start with default routing
|
||||
extensionStack.setNextIncoming(this); // the websocket layer
|
||||
extensionStack.setNextIncoming(frameReader); // the websocket layer
|
||||
extensionStack.setNextOutgoing(outgoing); // the network layer
|
||||
|
||||
// Configure Parser / Generator
|
||||
|
@ -320,6 +438,15 @@ public class BlockheadClient implements IncomingFrames, OutgoingFrames, Connecti
|
|||
return connectionValue;
|
||||
}
|
||||
|
||||
public ExecutorService getExecutor()
|
||||
{
|
||||
if (executor == null)
|
||||
{
|
||||
executor = Executors.newCachedThreadPool();
|
||||
}
|
||||
return executor;
|
||||
}
|
||||
|
||||
private List<ExtensionConfig> getExtensionConfigs(HttpResponse response)
|
||||
{
|
||||
List<ExtensionConfig> configs = new ArrayList<>();
|
||||
|
@ -397,33 +524,6 @@ public class BlockheadClient implements IncomingFrames, OutgoingFrames, Connecti
|
|||
return destWebsocketURI;
|
||||
}
|
||||
|
||||
/**
|
||||
* Errors received (after extensions)
|
||||
*/
|
||||
@Override
|
||||
public void incomingError(Throwable e)
|
||||
{
|
||||
incomingFrames.incomingError(e);
|
||||
}
|
||||
|
||||
/**
|
||||
* Frames received (after extensions)
|
||||
*/
|
||||
@Override
|
||||
public void incomingFrame(Frame frame)
|
||||
{
|
||||
LOG.debug("incoming({})",frame);
|
||||
int count = parseCount.incrementAndGet();
|
||||
if ((count % 10) == 0)
|
||||
{
|
||||
LOG.info("Client parsed {} frames",count);
|
||||
}
|
||||
|
||||
// Capture Frame Copy
|
||||
WebSocketFrame copy = WebSocketFrame.copy(frame);
|
||||
incomingFrames.incomingFrame(copy);
|
||||
}
|
||||
|
||||
public boolean isConnected()
|
||||
{
|
||||
return (socket != null) && (socket.isConnected());
|
||||
|
@ -432,7 +532,7 @@ public class BlockheadClient implements IncomingFrames, OutgoingFrames, Connecti
|
|||
@Override
|
||||
public void onConnectionStateChange(ConnectionState state)
|
||||
{
|
||||
LOG.debug("CLIENT onConnectionStateChange() - {}", state);
|
||||
LOG.debug("CLIENT onConnectionStateChange() - {}",state);
|
||||
switch (state)
|
||||
{
|
||||
case CLOSED:
|
||||
|
@ -441,7 +541,7 @@ public class BlockheadClient implements IncomingFrames, OutgoingFrames, Connecti
|
|||
break;
|
||||
case CLOSING:
|
||||
CloseInfo close = ioState.getCloseInfo();
|
||||
|
||||
|
||||
WebSocketFrame frame = close.asFrame();
|
||||
LOG.debug("Issuing: {}",frame);
|
||||
try
|
||||
|
@ -524,68 +624,10 @@ public class BlockheadClient implements IncomingFrames, OutgoingFrames, Connecti
|
|||
return len;
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
public IncomingFramesCapture readFrames(int expectedCount, TimeUnit timeoutUnit, int timeoutDuration) throws IOException, TimeoutException
|
||||
public EventQueue<WebSocketFrame> readFrames(int expectedFrameCount, int timeoutDuration, TimeUnit timeoutUnit) throws Exception
|
||||
{
|
||||
return readFrames(expectedCount,timeoutDuration,timeoutUnit);
|
||||
}
|
||||
|
||||
public IncomingFramesCapture readFrames(int expectedCount, int timeoutDuration, TimeUnit timeoutUnit) throws IOException, TimeoutException
|
||||
{
|
||||
LOG.debug("Read: waiting for {} frame(s) from server",expectedCount);
|
||||
|
||||
ByteBuffer buf = bufferPool.acquire(BUFFER_SIZE,false);
|
||||
BufferUtil.clearToFill(buf);
|
||||
try
|
||||
{
|
||||
long msDur = TimeUnit.MILLISECONDS.convert(timeoutDuration,timeoutUnit);
|
||||
long now = System.currentTimeMillis();
|
||||
long expireOn = now + msDur;
|
||||
LOG.debug("Now: {} - expireOn: {} ({} ms)",now,expireOn,msDur);
|
||||
|
||||
long iter = 0;
|
||||
|
||||
int len = 0;
|
||||
while (incomingFrames.size() < expectedCount)
|
||||
{
|
||||
BufferUtil.clearToFill(buf);
|
||||
len = read(buf);
|
||||
if (len > 0)
|
||||
{
|
||||
BufferUtil.flipToFlush(buf,0);
|
||||
if (LOG.isDebugEnabled())
|
||||
{
|
||||
LOG.debug("Read {} bytes: {}",len,BufferUtil.toDetailString(buf));
|
||||
}
|
||||
parser.parse(buf);
|
||||
}
|
||||
else
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
{
|
||||
iter++;
|
||||
if ((iter % 10000000) == 0)
|
||||
{
|
||||
LOG.debug("10,000,000 reads of zero length");
|
||||
iter = 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!debug && (System.currentTimeMillis() > expireOn))
|
||||
{
|
||||
incomingFrames.dump();
|
||||
throw new TimeoutException(String.format("Timeout reading all %d expected frames. (managed to only read %d frame(s))",expectedCount,
|
||||
incomingFrames.size()));
|
||||
}
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
bufferPool.release(buf);
|
||||
}
|
||||
|
||||
return incomingFrames;
|
||||
frameReader.frames.awaitEventCount(expectedFrameCount,timeoutDuration,timeoutUnit);
|
||||
return frameReader.frames;
|
||||
}
|
||||
|
||||
public HttpResponse readResponseHeader() throws IOException
|
||||
|
@ -641,9 +683,9 @@ public class BlockheadClient implements IncomingFrames, OutgoingFrames, Connecti
|
|||
this.connectionValue = connectionValue;
|
||||
}
|
||||
|
||||
public void setDebug(boolean flag)
|
||||
public void setExecutor(ExecutorService executor)
|
||||
{
|
||||
this.debug = flag;
|
||||
this.executor = executor;
|
||||
}
|
||||
|
||||
public void setProtocols(String protocols)
|
||||
|
@ -651,7 +693,7 @@ public class BlockheadClient implements IncomingFrames, OutgoingFrames, Connecti
|
|||
this.protocols = protocols;
|
||||
}
|
||||
|
||||
public void setTimeout(TimeUnit unit, int duration)
|
||||
public void setTimeout(int duration, TimeUnit unit)
|
||||
{
|
||||
this.timeout = (int)TimeUnit.MILLISECONDS.convert(duration,unit);
|
||||
}
|
||||
|
@ -705,14 +747,13 @@ public class BlockheadClient implements IncomingFrames, OutgoingFrames, Connecti
|
|||
LOG.debug("write(Frame->{}) to {}",frame,outgoing);
|
||||
if (LOG.isDebugEnabled())
|
||||
{
|
||||
frame.setMask(new byte[]
|
||||
{ 0x00, 0x00, 0x00, 0x00 });
|
||||
frame.setMask(new byte[] { 0x00, 0x00, 0x00, 0x00 });
|
||||
}
|
||||
else
|
||||
{
|
||||
frame.setMask(clientmask);
|
||||
}
|
||||
extensionStack.outgoingFrame(frame,null, BatchMode.OFF);
|
||||
extensionStack.outgoingFrame(frame,null,BatchMode.OFF);
|
||||
}
|
||||
|
||||
public void writeRaw(ByteBuffer buf) throws IOException
|
||||
|
|
|
@ -18,6 +18,8 @@
|
|||
|
||||
package org.eclipse.jetty.websocket.common.test;
|
||||
|
||||
import static org.hamcrest.Matchers.*;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.SocketException;
|
||||
import java.nio.ByteBuffer;
|
||||
|
@ -25,8 +27,8 @@ import java.util.Collections;
|
|||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import org.eclipse.jetty.toolchain.test.EventQueue;
|
||||
import org.eclipse.jetty.util.BufferUtil;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
|
@ -39,9 +41,6 @@ import org.eclipse.jetty.websocket.common.WebSocketFrame;
|
|||
import org.eclipse.jetty.websocket.common.io.IOState;
|
||||
import org.junit.Assert;
|
||||
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
|
||||
/**
|
||||
* Fuzzing utility for the AB tests.
|
||||
*/
|
||||
|
@ -95,6 +94,7 @@ public class Fuzzer implements AutoCloseable
|
|||
policy.setIdleTimeout(5000);
|
||||
|
||||
this.client = new BlockheadClient(policy,testcase.getServerURI());
|
||||
this.client.setTimeout(2,TimeUnit.SECONDS);
|
||||
this.generator = testcase.getLaxGenerator();
|
||||
this.testname = testcase.getTestMethodName();
|
||||
}
|
||||
|
@ -140,28 +140,24 @@ public class Fuzzer implements AutoCloseable
|
|||
}
|
||||
}
|
||||
|
||||
public void expect(List<WebSocketFrame> expect) throws IOException, TimeoutException
|
||||
public void expect(List<WebSocketFrame> expect) throws Exception
|
||||
{
|
||||
expect(expect,TimeUnit.SECONDS,10);
|
||||
expect(expect,10,TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
public void expect(List<WebSocketFrame> expect, TimeUnit unit, int duration) throws IOException, TimeoutException
|
||||
public void expect(List<WebSocketFrame> expect, int duration, TimeUnit unit) throws Exception
|
||||
{
|
||||
int expectedCount = expect.size();
|
||||
LOG.debug("expect() {} frame(s)",expect.size());
|
||||
|
||||
// Read frames
|
||||
IncomingFramesCapture capture = client.readFrames(expect.size(),unit,duration);
|
||||
if (LOG.isDebugEnabled())
|
||||
{
|
||||
capture.dump();
|
||||
}
|
||||
|
||||
EventQueue<WebSocketFrame> frames = client.readFrames(expect.size(),duration,unit);
|
||||
|
||||
String prefix = "";
|
||||
for (int i = 0; i < expectedCount; i++)
|
||||
{
|
||||
WebSocketFrame expected = expect.get(i);
|
||||
WebSocketFrame actual = capture.getFrames().poll();
|
||||
WebSocketFrame actual = frames.poll();
|
||||
|
||||
prefix = "Frame[" + i + "]";
|
||||
|
||||
|
@ -183,7 +179,7 @@ public class Fuzzer implements AutoCloseable
|
|||
}
|
||||
}
|
||||
|
||||
public void expect(WebSocketFrame expect) throws IOException, TimeoutException
|
||||
public void expect(WebSocketFrame expect) throws Exception
|
||||
{
|
||||
expect(Collections.singletonList(expect));
|
||||
}
|
||||
|
|
|
@ -28,6 +28,7 @@ import java.util.concurrent.TimeUnit;
|
|||
|
||||
import org.eclipse.jetty.server.Server;
|
||||
import org.eclipse.jetty.server.ServerConnector;
|
||||
import org.eclipse.jetty.toolchain.test.EventQueue;
|
||||
import org.eclipse.jetty.util.log.StacklessLogging;
|
||||
import org.eclipse.jetty.websocket.api.StatusCode;
|
||||
import org.eclipse.jetty.websocket.common.CloseInfo;
|
||||
|
@ -36,7 +37,6 @@ import org.eclipse.jetty.websocket.common.Parser;
|
|||
import org.eclipse.jetty.websocket.common.WebSocketFrame;
|
||||
import org.eclipse.jetty.websocket.common.frames.TextFrame;
|
||||
import org.eclipse.jetty.websocket.common.test.BlockheadClient;
|
||||
import org.eclipse.jetty.websocket.common.test.IncomingFramesCapture;
|
||||
import org.eclipse.jetty.websocket.server.examples.echo.BigEchoSocket;
|
||||
import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
|
||||
import org.junit.AfterClass;
|
||||
|
@ -100,8 +100,8 @@ public class AnnotatedMaxMessageSizeTest
|
|||
client.write(new TextFrame().setPayload(msg));
|
||||
|
||||
// Read frame (hopefully text frame)
|
||||
IncomingFramesCapture capture = client.readFrames(1,TimeUnit.MILLISECONDS,500);
|
||||
WebSocketFrame tf = capture.getFrames().poll();
|
||||
EventQueue<WebSocketFrame> frames = client.readFrames(1,500,TimeUnit.MILLISECONDS);
|
||||
WebSocketFrame tf = frames.poll();
|
||||
Assert.assertThat("Text Frame.status code",tf.getPayloadAsUTF8(),is(msg));
|
||||
}
|
||||
finally
|
||||
|
@ -127,8 +127,8 @@ public class AnnotatedMaxMessageSizeTest
|
|||
client.write(new TextFrame().setPayload(ByteBuffer.wrap(buf)));
|
||||
|
||||
// Read frame (hopefully close frame saying its too large)
|
||||
IncomingFramesCapture capture = client.readFrames(1,TimeUnit.MILLISECONDS,500);
|
||||
WebSocketFrame tf = capture.getFrames().poll();
|
||||
EventQueue<WebSocketFrame> frames = client.readFrames(1,500,TimeUnit.MILLISECONDS);
|
||||
WebSocketFrame tf = frames.poll();
|
||||
Assert.assertThat("Frame is close", tf.getOpCode(), is(OpCode.CLOSE));
|
||||
CloseInfo close = new CloseInfo(tf);
|
||||
Assert.assertThat("Close Code", close.getStatusCode(), is(StatusCode.MESSAGE_TOO_LARGE));
|
||||
|
|
|
@ -22,19 +22,17 @@ import static org.hamcrest.Matchers.*;
|
|||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.eclipse.jetty.toolchain.test.EventQueue;
|
||||
import org.eclipse.jetty.websocket.common.WebSocketFrame;
|
||||
import org.eclipse.jetty.websocket.common.frames.TextFrame;
|
||||
import org.eclipse.jetty.websocket.common.test.BlockheadClient;
|
||||
import org.eclipse.jetty.websocket.common.test.HttpResponse;
|
||||
import org.eclipse.jetty.websocket.common.test.IncomingFramesCapture;
|
||||
import org.eclipse.jetty.websocket.server.examples.MyEchoServlet;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
|
||||
@Ignore("Bug 395444")
|
||||
public class ChromeTest
|
||||
{
|
||||
private static SimpleServletServer server;
|
||||
|
@ -70,8 +68,8 @@ public class ChromeTest
|
|||
client.write(new TextFrame().setPayload(msg));
|
||||
|
||||
// Read frame (hopefully text frame)
|
||||
IncomingFramesCapture capture = client.readFrames(1,TimeUnit.MILLISECONDS,500);
|
||||
WebSocketFrame tf = capture.getFrames().poll();
|
||||
EventQueue<WebSocketFrame> frames = client.readFrames(1,500,TimeUnit.MILLISECONDS);
|
||||
WebSocketFrame tf = frames.poll();
|
||||
Assert.assertThat("Text Frame.status code",tf.getPayloadAsUTF8(),is(msg));
|
||||
}
|
||||
finally
|
||||
|
|
|
@ -18,20 +18,20 @@
|
|||
|
||||
package org.eclipse.jetty.websocket.server;
|
||||
|
||||
import static org.hamcrest.Matchers.*;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.eclipse.jetty.toolchain.test.EventQueue;
|
||||
import org.eclipse.jetty.websocket.common.WebSocketFrame;
|
||||
import org.eclipse.jetty.websocket.common.frames.TextFrame;
|
||||
import org.eclipse.jetty.websocket.common.test.BlockheadClient;
|
||||
import org.eclipse.jetty.websocket.common.test.IncomingFramesCapture;
|
||||
import org.eclipse.jetty.websocket.server.examples.MyEchoServlet;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.hamcrest.Matchers.is;
|
||||
|
||||
public class FirefoxTest
|
||||
{
|
||||
private static SimpleServletServer server;
|
||||
|
@ -65,8 +65,8 @@ public class FirefoxTest
|
|||
client.write(new TextFrame().setPayload(msg));
|
||||
|
||||
// Read frame (hopefully text frame)
|
||||
IncomingFramesCapture capture = client.readFrames(1, TimeUnit.MILLISECONDS, 500);
|
||||
WebSocketFrame tf = capture.getFrames().poll();
|
||||
EventQueue<WebSocketFrame> frames = client.readFrames(1, 500, TimeUnit.MILLISECONDS);
|
||||
WebSocketFrame tf = frames.poll();
|
||||
Assert.assertThat("Text Frame.status code", tf.getPayloadAsUTF8(), is(msg));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -22,11 +22,11 @@ import static org.hamcrest.Matchers.*;
|
|||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.eclipse.jetty.toolchain.test.EventQueue;
|
||||
import org.eclipse.jetty.websocket.common.WebSocketFrame;
|
||||
import org.eclipse.jetty.websocket.common.frames.TextFrame;
|
||||
import org.eclipse.jetty.websocket.common.test.BlockheadClient;
|
||||
import org.eclipse.jetty.websocket.common.test.HttpResponse;
|
||||
import org.eclipse.jetty.websocket.common.test.IncomingFramesCapture;
|
||||
import org.eclipse.jetty.websocket.server.helper.EchoServlet;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
|
@ -79,7 +79,7 @@ public class FragmentExtensionTest
|
|||
try
|
||||
{
|
||||
// Make sure the read times out if there are problems with the implementation
|
||||
client.setTimeout(TimeUnit.SECONDS,1);
|
||||
client.setTimeout(1,TimeUnit.SECONDS);
|
||||
client.connect();
|
||||
client.sendStandardRequest();
|
||||
HttpResponse resp = client.expectUpgradeResponse();
|
||||
|
@ -90,10 +90,10 @@ public class FragmentExtensionTest
|
|||
client.write(new TextFrame().setPayload(msg));
|
||||
|
||||
String parts[] = split(msg,fragSize);
|
||||
IncomingFramesCapture capture = client.readFrames(parts.length,TimeUnit.MILLISECONDS,1000);
|
||||
EventQueue<WebSocketFrame> frames = client.readFrames(parts.length,1000,TimeUnit.MILLISECONDS);
|
||||
for (int i = 0; i < parts.length; i++)
|
||||
{
|
||||
WebSocketFrame frame = capture.getFrames().poll();
|
||||
WebSocketFrame frame = frames.poll();
|
||||
Assert.assertThat("text[" + i + "].payload",frame.getPayloadAsUTF8(),is(parts[i]));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -22,11 +22,11 @@ import static org.hamcrest.Matchers.*;
|
|||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.eclipse.jetty.toolchain.test.EventQueue;
|
||||
import org.eclipse.jetty.websocket.common.WebSocketFrame;
|
||||
import org.eclipse.jetty.websocket.common.frames.TextFrame;
|
||||
import org.eclipse.jetty.websocket.common.test.BlockheadClient;
|
||||
import org.eclipse.jetty.websocket.common.test.HttpResponse;
|
||||
import org.eclipse.jetty.websocket.common.test.IncomingFramesCapture;
|
||||
import org.eclipse.jetty.websocket.server.helper.EchoServlet;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
|
@ -61,7 +61,7 @@ public class FrameCompressionExtensionTest
|
|||
try
|
||||
{
|
||||
// Make sure the read times out if there are problems with the implementation
|
||||
client.setTimeout(TimeUnit.SECONDS,1);
|
||||
client.setTimeout(1,TimeUnit.SECONDS);
|
||||
client.connect();
|
||||
client.sendStandardRequest();
|
||||
HttpResponse resp = client.expectUpgradeResponse();
|
||||
|
@ -73,8 +73,8 @@ public class FrameCompressionExtensionTest
|
|||
// Client sends first message
|
||||
client.write(new TextFrame().setPayload(msg));
|
||||
|
||||
IncomingFramesCapture capture = client.readFrames(1,TimeUnit.MILLISECONDS,1000);
|
||||
WebSocketFrame frame = capture.getFrames().poll();
|
||||
EventQueue<WebSocketFrame> frames = client.readFrames(1,1000,TimeUnit.MILLISECONDS);
|
||||
WebSocketFrame frame = frames.poll();
|
||||
Assert.assertThat("TEXT.payload",frame.getPayloadAsUTF8(),is(msg.toString()));
|
||||
|
||||
// Client sends second message
|
||||
|
@ -82,8 +82,8 @@ public class FrameCompressionExtensionTest
|
|||
msg = "There";
|
||||
client.write(new TextFrame().setPayload(msg));
|
||||
|
||||
capture = client.readFrames(1,TimeUnit.SECONDS,1);
|
||||
frame = capture.getFrames().poll();
|
||||
frames = client.readFrames(1,1,TimeUnit.SECONDS);
|
||||
frame = frames.poll();
|
||||
Assert.assertThat("TEXT.payload",frame.getPayloadAsUTF8(),is(msg.toString()));
|
||||
}
|
||||
finally
|
||||
|
|
|
@ -22,11 +22,11 @@ import static org.hamcrest.Matchers.*;
|
|||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.eclipse.jetty.toolchain.test.EventQueue;
|
||||
import org.eclipse.jetty.websocket.common.WebSocketFrame;
|
||||
import org.eclipse.jetty.websocket.common.frames.TextFrame;
|
||||
import org.eclipse.jetty.websocket.common.test.BlockheadClient;
|
||||
import org.eclipse.jetty.websocket.common.test.HttpResponse;
|
||||
import org.eclipse.jetty.websocket.common.test.IncomingFramesCapture;
|
||||
import org.eclipse.jetty.websocket.server.helper.EchoServlet;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
|
@ -64,7 +64,7 @@ public class IdentityExtensionTest
|
|||
try
|
||||
{
|
||||
// Make sure the read times out if there are problems with the implementation
|
||||
client.setTimeout(TimeUnit.SECONDS,1);
|
||||
client.setTimeout(1,TimeUnit.SECONDS);
|
||||
client.connect();
|
||||
client.sendStandardRequest();
|
||||
HttpResponse resp = client.expectUpgradeResponse();
|
||||
|
@ -73,8 +73,8 @@ public class IdentityExtensionTest
|
|||
|
||||
client.write(new TextFrame().setPayload("Hello"));
|
||||
|
||||
IncomingFramesCapture capture = client.readFrames(1,TimeUnit.MILLISECONDS,1000);
|
||||
WebSocketFrame frame = capture.getFrames().poll();
|
||||
EventQueue<WebSocketFrame> frames = client.readFrames(1,1000,TimeUnit.MILLISECONDS);
|
||||
WebSocketFrame frame = frames.poll();
|
||||
Assert.assertThat("TEXT.payload",frame.getPayloadAsUTF8(),is("Hello"));
|
||||
}
|
||||
finally
|
||||
|
|
|
@ -22,13 +22,13 @@ import static org.hamcrest.Matchers.*;
|
|||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.eclipse.jetty.toolchain.test.EventQueue;
|
||||
import org.eclipse.jetty.websocket.api.StatusCode;
|
||||
import org.eclipse.jetty.websocket.common.CloseInfo;
|
||||
import org.eclipse.jetty.websocket.common.OpCode;
|
||||
import org.eclipse.jetty.websocket.common.WebSocketFrame;
|
||||
import org.eclipse.jetty.websocket.common.frames.TextFrame;
|
||||
import org.eclipse.jetty.websocket.common.test.BlockheadClient;
|
||||
import org.eclipse.jetty.websocket.common.test.IncomingFramesCapture;
|
||||
import org.eclipse.jetty.websocket.server.helper.RFCSocket;
|
||||
import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
|
||||
import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
|
||||
|
@ -73,7 +73,7 @@ public class IdleTimeoutTest
|
|||
{
|
||||
BlockheadClient client = new BlockheadClient(server.getServerUri());
|
||||
client.setProtocols("onConnect");
|
||||
client.setTimeout(TimeUnit.MILLISECONDS,2500);
|
||||
client.setTimeout(2500,TimeUnit.MILLISECONDS);
|
||||
try
|
||||
{
|
||||
client.connect();
|
||||
|
@ -92,8 +92,8 @@ public class IdleTimeoutTest
|
|||
client.write(new TextFrame().setPayload("Hello"));
|
||||
|
||||
// Expect server to have closed due to its own timeout
|
||||
IncomingFramesCapture capture = client.readFrames(1,500,TimeUnit.MILLISECONDS);
|
||||
WebSocketFrame frame = capture.getFrames().poll();
|
||||
EventQueue<WebSocketFrame> frames = client.readFrames(1,500,TimeUnit.MILLISECONDS);
|
||||
WebSocketFrame frame = frames.poll();
|
||||
Assert.assertThat("frame opcode",frame.getOpCode(),is(OpCode.CLOSE));
|
||||
CloseInfo close = new CloseInfo(frame);
|
||||
Assert.assertThat("close code",close.getStatusCode(),is(StatusCode.SHUTDOWN));
|
||||
|
|
|
@ -18,22 +18,21 @@
|
|||
|
||||
package org.eclipse.jetty.websocket.server;
|
||||
|
||||
import static org.hamcrest.Matchers.*;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.eclipse.jetty.toolchain.test.EventQueue;
|
||||
import org.eclipse.jetty.websocket.common.WebSocketFrame;
|
||||
import org.eclipse.jetty.websocket.common.frames.TextFrame;
|
||||
import org.eclipse.jetty.websocket.common.test.BlockheadClient;
|
||||
import org.eclipse.jetty.websocket.common.test.HttpResponse;
|
||||
import org.eclipse.jetty.websocket.common.test.IncomingFramesCapture;
|
||||
import org.eclipse.jetty.websocket.server.helper.EchoServlet;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
|
||||
public class PerMessageDeflateExtensionTest
|
||||
{
|
||||
private static SimpleServletServer server;
|
||||
|
@ -65,7 +64,7 @@ public class PerMessageDeflateExtensionTest
|
|||
try
|
||||
{
|
||||
// Make sure the read times out if there are problems with the implementation
|
||||
client.setTimeout(TimeUnit.SECONDS,1);
|
||||
client.setTimeout(1,TimeUnit.SECONDS);
|
||||
client.connect();
|
||||
client.sendStandardRequest();
|
||||
HttpResponse resp = client.expectUpgradeResponse();
|
||||
|
@ -77,8 +76,8 @@ public class PerMessageDeflateExtensionTest
|
|||
// Client sends first message
|
||||
client.write(new TextFrame().setPayload(msg));
|
||||
|
||||
IncomingFramesCapture capture = client.readFrames(1,TimeUnit.MILLISECONDS,1000);
|
||||
WebSocketFrame frame = capture.getFrames().poll();
|
||||
EventQueue<WebSocketFrame> frames = client.readFrames(1,1000,TimeUnit.MILLISECONDS);
|
||||
WebSocketFrame frame = frames.poll();
|
||||
Assert.assertThat("TEXT.payload",frame.getPayloadAsUTF8(),is(msg.toString()));
|
||||
|
||||
// Client sends second message
|
||||
|
@ -86,8 +85,8 @@ public class PerMessageDeflateExtensionTest
|
|||
msg = "There";
|
||||
client.write(new TextFrame().setPayload(msg));
|
||||
|
||||
capture = client.readFrames(1,TimeUnit.SECONDS,1);
|
||||
frame = capture.getFrames().poll();
|
||||
frames = client.readFrames(1,1,TimeUnit.SECONDS);
|
||||
frame = frames.poll();
|
||||
Assert.assertThat("TEXT.payload",frame.getPayloadAsUTF8(),is(msg.toString()));
|
||||
}
|
||||
finally
|
||||
|
|
|
@ -107,7 +107,7 @@ public class RequestHeadersTest
|
|||
public void testAccessRequestCookies() throws Exception
|
||||
{
|
||||
BlockheadClient client = new BlockheadClient(server.getServerUri());
|
||||
client.setTimeout(TimeUnit.SECONDS,1);
|
||||
client.setTimeout(1,TimeUnit.SECONDS);
|
||||
|
||||
try
|
||||
{
|
||||
|
|
|
@ -25,6 +25,7 @@ import java.util.List;
|
|||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.eclipse.jetty.toolchain.test.EventQueue;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
import org.eclipse.jetty.util.log.StacklessLogging;
|
||||
|
@ -36,7 +37,6 @@ import org.eclipse.jetty.websocket.common.OpCode;
|
|||
import org.eclipse.jetty.websocket.common.WebSocketFrame;
|
||||
import org.eclipse.jetty.websocket.common.events.AbstractEventDriver;
|
||||
import org.eclipse.jetty.websocket.common.test.BlockheadClient;
|
||||
import org.eclipse.jetty.websocket.common.test.IncomingFramesCapture;
|
||||
import org.eclipse.jetty.websocket.server.helper.RFCSocket;
|
||||
import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest;
|
||||
import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
|
||||
|
@ -162,14 +162,14 @@ public class WebSocketCloseTest
|
|||
try (BlockheadClient client = new BlockheadClient(server.getServerUri()))
|
||||
{
|
||||
client.setProtocols("fastclose");
|
||||
client.setTimeout(TimeUnit.SECONDS,1);
|
||||
client.setTimeout(1,TimeUnit.SECONDS);
|
||||
client.connect();
|
||||
client.sendStandardRequest();
|
||||
client.expectUpgradeResponse();
|
||||
|
||||
// Verify that client got close frame
|
||||
IncomingFramesCapture capture = client.readFrames(1,TimeUnit.SECONDS,1);
|
||||
WebSocketFrame frame = capture.getFrames().poll();
|
||||
EventQueue<WebSocketFrame> frames = client.readFrames(1,1,TimeUnit.SECONDS);
|
||||
WebSocketFrame frame = frames.poll();
|
||||
Assert.assertThat("frames[0].opcode",frame.getOpCode(),is(OpCode.CLOSE));
|
||||
CloseInfo close = new CloseInfo(frame);
|
||||
Assert.assertThat("Close Status Code",close.getStatusCode(),is(StatusCode.NORMAL));
|
||||
|
@ -192,15 +192,15 @@ public class WebSocketCloseTest
|
|||
try (BlockheadClient client = new BlockheadClient(server.getServerUri()))
|
||||
{
|
||||
client.setProtocols("fastfail");
|
||||
client.setTimeout(TimeUnit.SECONDS,1);
|
||||
client.setTimeout(1,TimeUnit.SECONDS);
|
||||
try (StacklessLogging scope = new StacklessLogging(AbstractEventDriver.class))
|
||||
{
|
||||
client.connect();
|
||||
client.sendStandardRequest();
|
||||
client.expectUpgradeResponse();
|
||||
|
||||
IncomingFramesCapture capture = client.readFrames(1,TimeUnit.SECONDS,1);
|
||||
WebSocketFrame frame = capture.getFrames().poll();
|
||||
EventQueue<WebSocketFrame> frames = client.readFrames(1,1,TimeUnit.SECONDS);
|
||||
WebSocketFrame frame = frames.poll();
|
||||
Assert.assertThat("frames[0].opcode",frame.getOpCode(),is(OpCode.CLOSE));
|
||||
CloseInfo close = new CloseInfo(frame);
|
||||
Assert.assertThat("Close Status Code",close.getStatusCode(),is(StatusCode.SERVER_ERROR));
|
||||
|
|
|
@ -18,15 +18,16 @@
|
|||
|
||||
package org.eclipse.jetty.websocket.server;
|
||||
|
||||
import static org.hamcrest.Matchers.*;
|
||||
|
||||
import java.net.URI;
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.eclipse.jetty.toolchain.test.AdvancedRunner;
|
||||
import org.eclipse.jetty.toolchain.test.EventQueue;
|
||||
import org.eclipse.jetty.websocket.common.WebSocketFrame;
|
||||
import org.eclipse.jetty.websocket.common.frames.TextFrame;
|
||||
import org.eclipse.jetty.websocket.common.test.BlockheadClient;
|
||||
import org.eclipse.jetty.websocket.common.test.IncomingFramesCapture;
|
||||
import org.eclipse.jetty.websocket.server.helper.SessionServlet;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
|
@ -34,8 +35,6 @@ import org.junit.BeforeClass;
|
|||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
|
||||
import static org.hamcrest.Matchers.is;
|
||||
|
||||
/**
|
||||
* Testing various aspects of the server side support for WebSocket {@link Session}
|
||||
*/
|
||||
|
@ -90,8 +89,7 @@ public class WebSocketServerSessionTest
|
|||
client.write(new TextFrame().setPayload("getParameterMap|cost")); // intentionally invalid
|
||||
|
||||
// Read frame (hopefully text frame)
|
||||
IncomingFramesCapture capture = client.readFrames(4, TimeUnit.SECONDS, 5);
|
||||
Queue<WebSocketFrame> frames = capture.getFrames();
|
||||
EventQueue<WebSocketFrame> frames = client.readFrames(4,5,TimeUnit.SECONDS);
|
||||
WebSocketFrame tf = frames.poll();
|
||||
Assert.assertThat("Parameter Map[snack]", tf.getPayloadAsUTF8(), is("[cashews]"));
|
||||
tf = frames.poll();
|
||||
|
|
|
@ -25,6 +25,7 @@ import java.util.Arrays;
|
|||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.eclipse.jetty.toolchain.test.AdvancedRunner;
|
||||
import org.eclipse.jetty.toolchain.test.EventQueue;
|
||||
import org.eclipse.jetty.util.Utf8Appendable.NotUtf8Exception;
|
||||
import org.eclipse.jetty.util.Utf8StringBuilder;
|
||||
import org.eclipse.jetty.util.log.StacklessLogging;
|
||||
|
@ -41,7 +42,6 @@ import org.eclipse.jetty.websocket.common.frames.BinaryFrame;
|
|||
import org.eclipse.jetty.websocket.common.frames.ContinuationFrame;
|
||||
import org.eclipse.jetty.websocket.common.frames.TextFrame;
|
||||
import org.eclipse.jetty.websocket.common.test.BlockheadClient;
|
||||
import org.eclipse.jetty.websocket.common.test.IncomingFramesCapture;
|
||||
import org.eclipse.jetty.websocket.common.test.UnitGenerator;
|
||||
import org.eclipse.jetty.websocket.common.util.Hex;
|
||||
import org.eclipse.jetty.websocket.server.helper.RFCServlet;
|
||||
|
@ -116,8 +116,8 @@ public class WebSocketServletRFCTest
|
|||
client.write(bin); // write buf3 (fin=true)
|
||||
|
||||
// Read frame echo'd back (hopefully a single binary frame)
|
||||
IncomingFramesCapture capture = client.readFrames(1,TimeUnit.MILLISECONDS,1000);
|
||||
Frame binmsg = capture.getFrames().poll();
|
||||
EventQueue<WebSocketFrame> frames = client.readFrames(1,1000,TimeUnit.MILLISECONDS);
|
||||
Frame binmsg = frames.poll();
|
||||
int expectedSize = buf1.length + buf2.length + buf3.length;
|
||||
Assert.assertThat("BinaryFrame.payloadLength",binmsg.getPayloadLength(),is(expectedSize));
|
||||
|
||||
|
@ -182,8 +182,8 @@ public class WebSocketServletRFCTest
|
|||
client.write(new TextFrame().setPayload(msg));
|
||||
|
||||
// Read frame (hopefully text frame)
|
||||
IncomingFramesCapture capture = client.readFrames(1,TimeUnit.MILLISECONDS,500);
|
||||
WebSocketFrame tf = capture.getFrames().poll();
|
||||
EventQueue<WebSocketFrame> frames = client.readFrames(1,500,TimeUnit.MILLISECONDS);
|
||||
WebSocketFrame tf = frames.poll();
|
||||
Assert.assertThat("Text Frame.status code",tf.getPayloadAsUTF8(),is(msg));
|
||||
}
|
||||
finally
|
||||
|
@ -215,8 +215,8 @@ public class WebSocketServletRFCTest
|
|||
client.write(new TextFrame().setPayload("CRASH"));
|
||||
|
||||
// Read frame (hopefully close frame)
|
||||
IncomingFramesCapture capture = client.readFrames(1,TimeUnit.MILLISECONDS,500);
|
||||
Frame cf = capture.getFrames().poll();
|
||||
EventQueue<WebSocketFrame> frames = client.readFrames(1,500,TimeUnit.MILLISECONDS);
|
||||
Frame cf = frames.poll();
|
||||
CloseInfo close = new CloseInfo(cf);
|
||||
Assert.assertThat("Close Frame.status code",close.getStatusCode(),is(StatusCode.SERVER_ERROR));
|
||||
}
|
||||
|
@ -261,8 +261,8 @@ public class WebSocketServletRFCTest
|
|||
client.write(new TextFrame().setPayload(msg));
|
||||
|
||||
// Read frame (hopefully text frame)
|
||||
IncomingFramesCapture capture = client.readFrames(1,TimeUnit.MILLISECONDS,500);
|
||||
WebSocketFrame tf = capture.getFrames().poll();
|
||||
EventQueue<WebSocketFrame> frames = client.readFrames(1,500,TimeUnit.MILLISECONDS);
|
||||
WebSocketFrame tf = frames.poll();
|
||||
Assert.assertThat("Text Frame.status code",tf.getPayloadAsUTF8(),is(msg));
|
||||
}
|
||||
finally
|
||||
|
@ -294,8 +294,8 @@ public class WebSocketServletRFCTest
|
|||
client.writeRaw(bbHeader);
|
||||
client.writeRaw(txt.getPayload());
|
||||
|
||||
IncomingFramesCapture capture = client.readFrames(1,TimeUnit.SECONDS,1);
|
||||
WebSocketFrame frame = capture.getFrames().poll();
|
||||
EventQueue<WebSocketFrame> frames = client.readFrames(1,1,TimeUnit.SECONDS);
|
||||
WebSocketFrame frame = frames.poll();
|
||||
Assert.assertThat("frames[0].opcode",frame.getOpCode(),is(OpCode.CLOSE));
|
||||
CloseInfo close = new CloseInfo(frame);
|
||||
Assert.assertThat("Close Status Code",close.getStatusCode(),is(StatusCode.BAD_PAYLOAD));
|
||||
|
@ -340,8 +340,8 @@ public class WebSocketServletRFCTest
|
|||
client.write(new TextFrame().setPayload(msg));
|
||||
|
||||
// Read frame (hopefully text frame)
|
||||
IncomingFramesCapture capture = client.readFrames(1,TimeUnit.MILLISECONDS,500);
|
||||
WebSocketFrame tf = capture.getFrames().poll();
|
||||
EventQueue<WebSocketFrame> frames = client.readFrames(1,500,TimeUnit.MILLISECONDS);
|
||||
WebSocketFrame tf = frames.poll();
|
||||
Assert.assertThat("Text Frame.status code",tf.getPayloadAsUTF8(),is(msg));
|
||||
}
|
||||
finally
|
||||
|
|
|
@ -101,7 +101,7 @@ public class TestABCase9 extends AbstractABCase
|
|||
fuzzer.connect();
|
||||
fuzzer.setSendMode(Fuzzer.SendMode.BULK);
|
||||
fuzzer.send(send);
|
||||
fuzzer.expect(expect,TimeUnit.SECONDS,8);
|
||||
fuzzer.expect(expect,8,TimeUnit.SECONDS);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -125,7 +125,7 @@ public class TestABCase9 extends AbstractABCase
|
|||
fuzzer.setSendMode(Fuzzer.SendMode.SLOW);
|
||||
fuzzer.setSlowSendSegmentSize(segmentSize);
|
||||
fuzzer.send(send);
|
||||
fuzzer.expect(expect,TimeUnit.SECONDS,8);
|
||||
fuzzer.expect(expect,8,TimeUnit.SECONDS);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -187,7 +187,6 @@ public class TestABCase9 extends AbstractABCase
|
|||
* Echo 1MB text message (1 frame)
|
||||
*/
|
||||
@Test
|
||||
@Stress("High I/O use")
|
||||
public void testCase9_1_3() throws Exception
|
||||
{
|
||||
byte utf[] = new byte[1 * MBYTE];
|
||||
|
@ -207,7 +206,7 @@ public class TestABCase9 extends AbstractABCase
|
|||
fuzzer.connect();
|
||||
fuzzer.setSendMode(Fuzzer.SendMode.BULK);
|
||||
fuzzer.send(send);
|
||||
fuzzer.expect(expect,TimeUnit.SECONDS,4);
|
||||
fuzzer.expect(expect,4,TimeUnit.SECONDS);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -215,7 +214,6 @@ public class TestABCase9 extends AbstractABCase
|
|||
* Echo 4MB text message (1 frame)
|
||||
*/
|
||||
@Test
|
||||
@Stress("High I/O use")
|
||||
public void testCase9_1_4() throws Exception
|
||||
{
|
||||
byte utf[] = new byte[4 * MBYTE];
|
||||
|
@ -235,7 +233,7 @@ public class TestABCase9 extends AbstractABCase
|
|||
fuzzer.connect();
|
||||
fuzzer.setSendMode(Fuzzer.SendMode.BULK);
|
||||
fuzzer.send(send);
|
||||
fuzzer.expect(expect,TimeUnit.SECONDS,8);
|
||||
fuzzer.expect(expect,8,TimeUnit.SECONDS);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -263,7 +261,7 @@ public class TestABCase9 extends AbstractABCase
|
|||
fuzzer.connect();
|
||||
fuzzer.setSendMode(Fuzzer.SendMode.BULK);
|
||||
fuzzer.send(send);
|
||||
fuzzer.expect(expect,TimeUnit.SECONDS,16);
|
||||
fuzzer.expect(expect,16,TimeUnit.SECONDS);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -291,7 +289,7 @@ public class TestABCase9 extends AbstractABCase
|
|||
fuzzer.connect();
|
||||
fuzzer.setSendMode(Fuzzer.SendMode.BULK);
|
||||
fuzzer.send(send);
|
||||
fuzzer.expect(expect,TimeUnit.SECONDS,32);
|
||||
fuzzer.expect(expect,32,TimeUnit.SECONDS);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -372,7 +370,7 @@ public class TestABCase9 extends AbstractABCase
|
|||
fuzzer.connect();
|
||||
fuzzer.setSendMode(Fuzzer.SendMode.BULK);
|
||||
fuzzer.send(send);
|
||||
fuzzer.expect(expect,TimeUnit.SECONDS,4);
|
||||
fuzzer.expect(expect,4,TimeUnit.SECONDS);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -400,7 +398,7 @@ public class TestABCase9 extends AbstractABCase
|
|||
fuzzer.connect();
|
||||
fuzzer.setSendMode(Fuzzer.SendMode.BULK);
|
||||
fuzzer.send(send);
|
||||
fuzzer.expect(expect,TimeUnit.SECONDS,8);
|
||||
fuzzer.expect(expect,8,TimeUnit.SECONDS);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -428,7 +426,7 @@ public class TestABCase9 extends AbstractABCase
|
|||
fuzzer.connect();
|
||||
fuzzer.setSendMode(Fuzzer.SendMode.BULK);
|
||||
fuzzer.send(send);
|
||||
fuzzer.expect(expect,TimeUnit.SECONDS,16);
|
||||
fuzzer.expect(expect,16,TimeUnit.SECONDS);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -456,7 +454,7 @@ public class TestABCase9 extends AbstractABCase
|
|||
fuzzer.connect();
|
||||
fuzzer.setSendMode(Fuzzer.SendMode.BULK);
|
||||
fuzzer.send(send);
|
||||
fuzzer.expect(expect,TimeUnit.SECONDS,32);
|
||||
fuzzer.expect(expect,32,TimeUnit.SECONDS);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue