428232 - Rework batch mode / buffering in websocket.

Refactored OutgoingFrames.outgoingFrame() to take an additional
parameter, FlushMode. This is in preparation for handling this new
parameter in FrameFlusher.
This commit is contained in:
Simone Bordet 2014-02-17 16:57:50 +01:00
parent f092561a8a
commit 8e5c06b95c
35 changed files with 358 additions and 307 deletions

View File

@ -21,7 +21,6 @@ package org.eclipse.jetty.websocket.jsr356;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.Future;
import javax.websocket.EncodeException;
import javax.websocket.Encoder;
import javax.websocket.RemoteEndpoint;
@ -30,6 +29,7 @@ import javax.websocket.SendHandler;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.common.BlockingWriteCallback;
import org.eclipse.jetty.websocket.common.WebSocketRemoteEndpoint;
import org.eclipse.jetty.websocket.common.io.FutureWriteCallback;
import org.eclipse.jetty.websocket.common.message.MessageOutputStream;
@ -80,24 +80,31 @@ public abstract class AbstractJsrRemote implements RemoteEndpoint
@Override
public void flushBatch() throws IOException
{
// TODO Auto-generated method stub
BlockingWriteCallback callback = new BlockingWriteCallback();
jettyRemote.sendBytes(BufferUtil.EMPTY_BUFFER, callback);
callback.block();
}
@Override
public boolean getBatchingAllowed()
{
// TODO Auto-generated method stub
return false;
return session.isBatching();
}
@Override
public void setBatchingAllowed(boolean allowed) throws IOException
{
session.setBatching(allowed);
}
@SuppressWarnings(
{ "rawtypes", "unchecked" })
{"rawtypes", "unchecked"})
public Future<Void> sendObjectViaFuture(Object data)
{
assertMessageNotNull(data);
if (LOG.isDebugEnabled())
{
LOG.debug("sendObject({})",data);
LOG.debug("sendObject({})", data);
}
Encoder encoder = encoders.getEncoderFor(data.getClass());
@ -108,15 +115,15 @@ public abstract class AbstractJsrRemote implements RemoteEndpoint
if (encoder instanceof Encoder.Text)
{
Encoder.Text etxt = (Encoder.Text)encoder;
Encoder.Text text = (Encoder.Text)encoder;
try
{
String msg = etxt.encode(data);
String msg = text.encode(data);
return jettyRemote.sendStringByFuture(msg);
}
catch (EncodeException e)
{
return new EncodeFailedFuture(data,etxt,Encoder.Text.class,e);
return new EncodeFailedFuture(data, text, Encoder.Text.class, e);
}
}
else if (encoder instanceof Encoder.TextStream)
@ -126,12 +133,12 @@ public abstract class AbstractJsrRemote implements RemoteEndpoint
try (MessageWriter writer = new MessageWriter(session))
{
writer.setCallback(callback);
etxt.encode(data,writer);
etxt.encode(data, writer);
return callback;
}
catch (EncodeException | IOException e)
{
return new EncodeFailedFuture(data,etxt,Encoder.Text.class,e);
return new EncodeFailedFuture(data, etxt, Encoder.Text.class, e);
}
}
else if (encoder instanceof Encoder.Binary)
@ -144,7 +151,7 @@ public abstract class AbstractJsrRemote implements RemoteEndpoint
}
catch (EncodeException e)
{
return new EncodeFailedFuture(data,ebin,Encoder.Binary.class,e);
return new EncodeFailedFuture(data, ebin, Encoder.Binary.class, e);
}
}
else if (encoder instanceof Encoder.BinaryStream)
@ -154,12 +161,12 @@ public abstract class AbstractJsrRemote implements RemoteEndpoint
try (MessageOutputStream out = new MessageOutputStream(session))
{
out.setCallback(callback);
ebin.encode(data,out);
ebin.encode(data, out);
return callback;
}
catch (EncodeException | IOException e)
{
return new EncodeFailedFuture(data,ebin,Encoder.Binary.class,e);
return new EncodeFailedFuture(data, ebin, Encoder.Binary.class, e);
}
}
@ -171,7 +178,7 @@ public abstract class AbstractJsrRemote implements RemoteEndpoint
{
if (LOG.isDebugEnabled())
{
LOG.debug("sendPing({})",BufferUtil.toDetailString(data));
LOG.debug("sendPing({})", BufferUtil.toDetailString(data));
}
jettyRemote.sendPing(data);
}
@ -181,14 +188,8 @@ public abstract class AbstractJsrRemote implements RemoteEndpoint
{
if (LOG.isDebugEnabled())
{
LOG.debug("sendPong({})",BufferUtil.toDetailString(data));
LOG.debug("sendPong({})", BufferUtil.toDetailString(data));
}
jettyRemote.sendPong(data);
}
@Override
public void setBatchingAllowed(boolean allowed) throws IOException
{
// TODO Auto-generated method stub
}
}

View File

@ -29,7 +29,6 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import javax.websocket.CloseReason;
import javax.websocket.EndpointConfig;
import javax.websocket.Extension;
@ -72,8 +71,9 @@ public class JsrSession extends WebSocketSession implements javax.websocket.Sess
private Map<String, String> pathParameters = new HashMap<>();
private JsrAsyncRemote asyncRemote;
private JsrBasicRemote basicRemote;
private volatile boolean batching;
public JsrSession(URI requestURI, EventDriver websocket, LogicalConnection connection, ClientContainer container, String id, SessionListener[] sessionListeners)
public JsrSession(URI requestURI, EventDriver websocket, LogicalConnection connection, ClientContainer container, String id, SessionListener... sessionListeners)
{
super(requestURI,websocket,connection,sessionListeners);
if (!(websocket instanceof AbstractJsrEventDriver))
@ -374,4 +374,15 @@ public class JsrSession extends WebSocketSession implements javax.websocket.Sess
messageHandlerSet.add(wrapper.getHandler());
}
}
@Override
public boolean isBatching()
{
return batching;
}
public void setBatching(boolean batching)
{
this.batching = batching;
}
}

View File

@ -18,17 +18,13 @@
package org.eclipse.jetty.websocket.jsr356;
import static org.hamcrest.Matchers.instanceOf;
import java.net.URI;
import java.nio.ByteBuffer;
import javax.websocket.ClientEndpointConfig;
import javax.websocket.DeploymentException;
import javax.websocket.MessageHandler;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.common.SessionListener;
import org.eclipse.jetty.websocket.common.events.EventDriver;
import org.eclipse.jetty.websocket.jsr356.client.EmptyClientEndpointConfig;
import org.eclipse.jetty.websocket.jsr356.client.SimpleEndpointMetadata;
@ -44,6 +40,8 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import static org.hamcrest.Matchers.instanceOf;
public class JsrSessionTest
{
private ClientContainer container;
@ -65,7 +63,7 @@ public class JsrSessionTest
EventDriver driver = new JsrEndpointEventDriver(policy,ei);
DummyConnection connection = new DummyConnection();
session = new JsrSession(requestURI,driver,connection,container,id,new SessionListener[0]);
session = new JsrSession(requestURI,driver,connection,container,id);
}
@Test

View File

@ -123,7 +123,7 @@ public class DummyConnection implements LogicalConnection
}
@Override
public void outgoingFrame(Frame frame, WriteCallback callback)
public void outgoingFrame(Frame frame, WriteCallback callback, FlushMode flushMode)
{
}

View File

@ -126,7 +126,7 @@ public class DummyConnection implements LogicalConnection
}
@Override
public void outgoingFrame(Frame frame, WriteCallback callback)
public void outgoingFrame(Frame frame, WriteCallback callback, FlushMode flushMode)
{
callback.writeSuccess();
}

View File

@ -18,17 +18,13 @@
package org.eclipse.jetty.websocket.jsr356.server;
import static org.hamcrest.Matchers.*;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import javax.websocket.server.ServerEndpoint;
import javax.websocket.server.ServerEndpointConfig;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.common.SessionListener;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.eclipse.jetty.websocket.common.events.EventDriver;
import org.eclipse.jetty.websocket.common.events.EventDriverFactory;
@ -45,6 +41,9 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
public class OnPartialTest
{
@Rule
@ -80,7 +79,7 @@ public class OnPartialTest
DummyConnection connection = new DummyConnection();
ClientContainer container = new ClientContainer();
@SuppressWarnings("resource")
JsrSession session = new JsrSession(requestURI,driver,connection,container,id,new SessionListener[0]);
JsrSession session = new JsrSession(requestURI,driver,connection,container,id);
session.setPolicy(policy);
session.open();
return driver;

View File

@ -175,4 +175,9 @@ public interface Session extends Closeable
* @return the suspend token suitable for resuming the reading of data on the connection.
*/
SuspendToken suspend();
/**
* @return true if this session is batching network data, false if it flushes it immediately.
*/
boolean isBatching();
}

View File

@ -21,21 +21,41 @@ package org.eclipse.jetty.websocket.api.extensions;
import org.eclipse.jetty.websocket.api.WriteCallback;
/**
* Interface for dealing with frames outgoing to the network (eventually)
* Interface for dealing with frames outgoing to (eventually) the network layer.
*/
public interface OutgoingFrames
{
/**
* A frame, and optional callback, intended for the network.
* <p>
* Note: the frame can undergo many transformations in the various layers and extensions present in the implementation.
* <p>
* If you are implementing a mutation, you are obliged to handle the incoming WriteCallback appropriately.
*
* @param frame
* the frame to eventually write to the network.
* @param callback
* the optional callback to use for success/failure of the network write operation. Can be null.
* A frame, and optional callback, intended for the network layer.
* <p/>
* Note: the frame can undergo many transformations in the various
* layers and extensions present in the implementation.
* <p/>
* If you are implementing a mutation, you are obliged to handle
* the incoming WriteCallback appropriately.
*
* @param frame the frame to eventually write to the network layer.
* @param callback the callback to notify when the frame is written.
* @param flushMode the flush mode required by the sender.
*/
void outgoingFrame(Frame frame, WriteCallback callback);
void outgoingFrame(Frame frame, WriteCallback callback, FlushMode flushMode);
/**
* The possible flush modes when invoking {@link #outgoingFrame(Frame, org.eclipse.jetty.websocket.api.WriteCallback, org.eclipse.jetty.websocket.api.extensions.OutgoingFrames.FlushMode)}.
*/
public enum FlushMode
{
/**
* Implementers of {@link #outgoingFrame(Frame, org.eclipse.jetty.websocket.api.WriteCallback, org.eclipse.jetty.websocket.api.extensions.OutgoingFrames.FlushMode)}
* are free to decide whether to flush or not the given frame
* to the network layer.
*/
AUTO,
/**
* Implementers of {@link #outgoingFrame(Frame, org.eclipse.jetty.websocket.api.WriteCallback, org.eclipse.jetty.websocket.api.extensions.OutgoingFrames.FlushMode)}
* must flush the given frame to the network layer.
*/
FLUSH
}
}

View File

@ -26,7 +26,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.ProtocolException;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.api.extensions.Frame;
@ -96,26 +95,16 @@ public class WebSocketClientConnection extends AbstractWebSocketConnection
}
/**
* Overrride to set masker
* Override to set the masker.
*/
@Override
public void outgoingFrame(Frame frame, WriteCallback callback)
public void outgoingFrame(Frame frame, WriteCallback callback, FlushMode flushMode)
{
if (frame instanceof WebSocketFrame)
{
if (masker == null)
{
ProtocolException ex = new ProtocolException("Must set a Masker");
LOG.warn(ex);
if (callback != null)
{
callback.writeFailed(ex);
}
return;
}
masker.setMask((WebSocketFrame)frame);
}
super.outgoingFrame(frame,callback);
super.outgoingFrame(frame,callback, flushMode);
}
@Override

View File

@ -52,7 +52,7 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint
STREAMING,
PARTIAL_TEXT,
PARTIAL_BINARY
};
}
private static final WriteCallback NOOP_CALLBACK = new WriteCallback()
{
@ -284,17 +284,16 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint
}
}
/* ------------------------------------------------------------ */
/** unchecked send
* @param frame
* @param callback
*/
public void uncheckedSendFrame(WebSocketFrame frame, WriteCallback callback)
{
try
{
connection.getIOState().assertOutputOpen();
outgoing.outgoingFrame(frame,callback);
OutgoingFrames.FlushMode flushMode = OutgoingFrames.FlushMode.FLUSH;
WebSocketSession session = connection.getSession();
if (session != null && session.isBatching())
flushMode = OutgoingFrames.FlushMode.AUTO;
outgoing.outgoingFrame(frame,callback,flushMode);
}
catch (IOException e)
{

View File

@ -57,8 +57,9 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Inc
private final URI requestURI;
private final EventDriver websocket;
private final LogicalConnection connection;
private final Executor executor;
private final boolean batching;
private final SessionListener[] sessionListeners;
private final Executor executor;
private ExtensionFactory extensionFactory;
private String protocolVersion;
private Map<String, String[]> parameterMap = new HashMap<>();
@ -69,7 +70,12 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Inc
private UpgradeRequest upgradeRequest;
private UpgradeResponse upgradeResponse;
public WebSocketSession(URI requestURI, EventDriver websocket, LogicalConnection connection, SessionListener[] sessionListeners)
public WebSocketSession(URI requestURI, EventDriver websocket, LogicalConnection connection, SessionListener... sessionListeners)
{
this(requestURI, websocket, connection, true, sessionListeners);
}
public WebSocketSession(URI requestURI, EventDriver websocket, LogicalConnection connection, boolean batching, SessionListener... sessionListeners)
{
if (requestURI == null)
{
@ -79,6 +85,7 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Inc
this.requestURI = requestURI;
this.websocket = websocket;
this.connection = connection;
this.batching = batching;
this.sessionListeners = sessionListeners;
this.executor = connection.getExecutor();
this.outgoingHandler = connection;
@ -471,6 +478,12 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Inc
return connection;
}
@Override
public boolean isBatching()
{
return batching;
}
@Override
public String toString()
{

View File

@ -162,10 +162,10 @@ public abstract class AbstractExtension extends ContainerLifeCycle implements Ex
this.nextIncoming.incomingFrame(frame);
}
protected void nextOutgoingFrame(Frame frame, WriteCallback callback)
protected void nextOutgoingFrame(Frame frame, WriteCallback callback, FlushMode flushMode)
{
log.debug("nextOutgoingFrame({})",frame);
this.nextOutgoing.outgoingFrame(frame,callback);
this.nextOutgoing.outgoingFrame(frame,callback,flushMode);
}
public void setBufferPool(ByteBufferPool bufferPool)

View File

@ -273,9 +273,9 @@ public class ExtensionStack extends ContainerLifeCycle implements IncomingFrames
}
@Override
public void outgoingFrame(Frame frame, WriteCallback callback)
public void outgoingFrame(Frame frame, WriteCallback callback, FlushMode flushMode)
{
FrameEntry entry = new FrameEntry(frame, callback);
FrameEntry entry = new FrameEntry(frame, callback, flushMode);
LOG.debug("Queuing {}", entry);
entries.offer(entry);
flusher.iterate();
@ -344,11 +344,13 @@ public class ExtensionStack extends ContainerLifeCycle implements IncomingFrames
{
private final Frame frame;
private final WriteCallback callback;
private final FlushMode flushMode;
private FrameEntry(Frame frame, WriteCallback callback)
private FrameEntry(Frame frame, WriteCallback callback, FlushMode flushMode)
{
this.frame = frame;
this.callback = callback;
this.flushMode = flushMode;
}
@Override
@ -369,7 +371,7 @@ public class ExtensionStack extends ContainerLifeCycle implements IncomingFrames
LOG.debug("Processing {}", current);
if (current == null)
return Action.IDLE;
nextOutgoing.outgoingFrame(current.frame, this);
nextOutgoing.outgoingFrame(current.frame, this, current.flushMode);
return Action.SCHEDULED;
}

View File

@ -138,7 +138,7 @@ public abstract class CompressExtension extends AbstractExtension
}
@Override
public void outgoingFrame(Frame frame, WriteCallback callback)
public void outgoingFrame(Frame frame, WriteCallback callback, FlushMode flushMode)
{
// We use a queue and an IteratingCallback to handle concurrency.
// We must compress and write atomically, otherwise the compression
@ -150,7 +150,7 @@ public abstract class CompressExtension extends AbstractExtension
return;
}
FrameEntry entry = new FrameEntry(frame, callback);
FrameEntry entry = new FrameEntry(frame, callback, flushMode);
LOG.debug("Queuing {}", entry);
entries.offer(entry);
flusher.iterate();
@ -166,11 +166,13 @@ public abstract class CompressExtension extends AbstractExtension
{
private final Frame frame;
private final WriteCallback callback;
private final FlushMode flushMode;
private FrameEntry(Frame frame, WriteCallback callback)
private FrameEntry(Frame frame, WriteCallback callback, FlushMode flushMode)
{
this.frame = frame;
this.callback = callback;
this.flushMode = flushMode;
}
@Override
@ -199,7 +201,7 @@ public abstract class CompressExtension extends AbstractExtension
}
else
{
compress(current.frame, false);
compress(current, false);
}
return Action.SCHEDULED;
}
@ -207,31 +209,33 @@ public abstract class CompressExtension extends AbstractExtension
private void deflate(FrameEntry entry)
{
Frame frame = entry.frame;
FlushMode flushMode = entry.flushMode;
if (OpCode.isControlFrame(frame.getOpCode()))
{
// Skip, cannot compress control frames.
nextOutgoingFrame(frame, this);
nextOutgoingFrame(frame, this, flushMode);
return;
}
if (!frame.hasPayload())
{
// Pass through, nothing to do
nextOutgoingFrame(frame, this);
nextOutgoingFrame(frame, this, flushMode);
return;
}
compress(frame, true);
compress(entry, true);
}
private void compress(Frame frame, boolean first)
private void compress(FrameEntry entry, boolean first)
{
// Get a chunk of the payload to avoid to blow
// the heap if the payload is a huge mapped file.
Frame frame = entry.frame;
ByteBuffer data = frame.getPayload();
int remaining = data.remaining();
int inputLength = Math.min(remaining, 32 * 1024);
LOG.debug("Compressing {}: {} bytes in {} bytes chunk", frame, remaining, inputLength);
LOG.debug("Compressing {}: {} bytes in {} bytes chunk", entry, remaining, inputLength);
// Avoid to copy the bytes if the ByteBuffer
// is backed by an array.
@ -280,7 +284,7 @@ public abstract class CompressExtension extends AbstractExtension
// Skip the last tail bytes bytes generated by SYNC_FLUSH.
payload = ByteBuffer.wrap(output, 0, outputLength - TAIL_BYTES.length);
LOG.debug("Compressed {}: {}->{} chunk bytes", frame, inputLength, outputLength);
LOG.debug("Compressed {}: {}->{} chunk bytes", entry, inputLength, outputLength);
boolean continuation = frame.getType().isContinuation() || !first;
DataFrame chunk = new DataFrame(frame, continuation);
@ -289,7 +293,7 @@ public abstract class CompressExtension extends AbstractExtension
boolean fin = frame.isFin() && finished;
chunk.setFin(fin);
nextOutgoingFrame(chunk, this);
nextOutgoingFrame(chunk, this, entry.flushMode);
}
@Override

View File

@ -92,14 +92,14 @@ public class PerMessageDeflateExtension extends CompressExtension
}
@Override
protected void nextOutgoingFrame(Frame frame, WriteCallback callback)
protected void nextOutgoingFrame(Frame frame, WriteCallback callback, FlushMode flushMode)
{
if (frame.isFin() && !outgoingContextTakeover)
{
LOG.debug("Outgoing Context Reset");
getDeflater().reset();
}
super.nextOutgoingFrame(frame, callback);
super.nextOutgoingFrame(frame, callback, flushMode);
}
@Override

View File

@ -57,17 +57,17 @@ public class FragmentExtension extends AbstractExtension
}
@Override
public void outgoingFrame(Frame frame, WriteCallback callback)
public void outgoingFrame(Frame frame, WriteCallback callback, FlushMode flushMode)
{
ByteBuffer payload = frame.getPayload();
int length = payload != null ? payload.remaining() : 0;
if (OpCode.isControlFrame(frame.getOpCode()) || maxLength <= 0 || length <= maxLength)
{
nextOutgoingFrame(frame, callback);
nextOutgoingFrame(frame, callback, flushMode);
return;
}
FrameEntry entry = new FrameEntry(frame, callback);
FrameEntry entry = new FrameEntry(frame, callback, flushMode);
LOG.debug("Queuing {}", entry);
entries.offer(entry);
flusher.iterate();
@ -84,11 +84,13 @@ public class FragmentExtension extends AbstractExtension
{
private final Frame frame;
private final WriteCallback callback;
private final FlushMode flushMode;
private FrameEntry(Frame frame, WriteCallback callback)
private FrameEntry(Frame frame, WriteCallback callback, FlushMode flushMode)
{
this.frame = frame;
this.callback = callback;
this.flushMode = flushMode;
}
@Override
@ -143,7 +145,7 @@ public class FragmentExtension extends AbstractExtension
LOG.debug("Fragmented {}->{}", frame, fragment);
payload.position(newLimit);
nextOutgoingFrame(fragment, this);
nextOutgoingFrame(fragment, this, entry.flushMode);
}
@Override

View File

@ -56,10 +56,10 @@ public class IdentityExtension extends AbstractExtension
}
@Override
public void outgoingFrame(Frame frame, WriteCallback callback)
public void outgoingFrame(Frame frame, WriteCallback callback, FlushMode flushMode)
{
// pass through
nextOutgoingFrame(frame,callback);
nextOutgoingFrame(frame,callback, flushMode);
}
@Override

View File

@ -32,6 +32,7 @@ import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.StringUtil;
@ -55,7 +56,8 @@ import org.eclipse.jetty.websocket.common.WebSocketSession;
import org.eclipse.jetty.websocket.common.io.IOState.ConnectionStateListener;
/**
* Provides the implementation of {@link LogicalConnection} within the framework of the new {@link Connection} framework of jetty-io
* Provides the implementation of {@link LogicalConnection} within the
* framework of the new {@link Connection} framework of {@code jetty-io}.
*/
public abstract class AbstractWebSocketConnection extends AbstractConnection implements LogicalConnection, ConnectionStateListener
{
@ -379,7 +381,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
{
// Fire out a close frame, indicating abnormal shutdown, then disconnect
CloseInfo abnormal = new CloseInfo(StatusCode.SHUTDOWN,"Abnormal Close - " + ioState.getCloseInfo().getReason());
outgoingFrame(abnormal.asFrame(),new OnDisconnectCallback());
outgoingFrame(abnormal.asFrame(),new OnDisconnectCallback(),FlushMode.FLUSH);
}
else
{
@ -390,7 +392,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
case CLOSING:
CloseInfo close = ioState.getCloseInfo();
// append close frame
outgoingFrame(close.asFrame(),new OnDisconnectCallback());
outgoingFrame(close.asFrame(),new OnDisconnectCallback(),FlushMode.FLUSH);
default:
break;
}
@ -463,14 +465,14 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
* Frame from API, User, or Internal implementation destined for network.
*/
@Override
public void outgoingFrame(Frame frame, WriteCallback callback)
public void outgoingFrame(Frame frame, WriteCallback callback, FlushMode flushMode)
{
if (LOG.isDebugEnabled())
{
LOG.debug("outgoingFrame({}, {})",frame,callback);
}
flusher.enqueue(frame,callback);
flusher.enqueue(frame,callback,flushMode);
}
private int read(ByteBuffer buffer)

View File

@ -28,11 +28,13 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.ArrayQueue;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
import org.eclipse.jetty.websocket.common.Generator;
import org.eclipse.jetty.websocket.common.OpCode;
@ -131,10 +133,10 @@ public class FrameFlusher
}
}
public void enqueue(Frame frame, WriteCallback callback)
public void enqueue(Frame frame, WriteCallback callback, OutgoingFrames.FlushMode flushMode)
{
Objects.requireNonNull(frame);
FrameEntry entry = new FrameEntry(frame,callback);
FrameEntry entry = new FrameEntry(frame,callback,flushMode);
LOG.debug("enqueue({})",entry);
Throwable failure=null;
synchronized (lock)
@ -292,14 +294,16 @@ public class FrameFlusher
{
protected final AtomicBoolean failed = new AtomicBoolean(false);
protected final Frame frame;
private final OutgoingFrames.FlushMode flushMode;
protected final WriteCallback callback;
/** holds reference to header ByteBuffer, as it needs to be released on success/failure */
private ByteBuffer headerBuffer;
public FrameEntry(Frame frame, WriteCallback callback)
public FrameEntry(Frame frame, WriteCallback callback, OutgoingFrames.FlushMode flushMode)
{
this.frame = frame;
this.callback = callback;
this.flushMode = flushMode;
}
public ByteBuffer getHeaderBytes()

View File

@ -43,7 +43,7 @@ public class FramePipes
@Override
public void incomingFrame(Frame frame)
{
this.outgoing.outgoingFrame(frame,null);
this.outgoing.outgoingFrame(frame,null,OutgoingFrames.FlushMode.FLUSH);
}
}
@ -57,7 +57,7 @@ public class FramePipes
}
@Override
public void outgoingFrame(Frame frame, WriteCallback callback)
public void outgoingFrame(Frame frame, WriteCallback callback, FlushMode flushMode)
{
try
{

View File

@ -137,7 +137,7 @@ public class MessageOutputStream extends OutputStream
try
{
outgoing.outgoingFrame(frame,blocker);
outgoing.outgoingFrame(frame,blocker,OutgoingFrames.FlushMode.FLUSH);
// block on write
blocker.block();
// block success

View File

@ -118,7 +118,7 @@ public class MessageWriter extends Writer
try
{
outgoing.outgoingFrame(frame,blocker);
outgoing.outgoingFrame(frame,blocker,OutgoingFrames.FlushMode.FLUSH);
// block on write
blocker.block();
// write success

View File

@ -18,8 +18,6 @@
package org.eclipse.jetty.websocket.common;
import static org.hamcrest.Matchers.containsString;
import java.io.IOException;
import java.nio.ByteBuffer;
@ -32,6 +30,8 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import static org.hamcrest.Matchers.containsString;
public class WebSocketRemoteEndpointTest
{
@Rule

View File

@ -44,7 +44,7 @@ public class DummyOutgoingFrames implements OutgoingFrames
}
@Override
public void outgoingFrame(Frame frame, WriteCallback callback)
public void outgoingFrame(Frame frame, WriteCallback callback, FlushMode flushMode)
{
LOG.debug("outgoingFrame({},{})",frame,callback);
if (callback != null)

View File

@ -18,8 +18,6 @@
package org.eclipse.jetty.websocket.common.extensions;
import static org.hamcrest.Matchers.*;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
@ -32,6 +30,7 @@ import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
import org.eclipse.jetty.websocket.common.OpCode;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.eclipse.jetty.websocket.common.extensions.fragment.FragmentExtension;
@ -46,10 +45,12 @@ import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import static org.hamcrest.Matchers.is;
public class FragmentExtensionTest
{
@Rule
public LeakTrackingBufferPool bufferPool = new LeakTrackingBufferPool("Test",new MappedByteBufferPool());
public LeakTrackingBufferPool bufferPool = new LeakTrackingBufferPool("Test", new MappedByteBufferPool());
/**
* Verify that incoming frames are passed thru without modification
@ -82,7 +83,7 @@ public class FragmentExtensionTest
int len = quote.size();
capture.assertFrameCount(len);
capture.assertHasFrame(OpCode.TEXT,len);
capture.assertHasFrame(OpCode.TEXT, len);
String prefix;
int i = 0;
@ -90,15 +91,15 @@ public class FragmentExtensionTest
{
prefix = "Frame[" + i + "]";
Assert.assertThat(prefix + ".opcode",actual.getOpCode(),is(OpCode.TEXT));
Assert.assertThat(prefix + ".fin",actual.isFin(),is(true));
Assert.assertThat(prefix + ".rsv1",actual.isRsv1(),is(false));
Assert.assertThat(prefix + ".rsv2",actual.isRsv2(),is(false));
Assert.assertThat(prefix + ".rsv3",actual.isRsv3(),is(false));
Assert.assertThat(prefix + ".opcode", actual.getOpCode(), is(OpCode.TEXT));
Assert.assertThat(prefix + ".fin", actual.isFin(), is(true));
Assert.assertThat(prefix + ".rsv1", actual.isRsv1(), is(false));
Assert.assertThat(prefix + ".rsv2", actual.isRsv2(), is(false));
Assert.assertThat(prefix + ".rsv3", actual.isRsv3(), is(false));
ByteBuffer expected = BufferUtil.toBuffer(quote.get(i),StandardCharsets.UTF_8);
Assert.assertThat(prefix + ".payloadLength",actual.getPayloadLength(),is(expected.remaining()));
ByteBufferAssert.assertEquals(prefix + ".payload",expected,actual.getPayload().slice());
ByteBuffer expected = BufferUtil.toBuffer(quote.get(i), StandardCharsets.UTF_8);
Assert.assertThat(prefix + ".payloadLength", actual.getPayloadLength(), is(expected.remaining()));
ByteBufferAssert.assertEquals(prefix + ".payload", expected, actual.getPayload().slice());
i++;
}
}
@ -124,18 +125,18 @@ public class FragmentExtensionTest
ext.incomingFrame(ping);
capture.assertFrameCount(1);
capture.assertHasFrame(OpCode.PING,1);
capture.assertHasFrame(OpCode.PING, 1);
WebSocketFrame actual = capture.getFrames().poll();
Assert.assertThat("Frame.opcode",actual.getOpCode(),is(OpCode.PING));
Assert.assertThat("Frame.fin",actual.isFin(),is(true));
Assert.assertThat("Frame.rsv1",actual.isRsv1(),is(false));
Assert.assertThat("Frame.rsv2",actual.isRsv2(),is(false));
Assert.assertThat("Frame.rsv3",actual.isRsv3(),is(false));
Assert.assertThat("Frame.opcode", actual.getOpCode(), is(OpCode.PING));
Assert.assertThat("Frame.fin", actual.isFin(), is(true));
Assert.assertThat("Frame.rsv1", actual.isRsv1(), is(false));
Assert.assertThat("Frame.rsv2", actual.isRsv2(), is(false));
Assert.assertThat("Frame.rsv3", actual.isRsv3(), is(false));
ByteBuffer expected = BufferUtil.toBuffer(payload,StandardCharsets.UTF_8);
Assert.assertThat("Frame.payloadLength",actual.getPayloadLength(),is(expected.remaining()));
ByteBufferAssert.assertEquals("Frame.payload",expected,actual.getPayload().slice());
ByteBuffer expected = BufferUtil.toBuffer(payload, StandardCharsets.UTF_8);
Assert.assertThat("Frame.payloadLength", actual.getPayloadLength(), is(expected.remaining()));
ByteBufferAssert.assertEquals("Frame.payload", expected, actual.getPayload().slice());
}
/**
@ -164,7 +165,7 @@ public class FragmentExtensionTest
for (String section : quote)
{
Frame frame = new TextFrame().setPayload(section);
ext.outgoingFrame(frame,null);
ext.outgoingFrame(frame, null, OutgoingFrames.FlushMode.FLUSH);
}
// Expected Frames
@ -195,18 +196,18 @@ public class FragmentExtensionTest
// System.out.printf("expect: %s%n",expectedFrame);
// Validate Frame
Assert.assertThat(prefix + ".opcode",actualFrame.getOpCode(),is(expectedFrame.getOpCode()));
Assert.assertThat(prefix + ".fin",actualFrame.isFin(),is(expectedFrame.isFin()));
Assert.assertThat(prefix + ".rsv1",actualFrame.isRsv1(),is(expectedFrame.isRsv1()));
Assert.assertThat(prefix + ".rsv2",actualFrame.isRsv2(),is(expectedFrame.isRsv2()));
Assert.assertThat(prefix + ".rsv3",actualFrame.isRsv3(),is(expectedFrame.isRsv3()));
Assert.assertThat(prefix + ".opcode", actualFrame.getOpCode(), is(expectedFrame.getOpCode()));
Assert.assertThat(prefix + ".fin", actualFrame.isFin(), is(expectedFrame.isFin()));
Assert.assertThat(prefix + ".rsv1", actualFrame.isRsv1(), is(expectedFrame.isRsv1()));
Assert.assertThat(prefix + ".rsv2", actualFrame.isRsv2(), is(expectedFrame.isRsv2()));
Assert.assertThat(prefix + ".rsv3", actualFrame.isRsv3(), is(expectedFrame.isRsv3()));
// Validate Payload
ByteBuffer expectedData = expectedFrame.getPayload().slice();
ByteBuffer actualData = actualFrame.getPayload().slice();
Assert.assertThat(prefix + ".payloadLength",actualData.remaining(),is(expectedData.remaining()));
ByteBufferAssert.assertEquals(prefix + ".payload",expectedData,actualData);
Assert.assertThat(prefix + ".payloadLength", actualData.remaining(), is(expectedData.remaining()));
ByteBufferAssert.assertEquals(prefix + ".payload", expectedData, actualData);
}
}
@ -236,7 +237,7 @@ public class FragmentExtensionTest
for (String section : quote)
{
Frame frame = new TextFrame().setPayload(section);
ext.outgoingFrame(frame,null);
ext.outgoingFrame(frame, null, OutgoingFrames.FlushMode.FLUSH);
}
// Expected Frames
@ -259,18 +260,18 @@ public class FragmentExtensionTest
WebSocketFrame expectedFrame = expectedFrames.get(i);
// Validate Frame
Assert.assertThat(prefix + ".opcode",actualFrame.getOpCode(),is(expectedFrame.getOpCode()));
Assert.assertThat(prefix + ".fin",actualFrame.isFin(),is(expectedFrame.isFin()));
Assert.assertThat(prefix + ".rsv1",actualFrame.isRsv1(),is(expectedFrame.isRsv1()));
Assert.assertThat(prefix + ".rsv2",actualFrame.isRsv2(),is(expectedFrame.isRsv2()));
Assert.assertThat(prefix + ".rsv3",actualFrame.isRsv3(),is(expectedFrame.isRsv3()));
Assert.assertThat(prefix + ".opcode", actualFrame.getOpCode(), is(expectedFrame.getOpCode()));
Assert.assertThat(prefix + ".fin", actualFrame.isFin(), is(expectedFrame.isFin()));
Assert.assertThat(prefix + ".rsv1", actualFrame.isRsv1(), is(expectedFrame.isRsv1()));
Assert.assertThat(prefix + ".rsv2", actualFrame.isRsv2(), is(expectedFrame.isRsv2()));
Assert.assertThat(prefix + ".rsv3", actualFrame.isRsv3(), is(expectedFrame.isRsv3()));
// Validate Payload
ByteBuffer expectedData = expectedFrame.getPayload().slice();
ByteBuffer actualData = actualFrame.getPayload().slice();
Assert.assertThat(prefix + ".payloadLength",actualData.remaining(),is(expectedData.remaining()));
ByteBufferAssert.assertEquals(prefix + ".payload",expectedData,actualData);
Assert.assertThat(prefix + ".payloadLength", actualData.remaining(), is(expectedData.remaining()));
ByteBufferAssert.assertEquals(prefix + ".payload", expectedData, actualData);
}
}
@ -293,21 +294,21 @@ public class FragmentExtensionTest
String payload = "Are you there?";
Frame ping = new PingFrame().setPayload(payload);
ext.outgoingFrame(ping,null);
ext.outgoingFrame(ping, null, OutgoingFrames.FlushMode.FLUSH);
capture.assertFrameCount(1);
capture.assertHasFrame(OpCode.PING,1);
capture.assertHasFrame(OpCode.PING, 1);
WebSocketFrame actual = capture.getFrames().getFirst();
Assert.assertThat("Frame.opcode",actual.getOpCode(),is(OpCode.PING));
Assert.assertThat("Frame.fin",actual.isFin(),is(true));
Assert.assertThat("Frame.rsv1",actual.isRsv1(),is(false));
Assert.assertThat("Frame.rsv2",actual.isRsv2(),is(false));
Assert.assertThat("Frame.rsv3",actual.isRsv3(),is(false));
Assert.assertThat("Frame.opcode", actual.getOpCode(), is(OpCode.PING));
Assert.assertThat("Frame.fin", actual.isFin(), is(true));
Assert.assertThat("Frame.rsv1", actual.isRsv1(), is(false));
Assert.assertThat("Frame.rsv2", actual.isRsv2(), is(false));
Assert.assertThat("Frame.rsv3", actual.isRsv3(), is(false));
ByteBuffer expected = BufferUtil.toBuffer(payload,StandardCharsets.UTF_8);
Assert.assertThat("Frame.payloadLength",actual.getPayloadLength(),is(expected.remaining()));
ByteBufferAssert.assertEquals("Frame.payload",expected,actual.getPayload().slice());
ByteBuffer expected = BufferUtil.toBuffer(payload, StandardCharsets.UTF_8);
Assert.assertThat("Frame.payloadLength", actual.getPayloadLength(), is(expected.remaining()));
ByteBufferAssert.assertEquals("Frame.payload", expected, actual.getPayload().slice());
}
}

View File

@ -18,8 +18,6 @@
package org.eclipse.jetty.websocket.common.extensions;
import static org.hamcrest.Matchers.is;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
@ -27,6 +25,7 @@ import java.nio.charset.StandardCharsets;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.websocket.api.extensions.Extension;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
import org.eclipse.jetty.websocket.common.OpCode;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.eclipse.jetty.websocket.common.extensions.identity.IdentityExtension;
@ -37,6 +36,8 @@ import org.eclipse.jetty.websocket.common.test.OutgoingFramesCapture;
import org.junit.Assert;
import org.junit.Test;
import static org.hamcrest.Matchers.is;
public class IdentityExtensionTest
{
/**
@ -54,18 +55,18 @@ public class IdentityExtensionTest
ext.incomingFrame(frame);
capture.assertFrameCount(1);
capture.assertHasFrame(OpCode.TEXT,1);
capture.assertHasFrame(OpCode.TEXT, 1);
WebSocketFrame actual = capture.getFrames().poll();
Assert.assertThat("Frame.opcode",actual.getOpCode(),is(OpCode.TEXT));
Assert.assertThat("Frame.fin",actual.isFin(),is(true));
Assert.assertThat("Frame.rsv1",actual.isRsv1(),is(false));
Assert.assertThat("Frame.rsv2",actual.isRsv2(),is(false));
Assert.assertThat("Frame.rsv3",actual.isRsv3(),is(false));
Assert.assertThat("Frame.opcode", actual.getOpCode(), is(OpCode.TEXT));
Assert.assertThat("Frame.fin", actual.isFin(), is(true));
Assert.assertThat("Frame.rsv1", actual.isRsv1(), is(false));
Assert.assertThat("Frame.rsv2", actual.isRsv2(), is(false));
Assert.assertThat("Frame.rsv3", actual.isRsv3(), is(false));
ByteBuffer expected = BufferUtil.toBuffer("hello",StandardCharsets.UTF_8);
Assert.assertThat("Frame.payloadLength",actual.getPayloadLength(),is(expected.remaining()));
ByteBufferAssert.assertEquals("Frame.payload",expected,actual.getPayload().slice());
ByteBuffer expected = BufferUtil.toBuffer("hello", StandardCharsets.UTF_8);
Assert.assertThat("Frame.payloadLength", actual.getPayloadLength(), is(expected.remaining()));
ByteBufferAssert.assertEquals("Frame.payload", expected, actual.getPayload().slice());
}
/**
@ -80,21 +81,21 @@ public class IdentityExtensionTest
ext.setNextOutgoingFrames(capture);
Frame frame = new TextFrame().setPayload("hello");
ext.outgoingFrame(frame,null);
ext.outgoingFrame(frame, null, OutgoingFrames.FlushMode.FLUSH);
capture.assertFrameCount(1);
capture.assertHasFrame(OpCode.TEXT,1);
capture.assertHasFrame(OpCode.TEXT, 1);
WebSocketFrame actual = capture.getFrames().getFirst();
Assert.assertThat("Frame.opcode",actual.getOpCode(),is(OpCode.TEXT));
Assert.assertThat("Frame.fin",actual.isFin(),is(true));
Assert.assertThat("Frame.rsv1",actual.isRsv1(),is(false));
Assert.assertThat("Frame.rsv2",actual.isRsv2(),is(false));
Assert.assertThat("Frame.rsv3",actual.isRsv3(),is(false));
Assert.assertThat("Frame.opcode", actual.getOpCode(), is(OpCode.TEXT));
Assert.assertThat("Frame.fin", actual.isFin(), is(true));
Assert.assertThat("Frame.rsv1", actual.isRsv1(), is(false));
Assert.assertThat("Frame.rsv2", actual.isRsv2(), is(false));
Assert.assertThat("Frame.rsv3", actual.isRsv3(), is(false));
ByteBuffer expected = BufferUtil.toBuffer("hello",StandardCharsets.UTF_8);
Assert.assertThat("Frame.payloadLength",actual.getPayloadLength(),is(expected.remaining()));
ByteBufferAssert.assertEquals("Frame.payload",expected,actual.getPayload().slice());
ByteBuffer expected = BufferUtil.toBuffer("hello", StandardCharsets.UTF_8);
Assert.assertThat("Frame.payloadLength", actual.getPayloadLength(), is(expected.remaining()));
ByteBufferAssert.assertEquals("Frame.payload", expected, actual.getPayload().slice());
}
}

View File

@ -31,7 +31,7 @@ public class CapturedHexPayloads implements OutgoingFrames
private List<String> captured = new ArrayList<>();
@Override
public void outgoingFrame(Frame frame, WriteCallback callback)
public void outgoingFrame(Frame frame, WriteCallback callback, FlushMode flushMode)
{
String hexPayload = Hex.asHex(frame.getPayload());
captured.add(hexPayload);

View File

@ -63,8 +63,8 @@ import static org.hamcrest.Matchers.is;
public class DeflateFrameExtensionTest extends AbstractExtensionTest
{
@Rule
public LeakTrackingBufferPool bufferPool = new LeakTrackingBufferPool("Test",new MappedByteBufferPool());
public LeakTrackingBufferPool bufferPool = new LeakTrackingBufferPool("Test", new MappedByteBufferPool());
private void assertIncoming(byte[] raw, String... expectedTextDatas)
{
WebSocketPolicy policy = WebSocketPolicy.newClientPolicy();
@ -90,21 +90,21 @@ public class DeflateFrameExtensionTest extends AbstractExtensionTest
int len = expectedTextDatas.length;
capture.assertFrameCount(len);
capture.assertHasFrame(OpCode.TEXT,len);
capture.assertHasFrame(OpCode.TEXT, len);
int i=0;
for (WebSocketFrame actual: capture.getFrames())
int i = 0;
for (WebSocketFrame actual : capture.getFrames())
{
String prefix = "Frame[" + i + "]";
Assert.assertThat(prefix + ".opcode",actual.getOpCode(),is(OpCode.TEXT));
Assert.assertThat(prefix + ".fin",actual.isFin(),is(true));
Assert.assertThat(prefix + ".rsv1",actual.isRsv1(),is(false)); // RSV1 should be unset at this point
Assert.assertThat(prefix + ".rsv2",actual.isRsv2(),is(false));
Assert.assertThat(prefix + ".rsv3",actual.isRsv3(),is(false));
Assert.assertThat(prefix + ".opcode", actual.getOpCode(), is(OpCode.TEXT));
Assert.assertThat(prefix + ".fin", actual.isFin(), is(true));
Assert.assertThat(prefix + ".rsv1", actual.isRsv1(), is(false)); // RSV1 should be unset at this point
Assert.assertThat(prefix + ".rsv2", actual.isRsv2(), is(false));
Assert.assertThat(prefix + ".rsv3", actual.isRsv3(), is(false));
ByteBuffer expected = BufferUtil.toBuffer(expectedTextDatas[i],StandardCharsets.UTF_8);
Assert.assertThat(prefix + ".payloadLength",actual.getPayloadLength(),is(expected.remaining()));
ByteBufferAssert.assertEquals(prefix + ".payload",expected,actual.getPayload().slice());
ByteBuffer expected = BufferUtil.toBuffer(expectedTextDatas[i], StandardCharsets.UTF_8);
Assert.assertThat(prefix + ".payloadLength", actual.getPayloadLength(), is(expected.remaining()));
ByteBufferAssert.assertEquals(prefix + ".payload", expected, actual.getPayload().slice());
i++;
}
}
@ -120,15 +120,14 @@ public class DeflateFrameExtensionTest extends AbstractExtensionTest
ExtensionConfig config = ExtensionConfig.parse("deflate-frame");
ext.setConfig(config);
boolean validating = true;
Generator generator = new Generator(policy,bufferPool,validating);
Generator generator = new Generator(policy, bufferPool, true);
generator.configureFromExtensions(Collections.singletonList(ext));
OutgoingNetworkBytesCapture capture = new OutgoingNetworkBytesCapture(generator);
ext.setNextOutgoingFrames(capture);
Frame frame = new TextFrame().setPayload(text);
ext.outgoingFrame(frame, null);
ext.outgoingFrame(frame, null, OutgoingFrames.FlushMode.FLUSH);
capture.assertBytes(0, expectedHex);
}
@ -143,9 +142,9 @@ public class DeflateFrameExtensionTest extends AbstractExtensionTest
tester.parseIncomingHex(// Captured from Blockhead Client - "Hello" then "There" via unit test
"c18700000000f248cdc9c90700", // "Hello"
"c187000000000ac9482d4a0500" // "There"
);
);
tester.assertHasFrames("Hello","There");
tester.assertHasFrames("Hello", "There");
}
@Test
@ -157,7 +156,7 @@ public class DeflateFrameExtensionTest extends AbstractExtensionTest
tester.parseIncomingHex(// Captured from Chrome 20.x - "Hello" (sent from browser)
"c187832b5c11716391d84a2c5c" // "Hello"
);
);
tester.assertHasFrames("Hello");
}
@ -172,9 +171,9 @@ public class DeflateFrameExtensionTest extends AbstractExtensionTest
tester.parseIncomingHex(// Captured from Chrome 20.x - "Hello" then "There" (sent from browser)
"c1877b1971db8951bc12b21e71", // "Hello"
"c18759edc8f4532480d913e8c8" // There
);
);
tester.assertHasFrames("Hello","There");
tester.assertHasFrames("Hello", "There");
}
@Test
@ -186,7 +185,7 @@ public class DeflateFrameExtensionTest extends AbstractExtensionTest
tester.parseIncomingHex(// Captured from Chrome 20.x - "info:" (sent from browser)
"c187ca4def7f0081a4b47d4fef" // example payload
);
);
tester.assertHasFrames("info:");
}
@ -201,9 +200,9 @@ public class DeflateFrameExtensionTest extends AbstractExtensionTest
tester.parseIncomingHex(// Captured from Chrome 20.x - "time:" then "time:" once more (sent from browser)
"c18782467424a88fb869374474", // "time:"
"c1853cfda17f16fcb07f3c" // "time:"
);
);
tester.assertHasFrames("time:","time:");
tester.assertHasFrames("time:", "time:");
}
@Test
@ -217,9 +216,9 @@ public class DeflateFrameExtensionTest extends AbstractExtensionTest
"c1876b100104" + "41d9cd49de1201", // "time:"
"c1852ae3ff01" + "00e2ee012a", // "time:"
"c18435558caa" + "37468caa" // "time:"
);
);
tester.assertHasFrames("time:","time:","time:");
tester.assertHasFrames("time:", "time:", "time:");
}
@Test
@ -227,7 +226,7 @@ public class DeflateFrameExtensionTest extends AbstractExtensionTest
{
// What pywebsocket produces for "time:", "time:", "time:"
String expected[] = new String[]
{ "2AC9CC4DB50200", "2A01110000", "02130000" };
{"2AC9CC4DB50200", "2A01110000", "02130000"};
// Lets see what we produce
CapturedHexPayloads capture = new CapturedHexPayloads();
@ -235,13 +234,13 @@ public class DeflateFrameExtensionTest extends AbstractExtensionTest
init(ext);
ext.setNextOutgoingFrames(capture);
ext.outgoingFrame(new TextFrame().setPayload("time:"),null);
ext.outgoingFrame(new TextFrame().setPayload("time:"),null);
ext.outgoingFrame(new TextFrame().setPayload("time:"),null);
ext.outgoingFrame(new TextFrame().setPayload("time:"), null, OutgoingFrames.FlushMode.FLUSH);
ext.outgoingFrame(new TextFrame().setPayload("time:"), null, OutgoingFrames.FlushMode.FLUSH);
ext.outgoingFrame(new TextFrame().setPayload("time:"), null, OutgoingFrames.FlushMode.FLUSH);
List<String> actual = capture.getCaptured();
Assert.assertThat("Compressed Payloads",actual,contains(expected));
Assert.assertThat("Compressed Payloads", actual, contains(expected));
}
private void init(DeflateFrameExtension ext)
@ -254,8 +253,7 @@ public class DeflateFrameExtensionTest extends AbstractExtensionTest
public void testDeflateBasics() throws Exception
{
// Setup deflater basics
boolean nowrap = true;
Deflater compressor = new Deflater(Deflater.BEST_COMPRESSION,nowrap);
Deflater compressor = new Deflater(Deflater.BEST_COMPRESSION, true);
compressor.setStrategy(Deflater.DEFAULT_STRATEGY);
// Text to compress
@ -264,7 +262,7 @@ public class DeflateFrameExtensionTest extends AbstractExtensionTest
// Prime the compressor
compressor.reset();
compressor.setInput(uncompressed,0,uncompressed.length);
compressor.setInput(uncompressed, 0, uncompressed.length);
compressor.finish();
// Perform compression
@ -274,26 +272,24 @@ public class DeflateFrameExtensionTest extends AbstractExtensionTest
while (!compressor.finished())
{
byte out[] = new byte[64];
int len = compressor.deflate(out,0,out.length,Deflater.SYNC_FLUSH);
int len = compressor.deflate(out, 0, out.length, Deflater.SYNC_FLUSH);
if (len > 0)
{
outbuf.put(out,0,len);
outbuf.put(out, 0, len);
}
}
compressor.end();
BufferUtil.flipToFlush(outbuf,0);
byte b0 = outbuf.get(0);
if ((b0 & 1) != 0)
{
outbuf.put(0,(b0 ^= 1));
}
BufferUtil.flipToFlush(outbuf, 0);
byte compressed[] = BufferUtil.toArray(outbuf);
// Clear the BFINAL bit that has been set by the compressor.end() call.
// In the real implementation we never end() the compressor.
compressed[0] &= 0xFE;
String actual = TypeUtil.toHexString(compressed);
String expected = "CaCc4bCbB70200"; // what pywebsocket produces
Assert.assertThat("Compressed data",actual,is(expected));
Assert.assertThat("Compressed data", actual, is(expected));
}
@Test
@ -306,17 +302,16 @@ public class DeflateFrameExtensionTest extends AbstractExtensionTest
ext.setPolicy(policy);
ext.setConfig(new ExtensionConfig(ext.getName()));
boolean validating = true;
Generator generator = new Generator(policy,bufferPool,validating);
Generator generator = new Generator(policy, bufferPool, true);
generator.configureFromExtensions(Collections.singletonList(ext));
OutgoingNetworkBytesCapture capture = new OutgoingNetworkBytesCapture(generator);
ext.setNextOutgoingFrames(capture);
ext.outgoingFrame(new TextFrame().setPayload("Hello"),null);
ext.outgoingFrame(new TextFrame().setPayload("There"),null);
ext.outgoingFrame(new TextFrame().setPayload("Hello"), null, OutgoingFrames.FlushMode.FLUSH);
ext.outgoingFrame(new TextFrame().setPayload("There"), null, OutgoingFrames.FlushMode.FLUSH);
capture.assertBytes(0,"c107f248cdc9c90700");
capture.assertBytes(0, "c107f248cdc9c90700");
}
@Test
@ -328,15 +323,15 @@ public class DeflateFrameExtensionTest extends AbstractExtensionTest
Inflater inflater = new Inflater(true);
inflater.reset();
inflater.setInput(rawbuf,0,rawbuf.length);
inflater.setInput(rawbuf, 0, rawbuf.length);
byte outbuf[] = new byte[64];
int len = inflater.inflate(outbuf);
inflater.end();
Assert.assertThat("Inflated length",len,greaterThan(4));
Assert.assertThat("Inflated length", len, greaterThan(4));
String actual = StringUtil.toUTF8String(outbuf,0,len);
Assert.assertThat("Inflated text",actual,is("info:"));
String actual = StringUtil.toUTF8String(outbuf, 0, len);
Assert.assertThat("Inflated text", actual, is("info:"));
}
@Test
@ -344,7 +339,7 @@ public class DeflateFrameExtensionTest extends AbstractExtensionTest
{
// Captured from PyWebSocket - "Hello" (echo from server)
byte rawbuf[] = TypeUtil.fromHexString("c107f248cdc9c90700");
assertIncoming(rawbuf,"Hello");
assertIncoming(rawbuf, "Hello");
}
@Test
@ -353,7 +348,7 @@ public class DeflateFrameExtensionTest extends AbstractExtensionTest
// Captured from PyWebSocket - Long Text (echo from server)
byte rawbuf[] = TypeUtil.fromHexString("c1421cca410a80300c44d1abccce9df7" + "f018298634d05631138ab7b7b8fdef1f" + "dc0282e2061d575a45f6f2686bab25e1"
+ "3fb7296fa02b5885eb3b0379c394f461" + "98cafd03");
assertIncoming(rawbuf,"It's a big enough umbrella but it's always me that ends up getting wet.");
assertIncoming(rawbuf, "It's a big enough umbrella but it's always me that ends up getting wet.");
}
@Test
@ -361,7 +356,7 @@ public class DeflateFrameExtensionTest extends AbstractExtensionTest
{
// Captured from PyWebSocket - "stackoverflow" (echo from server)
byte rawbuf[] = TypeUtil.fromHexString("c10f2a2e494ccece2f4b2d4acbc92f0700");
assertIncoming(rawbuf,"stackoverflow");
assertIncoming(rawbuf, "stackoverflow");
}
/**
@ -370,7 +365,7 @@ public class DeflateFrameExtensionTest extends AbstractExtensionTest
@Test
public void testServerGeneratedHello() throws IOException
{
assertOutgoing("Hello","c107f248cdc9c90700");
assertOutgoing("Hello", "c107f248cdc9c90700");
}
/**
@ -379,7 +374,7 @@ public class DeflateFrameExtensionTest extends AbstractExtensionTest
@Test
public void testServerGeneratedThere() throws IOException
{
assertOutgoing("There","c1070ac9482d4a0500");
assertOutgoing("There", "c1070ac9482d4a0500");
}
@Test
@ -403,7 +398,7 @@ public class DeflateFrameExtensionTest extends AbstractExtensionTest
clientExtension.setNextOutgoingFrames(new OutgoingFrames()
{
@Override
public void outgoingFrame(Frame frame, WriteCallback callback)
public void outgoingFrame(Frame frame, WriteCallback callback, FlushMode flushMode)
{
serverExtension.incomingFrame(frame);
callback.writeSuccess();
@ -435,7 +430,7 @@ public class DeflateFrameExtensionTest extends AbstractExtensionTest
BinaryFrame frame = new BinaryFrame();
frame.setPayload(input);
frame.setFin(true);
clientExtension.outgoingFrame(frame, null);
clientExtension.outgoingFrame(frame, null, OutgoingFrames.FlushMode.FLUSH);
Assert.assertArrayEquals(input, result.toByteArray());
}

View File

@ -29,6 +29,7 @@ import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
import org.eclipse.jetty.websocket.common.OpCode;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.eclipse.jetty.websocket.common.extensions.AbstractExtensionTest;
@ -48,17 +49,17 @@ import static org.hamcrest.Matchers.is;
/**
* Client side behavioral tests for permessage-deflate extension.
* <p>
* <p/>
* See: http://tools.ietf.org/html/draft-ietf-hybi-permessage-compression-15
*/
public class PerMessageDeflateExtensionTest extends AbstractExtensionTest
{
@Rule
public LeakTrackingBufferPool bufferPool = new LeakTrackingBufferPool("Test",new MappedByteBufferPool());
public LeakTrackingBufferPool bufferPool = new LeakTrackingBufferPool("Test", new MappedByteBufferPool());
/**
* Decode payload example as seen in draft-ietf-hybi-permessage-compression-15.
* <p>
* <p/>
* Section 8.2.3.4: Using a DEFLATE Block with BFINAL Set to 1
*/
@Test
@ -71,14 +72,14 @@ public class PerMessageDeflateExtensionTest extends AbstractExtensionTest
tester.parseIncomingHex(// 1 message
"0xc1 0x08", // header
"0xf3 0x48 0xcd 0xc9 0xc9 0x07 0x00 0x00" // example payload
);
);
tester.assertHasFrames("Hello");
}
/**
* Decode payload example as seen in draft-ietf-hybi-permessage-compression-15.
* <p>
* <p/>
* Section 8.2.3.3: Using a DEFLATE Block with No Compression
*/
@Test
@ -90,14 +91,14 @@ public class PerMessageDeflateExtensionTest extends AbstractExtensionTest
tester.parseIncomingHex(// 1 message / no compression
"0xc1 0x0b 0x00 0x05 0x00 0xfa 0xff 0x48 0x65 0x6c 0x6c 0x6f 0x00" // example frame
);
);
tester.assertHasFrames("Hello");
}
/**
* Decode payload example as seen in draft-ietf-hybi-permessage-compression-15.
* <p>
* <p/>
* Section 8.2.3.1: A message compressed using 1 compressed DEFLATE block
*/
@Test
@ -109,14 +110,14 @@ public class PerMessageDeflateExtensionTest extends AbstractExtensionTest
tester.parseIncomingHex(//basic, 1 block, compressed with 0 compression level (aka, uncompressed).
"0xc1 0x07 0xf2 0x48 0xcd 0xc9 0xc9 0x07 0x00" // example frame
);
);
tester.assertHasFrames("Hello");
}
/**
* Decode payload example as seen in draft-ietf-hybi-permessage-compression-15.
* <p>
* <p/>
* Section 8.2.3.1: A message compressed using 1 compressed DEFLATE block (with fragmentation)
*/
@Test
@ -139,7 +140,7 @@ public class PerMessageDeflateExtensionTest extends AbstractExtensionTest
/**
* Decode payload example as seen in draft-ietf-hybi-permessage-compression-15.
* <p>
* <p/>
* Section 8.2.3.2: Sharing LZ77 Sliding Window
*/
@Test
@ -157,12 +158,12 @@ public class PerMessageDeflateExtensionTest extends AbstractExtensionTest
"0xc1 0x07", // (HEADER added for this test)
"0xf2 0x48 0xcd 0xc9 0xc9 0x07 0x00");
tester.assertHasFrames("Hello","Hello");
tester.assertHasFrames("Hello", "Hello");
}
/**
* Decode payload example as seen in draft-ietf-hybi-permessage-compression-15.
* <p>
* <p/>
* Section 8.2.3.2: Sharing LZ77 Sliding Window
*/
@Test
@ -179,14 +180,14 @@ public class PerMessageDeflateExtensionTest extends AbstractExtensionTest
// message 2
"0xc1 0x05", // (HEADER added for this test)
"0xf2 0x00 0x11 0x00 0x00"
);
);
tester.assertHasFrames("Hello","Hello");
tester.assertHasFrames("Hello", "Hello");
}
/**
* Decode payload example as seen in draft-ietf-hybi-permessage-compression-15.
* <p>
* <p/>
* Section 8.2.3.5: Two DEFLATE Blocks in 1 Message
*/
@Test
@ -199,7 +200,7 @@ public class PerMessageDeflateExtensionTest extends AbstractExtensionTest
tester.parseIncomingHex(// 1 message, 1 frame, 2 deflate blocks
"0xc1 0x0d", // (HEADER added for this test)
"0xf2 0x48 0x05 0x00 0x00 0x00 0xff 0xff 0xca 0xc9 0xc9 0x07 0x00"
);
);
tester.assertHasFrames("Hello");
}
@ -227,18 +228,18 @@ public class PerMessageDeflateExtensionTest extends AbstractExtensionTest
ext.incomingFrame(ping);
capture.assertFrameCount(1);
capture.assertHasFrame(OpCode.PING,1);
capture.assertHasFrame(OpCode.PING, 1);
WebSocketFrame actual = capture.getFrames().poll();
Assert.assertThat("Frame.opcode",actual.getOpCode(),is(OpCode.PING));
Assert.assertThat("Frame.fin",actual.isFin(),is(true));
Assert.assertThat("Frame.rsv1",actual.isRsv1(),is(false));
Assert.assertThat("Frame.rsv2",actual.isRsv2(),is(false));
Assert.assertThat("Frame.rsv3",actual.isRsv3(),is(false));
Assert.assertThat("Frame.opcode", actual.getOpCode(), is(OpCode.PING));
Assert.assertThat("Frame.fin", actual.isFin(), is(true));
Assert.assertThat("Frame.rsv1", actual.isRsv1(), is(false));
Assert.assertThat("Frame.rsv2", actual.isRsv2(), is(false));
Assert.assertThat("Frame.rsv3", actual.isRsv3(), is(false));
ByteBuffer expected = BufferUtil.toBuffer(payload,StandardCharsets.UTF_8);
Assert.assertThat("Frame.payloadLength",actual.getPayloadLength(),is(expected.remaining()));
ByteBufferAssert.assertEquals("Frame.payload",expected,actual.getPayload().slice());
ByteBuffer expected = BufferUtil.toBuffer(payload, StandardCharsets.UTF_8);
Assert.assertThat("Frame.payloadLength", actual.getPayloadLength(), is(expected.remaining()));
ByteBufferAssert.assertEquals("Frame.payload", expected, actual.getPayload().slice());
}
/**
@ -275,7 +276,7 @@ public class PerMessageDeflateExtensionTest extends AbstractExtensionTest
int len = quote.size();
capture.assertFrameCount(len);
capture.assertHasFrame(OpCode.TEXT,len);
capture.assertHasFrame(OpCode.TEXT, len);
String prefix;
int i = 0;
@ -283,15 +284,15 @@ public class PerMessageDeflateExtensionTest extends AbstractExtensionTest
{
prefix = "Frame[" + i + "]";
Assert.assertThat(prefix + ".opcode",actual.getOpCode(),is(OpCode.TEXT));
Assert.assertThat(prefix + ".fin",actual.isFin(),is(true));
Assert.assertThat(prefix + ".rsv1",actual.isRsv1(),is(false));
Assert.assertThat(prefix + ".rsv2",actual.isRsv2(),is(false));
Assert.assertThat(prefix + ".rsv3",actual.isRsv3(),is(false));
Assert.assertThat(prefix + ".opcode", actual.getOpCode(), is(OpCode.TEXT));
Assert.assertThat(prefix + ".fin", actual.isFin(), is(true));
Assert.assertThat(prefix + ".rsv1", actual.isRsv1(), is(false));
Assert.assertThat(prefix + ".rsv2", actual.isRsv2(), is(false));
Assert.assertThat(prefix + ".rsv3", actual.isRsv3(), is(false));
ByteBuffer expected = BufferUtil.toBuffer(quote.get(i),StandardCharsets.UTF_8);
Assert.assertThat(prefix + ".payloadLength",actual.getPayloadLength(),is(expected.remaining()));
ByteBufferAssert.assertEquals(prefix + ".payload",expected,actual.getPayload().slice());
ByteBuffer expected = BufferUtil.toBuffer(quote.get(i), StandardCharsets.UTF_8);
Assert.assertThat(prefix + ".payloadLength", actual.getPayloadLength(), is(expected.remaining()));
ByteBufferAssert.assertEquals(prefix + ".payload", expected, actual.getPayload().slice());
i++;
}
}
@ -317,22 +318,22 @@ public class PerMessageDeflateExtensionTest extends AbstractExtensionTest
String payload = "Are you there?";
Frame ping = new PingFrame().setPayload(payload);
ext.outgoingFrame(ping,null);
ext.outgoingFrame(ping, null, OutgoingFrames.FlushMode.FLUSH);
capture.assertFrameCount(1);
capture.assertHasFrame(OpCode.PING,1);
capture.assertHasFrame(OpCode.PING, 1);
WebSocketFrame actual = capture.getFrames().getFirst();
Assert.assertThat("Frame.opcode",actual.getOpCode(),is(OpCode.PING));
Assert.assertThat("Frame.fin",actual.isFin(),is(true));
Assert.assertThat("Frame.rsv1",actual.isRsv1(),is(false));
Assert.assertThat("Frame.rsv2",actual.isRsv2(),is(false));
Assert.assertThat("Frame.rsv3",actual.isRsv3(),is(false));
Assert.assertThat("Frame.opcode", actual.getOpCode(), is(OpCode.PING));
Assert.assertThat("Frame.fin", actual.isFin(), is(true));
Assert.assertThat("Frame.rsv1", actual.isRsv1(), is(false));
Assert.assertThat("Frame.rsv2", actual.isRsv2(), is(false));
Assert.assertThat("Frame.rsv3", actual.isRsv3(), is(false));
ByteBuffer expected = BufferUtil.toBuffer(payload,StandardCharsets.UTF_8);
Assert.assertThat("Frame.payloadLength",actual.getPayloadLength(),is(expected.remaining()));
ByteBufferAssert.assertEquals("Frame.payload",expected,actual.getPayload().slice());
ByteBuffer expected = BufferUtil.toBuffer(payload, StandardCharsets.UTF_8);
Assert.assertThat("Frame.payloadLength", actual.getPayloadLength(), is(expected.remaining()));
ByteBufferAssert.assertEquals("Frame.payload", expected, actual.getPayload().slice());
}
@Test
@ -350,7 +351,7 @@ public class PerMessageDeflateExtensionTest extends AbstractExtensionTest
"c1 0b 0a c8 c8 c9 2f 4a 0c 01 62 00 00" // PhloraTora
);
tester.assertHasFrames("ToraTora","AtoraFlora","PhloraTora");
tester.assertHasFrames("ToraTora", "AtoraFlora", "PhloraTora");
}
@Test
@ -368,7 +369,7 @@ public class PerMessageDeflateExtensionTest extends AbstractExtensionTest
"c1 04 02 61 00 00" // tora 3
);
tester.assertHasFrames("tora","tora","tora");
tester.assertHasFrames("tora", "tora", "tora");
}
@Test
@ -386,7 +387,7 @@ public class PerMessageDeflateExtensionTest extends AbstractExtensionTest
"c1 8b e2 3e 05 53 e8 f6 cd 9a cd 74 09 52 80 3e 05" // PhloraTora
);
tester.assertHasFrames("ToraTora","AtoraFlora","PhloraTora");
tester.assertHasFrames("ToraTora", "AtoraFlora", "PhloraTora");
}
@Test
@ -404,6 +405,6 @@ public class PerMessageDeflateExtensionTest extends AbstractExtensionTest
"c1 84 53 ad a5 34 51 cc a5 34" // tora 3
);
tester.assertHasFrames("tora","tora","tora");
tester.assertHasFrames("tora", "tora", "tora");
}
}

View File

@ -204,7 +204,7 @@ public class LocalWebSocketConnection implements LogicalConnection, IncomingFram
}
@Override
public void outgoingFrame(Frame frame, WriteCallback callback)
public void outgoingFrame(Frame frame, WriteCallback callback, FlushMode flushMode)
{
}

View File

@ -21,7 +21,6 @@ package org.eclipse.jetty.websocket.common.io;
import java.net.URI;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.websocket.common.SessionListener;
import org.eclipse.jetty.websocket.common.WebSocketSession;
import org.eclipse.jetty.websocket.common.events.EventDriver;
import org.eclipse.jetty.websocket.common.test.OutgoingFramesCapture;
@ -34,7 +33,7 @@ public class LocalWebSocketSession extends WebSocketSession
public LocalWebSocketSession(TestName testname, EventDriver driver, ByteBufferPool bufferPool)
{
super(URI.create("ws://localhost/LocalWebSocketSesssion/" + testname.getMethodName()),driver,new LocalWebSocketConnection(testname,bufferPool), new SessionListener[0]);
super(URI.create("ws://localhost/LocalWebSocketSesssion/" + testname.getMethodName()),driver,new LocalWebSocketConnection(testname,bufferPool));
this.id = testname.getMethodName();
outgoingCapture = new OutgoingFramesCapture();
setOutgoingHandler(outgoingCapture);

View File

@ -18,8 +18,6 @@
package org.eclipse.jetty.websocket.common.test;
import static org.hamcrest.Matchers.*;
import java.io.Closeable;
import java.io.EOFException;
import java.io.IOException;
@ -69,6 +67,10 @@ import org.eclipse.jetty.websocket.common.io.IOState.ConnectionStateListener;
import org.eclipse.jetty.websocket.common.io.http.HttpResponseHeaderParser;
import org.junit.Assert;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
/**
* A simple websocket client for performing unit tests with.
* <p>
@ -465,7 +467,7 @@ public class BlockheadClient implements IncomingFrames, OutgoingFrames, Connecti
}
@Override
public void outgoingFrame(Frame frame, WriteCallback callback)
public void outgoingFrame(Frame frame, WriteCallback callback, FlushMode flushMode)
{
ByteBuffer headerBuf = generator.generateHeaderBytes(frame);
if (LOG.isDebugEnabled())
@ -710,7 +712,7 @@ public class BlockheadClient implements IncomingFrames, OutgoingFrames, Connecti
{
frame.setMask(clientmask);
}
extensionStack.outgoingFrame(frame,null);
extensionStack.outgoingFrame(frame,null,FlushMode.FLUSH);
}
public void writeRaw(ByteBuffer buf) throws IOException

View File

@ -18,8 +18,6 @@
package org.eclipse.jetty.websocket.common.test;
import static org.hamcrest.Matchers.*;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
@ -66,6 +64,9 @@ import org.eclipse.jetty.websocket.common.extensions.WebSocketExtensionFactory;
import org.eclipse.jetty.websocket.common.frames.CloseFrame;
import org.junit.Assert;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
/**
* A overly simplistic websocket server used during testing.
* <p>
@ -230,7 +231,7 @@ public class BlockheadServer
}
@Override
public void outgoingFrame(Frame frame, WriteCallback callback)
public void outgoingFrame(Frame frame, WriteCallback callback, FlushMode flushMode)
{
ByteBuffer headerBuf = generator.generateHeaderBytes(frame);
if (LOG.isDebugEnabled())
@ -560,7 +561,7 @@ public class BlockheadServer
public void write(Frame frame) throws IOException
{
LOG.debug("write(Frame->{}) to {}",frame,outgoing);
outgoing.outgoingFrame(frame,null);
outgoing.outgoingFrame(frame,null,FlushMode.FLUSH);
}
public void write(int b) throws IOException

View File

@ -18,8 +18,6 @@
package org.eclipse.jetty.websocket.common.test;
import static org.hamcrest.Matchers.*;
import java.util.LinkedList;
import org.eclipse.jetty.util.BufferUtil;
@ -30,6 +28,9 @@ import org.eclipse.jetty.websocket.common.OpCode;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.junit.Assert;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
public class OutgoingFramesCapture implements OutgoingFrames
{
private LinkedList<WebSocketFrame> frames = new LinkedList<>();
@ -84,7 +85,7 @@ public class OutgoingFramesCapture implements OutgoingFrames
}
@Override
public void outgoingFrame(Frame frame, WriteCallback callback)
public void outgoingFrame(Frame frame, WriteCallback callback, FlushMode flushMode)
{
frames.add(WebSocketFrame.copy(frame));
if (callback != null)

View File

@ -18,8 +18,6 @@
package org.eclipse.jetty.websocket.common.test;
import static org.hamcrest.Matchers.*;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
@ -33,6 +31,9 @@ import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
import org.eclipse.jetty.websocket.common.Generator;
import org.junit.Assert;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;
/**
* Capture outgoing network bytes.
*/
@ -61,7 +62,7 @@ public class OutgoingNetworkBytesCapture implements OutgoingFrames
}
@Override
public void outgoingFrame(Frame frame, WriteCallback callback)
public void outgoingFrame(Frame frame, WriteCallback callback, FlushMode flushMode)
{
ByteBuffer buf = ByteBuffer.allocate(Generator.OVERHEAD + frame.getPayloadLength());
generator.generateWholeFrame(frame,buf);