421697 - IteratingCallback improvements

Use the iteratingcallback for websocket
use gather writes for websocket
always write entire websocket payload
This commit is contained in:
Greg Wilkins 2013-11-21 15:11:56 +11:00
parent 1eb2997efd
commit 0a52c64d16
22 changed files with 479 additions and 1001 deletions

View File

@ -38,7 +38,7 @@ public class Flusher
private static final Logger LOG = Log.getLogger(Flusher.class); private static final Logger LOG = Log.getLogger(Flusher.class);
private static final int MAX_GATHER = 10; private static final int MAX_GATHER = 10;
private final IteratingCallback iteratingCallback = new SessionIteratingCallback(); private final FlusherCB flusherCB = new FlusherCB();
private final Controller controller; private final Controller controller;
private final Object lock = new Object(); private final Object lock = new Object();
private final ArrayQueue<StandardSession.FrameBytes> queue = new ArrayQueue<>(lock); private final ArrayQueue<StandardSession.FrameBytes> queue = new ArrayQueue<>(lock);
@ -124,7 +124,7 @@ public class Flusher
void flush() void flush()
{ {
iteratingCallback.iterate(); flusherCB.iterate();
} }
public int getQueueSize() public int getQueueSize()
@ -135,28 +135,25 @@ public class Flusher
} }
} }
private class SessionIteratingCallback extends IteratingCallback private class FlusherCB extends IteratingCallback
{ {
private final List<StandardSession.FrameBytes> active = new ArrayList<>(); // TODO should active and succeeded be local?
private final List<StandardSession.FrameBytes> active = new ArrayList<>(MAX_GATHER);
private final List<StandardSession.FrameBytes> succeeded = new ArrayList<>(MAX_GATHER);
private final Set<IStream> stalled = new HashSet<>(); private final Set<IStream> stalled = new HashSet<>();
@Override @Override
protected State process() throws Exception protected State process() throws Exception
{ {
StandardSession.FrameBytes frameBytes = null;
synchronized (lock) synchronized (lock)
{ {
if (active.size()>0) succeeded.clear();
throw new IllegalStateException();
if (queue.isEmpty())
return State.IDLE;
// Scan queue for data to write from first non stalled stream. // Scan queue for data to write from first non stalled stream.
int qs=queue.size(); int qs=queue.size();
for (int i = 0; i < qs && active.size()<MAX_GATHER;) for (int i = 0; i < qs && active.size()<MAX_GATHER;)
{ {
frameBytes = queue.getUnsafe(i); StandardSession.FrameBytes frameBytes = queue.getUnsafe(i);
IStream stream = frameBytes.getStream(); IStream stream = frameBytes.getStream();
// Continue if this is stalled stream // Continue if this is stalled stream
@ -205,38 +202,36 @@ public class Flusher
buffers[i]=active.get(i).getByteBuffer(); buffers[i]=active.get(i).getByteBuffer();
if (controller != null) if (controller != null)
controller.write(iteratingCallback, buffers); controller.write(flusherCB, buffers);
return State.SCHEDULED; return State.SCHEDULED;
} }
@Override @Override
protected void completed() protected void completed()
{ {
// will never be called as doProcess always returns WAITING or IDLE // will never be called as process always returns SCHEDULED or IDLE
throw new IllegalStateException(); throw new IllegalStateException();
} }
@Override @Override
public void succeeded() public void succeeded()
{ {
if (LOG.isDebugEnabled()) synchronized (lock)
{ {
synchronized (lock) if (LOG.isDebugEnabled())
{ LOG.debug("Succeeded write of {}, q={}", active, queue.size());
LOG.debug("Completed write of {}, {} frame(s) in queue", active, queue.size()); succeeded.addAll(active);
} active.clear();
} }
for (FrameBytes frame: active) for (FrameBytes frame: succeeded)
frame.succeeded(); frame.succeeded(); // TODO should we try catch?
active.clear();
super.succeeded(); super.succeeded();
} }
@Override @Override
public void failed(Throwable x) public void failed(Throwable x)
{ {
List<StandardSession.FrameBytes> frameBytesToFail = new ArrayList<>(); List<StandardSession.FrameBytes> failed = new ArrayList<>();
synchronized (lock) synchronized (lock)
{ {
failure = x; failure = x;
@ -245,15 +240,13 @@ public class Flusher
String logMessage = String.format("Failed write of %s, failing all %d frame(s) in queue", this, queue.size()); String logMessage = String.format("Failed write of %s, failing all %d frame(s) in queue", this, queue.size());
LOG.debug(logMessage, x); LOG.debug(logMessage, x);
} }
frameBytesToFail.addAll(queue); failed.addAll(active);
active.clear();
failed.addAll(queue);
queue.clear(); queue.clear();
} }
for (StandardSession.FrameBytes fb : failed)
for (FrameBytes frame: active) fb.failed(x); // TODO should we try catch?
frame.failed(x);
active.clear();
for (StandardSession.FrameBytes fb : frameBytesToFail)
fb.failed(x);
super.failed(x); super.failed(x);
} }
} }

View File

@ -124,7 +124,7 @@ public class StandardSessionTest
public Object answer(InvocationOnMock invocation) public Object answer(InvocationOnMock invocation)
{ {
Object[] args = invocation.getArguments(); Object[] args = invocation.getArguments();
Callback callback = (Callback)args[1]; Callback callback = (Callback)args[0];
if (fail) if (fail)
callback.failed(new ClosedChannelException()); callback.failed(new ClosedChannelException());
else else

View File

@ -23,12 +23,15 @@ import java.util.concurrent.atomic.AtomicReference;
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/** Iterating Callback. /** Iterating Callback.
* <p>This specialized callback is used when breaking up an * <p>This specialized callback implements a pattern that allows a
* asynchronous task into smaller asynchronous tasks. A typical pattern * large job to be broken into smaller tasks using iteration rather than
* is that a successful callback is used to schedule the next sub task, but * recursion.
* if that task completes quickly and uses the calling thread to callback * <p>
* the success notification, this can result in a growing stack depth. * A typical pattern used with asynchronous callbacks, is the next IO is
* </p> * done from within the scope of the success callback. The problem with this
* is that if the callback thread is the same one that calls to IO instruction,
* then recursion results and eventually a dispatch has to be done to
* avoid stack overflow (see {@link ForkInvoker}).
* <p>To avoid this issue, this callback uses an AtomicReference to note * <p>To avoid this issue, this callback uses an AtomicReference to note
* if the success callback has been called during the processing of a * if the success callback has been called during the processing of a
* sub task, and if so then the processing iterates rather than recurses. * sub task, and if so then the processing iterates rather than recurses.
@ -51,11 +54,12 @@ public abstract class IteratingCallback implements Callback
abstract protected void completed(); abstract protected void completed();
/** /**
* TODO - FIX
* Method called by iterate to process the task. * Method called by iterate to process the task.
* @return Then next state: * @return Then next state:
* <dl> * <dl>
* <dt>SUCCEEDED</dt><dd>if process returns true</dd> * <dt>SUCCEEDED</dt><dd>Return if the total task has completed</dd>
* <dt>SCHEDULED</dt><dd>This callback has been scheduled and {@link #succeeded()} or {@link #failed(Throwable)} will evenutally be called (if they have not been called already!)</dd> * <dt>SCHEDULED</dt><dd>This callback has been scheduled and {@link #succeeded()} or {@link #failed(Throwable)} will eventually be called (if they have not been called already!)</dd>
* <dt>IDLE</dt><dd>no progress can be made and another call to {@link #iterate()} is required in order to progress the task</dd> * <dt>IDLE</dt><dd>no progress can be made and another call to {@link #iterate()} is required in order to progress the task</dd>
* <dt>FAILED</dt><dd>processing has failed</dd> * <dt>FAILED</dt><dd>processing has failed</dd>
* </dl> * </dl>

View File

@ -81,6 +81,7 @@ public class JsrBasicRemote extends AbstractJsrRemote implements RemoteEndpoint.
@Override @Override
public void sendObject(Object data) throws IOException, EncodeException public void sendObject(Object data) throws IOException, EncodeException
{ {
// TODO avoid the use of a Future
Future<Void> fut = sendObjectViaFuture(data); Future<Void> fut = sendObjectViaFuture(data);
try try
{ {

View File

@ -110,10 +110,4 @@ public interface Frame
public boolean isRsv3(); public boolean isRsv3();
/**
* The current number of bytes left to read from the payload ByteBuffer.
*
* @return the current number of bytes left to read from the payload ByteBuffer
*/
public int remaining();
} }

View File

@ -241,7 +241,8 @@ public class BlockheadServer
try try
{ {
BufferUtil.writeTo(headerBuf,out); BufferUtil.writeTo(headerBuf,out);
BufferUtil.writeTo(generator.getPayloadWindow(frame.getPayloadLength(),frame),out); if (frame.hasPayload())
BufferUtil.writeTo(frame.getPayload(),out);
out.flush(); out.flush();
if (callback != null) if (callback != null)
{ {

View File

@ -328,7 +328,7 @@ public class Generator
buf.put(generateHeaderBytes(frame)); buf.put(generateHeaderBytes(frame));
if (frame.hasPayload()) if (frame.hasPayload())
{ {
buf.put(getPayloadWindow(frame.getPayloadLength(),frame)); buf.put(frame.getPayload());
} }
} }
@ -337,36 +337,6 @@ public class Generator
return bufferPool; return bufferPool;
} }
public ByteBuffer getPayloadWindow(int windowSize, Frame frame)
{
if (!frame.hasPayload())
{
return BufferUtil.EMPTY_BUFFER;
}
ByteBuffer buffer;
// We will create a slice representing the windowSize of this payload
if (frame.getPayload().remaining() <= windowSize)
{
// remaining will fit within window
buffer = frame.getPayload().slice();
// adjust the frame payload position (mark as read)
frame.getPayload().position(frame.getPayload().limit());
}
else
{
// remaining is over the window size limit, slice it
buffer = frame.getPayload().slice();
buffer.limit(windowSize);
int offset = frame.getPayload().position(); // offset within frame payload
// adjust the frame payload position
int newpos = Math.min(offset + windowSize,frame.getPayload().limit());
frame.getPayload().position(newpos);
}
return buffer;
}
public void setRsv1InUse(boolean rsv1InUse) public void setRsv1InUse(boolean rsv1InUse)
{ {

View File

@ -112,8 +112,6 @@ public abstract class WebSocketFrame implements Frame
*/ */
protected ByteBuffer data; protected ByteBuffer data;
protected int payloadLength = 0;
/** /**
* Construct form opcode * Construct form opcode
*/ */
@ -219,11 +217,6 @@ public abstract class WebSocketFrame implements Frame
/** /**
* Get the payload ByteBuffer. possible null. * Get the payload ByteBuffer. possible null.
* <p>
*
* @return A {@link ByteBuffer#slice()} of the payload buffer (to prevent modification of the buffer state). Possibly null if no payload present.
* <p>
* Note: this method is exposed via the immutable {@link Frame#getPayload()} method.
*/ */
@Override @Override
public ByteBuffer getPayload() public ByteBuffer getPayload()
@ -243,7 +236,7 @@ public abstract class WebSocketFrame implements Frame
{ {
return 0; return 0;
} }
return payloadLength; return data.remaining();
} }
@Override @Override
@ -266,7 +259,7 @@ public abstract class WebSocketFrame implements Frame
@Override @Override
public boolean hasPayload() public boolean hasPayload()
{ {
return ((data != null) && (payloadLength > 0)); return ((data != null) && data.hasRemaining());
} }
public abstract boolean isControlFrame(); public abstract boolean isControlFrame();
@ -309,45 +302,11 @@ public abstract class WebSocketFrame implements Frame
return (byte)(finRsvOp & 0x10) != 0; return (byte)(finRsvOp & 0x10) != 0;
} }
/**
* Get the position currently within the payload data.
* <p>
* Used by flow control, generator and window sizing.
*
* @return the number of bytes remaining in the payload data that has not yet been written out to Network ByteBuffers.
*/
public int position()
{
if (data == null)
{
return -1;
}
return data.position();
}
/**
* Get the number of bytes remaining to write out to the Network ByteBuffer.
* <p>
* Used by flow control, generator and window sizing.
*
* @return the number of bytes remaining in the payload data that has not yet been written out to Network ByteBuffers.
*/
@Override
public int remaining()
{
if (data == null)
{
return 0;
}
return data.remaining();
}
public void reset() public void reset()
{ {
finRsvOp = (byte)0x80; // FIN (!RSV, opcode 0) finRsvOp = (byte)0x80; // FIN (!RSV, opcode 0)
masked = false; masked = false;
data = null; data = null;
payloadLength = 0;
mask = null; mask = null;
} }
@ -396,7 +355,6 @@ public abstract class WebSocketFrame implements Frame
} }
data = buf.slice(); data = buf.slice();
payloadLength = data.limit();
return this; return this;
} }
@ -427,15 +385,13 @@ public abstract class WebSocketFrame implements Frame
StringBuilder b = new StringBuilder(); StringBuilder b = new StringBuilder();
b.append(OpCode.name((byte)(finRsvOp & 0x0F))); b.append(OpCode.name((byte)(finRsvOp & 0x0F)));
b.append('['); b.append('[');
b.append("len=").append(payloadLength); b.append("len=").append(getPayloadLength());
b.append(",fin=").append((finRsvOp & 0x80) != 0); b.append(",fin=").append((finRsvOp & 0x80) != 0);
b.append(",rsv="); b.append(",rsv=");
b.append(((finRsvOp & 0x40) != 0)?'1':'.'); b.append(((finRsvOp & 0x40) != 0)?'1':'.');
b.append(((finRsvOp & 0x20) != 0)?'1':'.'); b.append(((finRsvOp & 0x20) != 0)?'1':'.');
b.append(((finRsvOp & 0x10) != 0)?'1':'.'); b.append(((finRsvOp & 0x10) != 0)?'1':'.');
b.append(",masked=").append(masked); b.append(",masked=").append(masked);
b.append(",remaining=").append(remaining());
b.append(",position=").append(position());
b.append(']'); b.append(']');
return b.toString(); return b.toString();
} }

View File

@ -198,7 +198,7 @@ public class DeflateFrameExtension extends AbstractExtension
DataFrame out = new DataFrame(frame); DataFrame out = new DataFrame(frame);
out.setRsv1(true); out.setRsv1(true);
out.setPooledBuffer(true); out.setBufferPool(getBufferPool());
out.setPayload(outbuf); out.setPayload(outbuf);
if (!compressor.needsInput()) if (!compressor.needsInput())

View File

@ -197,7 +197,7 @@ public class PerMessageDeflateExtension extends AbstractExtension
DataFrame out = new DataFrame(frame); DataFrame out = new DataFrame(frame);
out.setRsv1(true); out.setRsv1(true);
out.setPooledBuffer(true); out.setBufferPool(getBufferPool());
out.setPayload(outbuf); out.setPayload(outbuf);
if (!compressor.needsInput()) if (!compressor.needsInput())

View File

@ -28,7 +28,7 @@ import org.eclipse.jetty.websocket.common.WebSocketFrame;
*/ */
public class DataFrame extends WebSocketFrame public class DataFrame extends WebSocketFrame
{ {
private boolean isPooledBuffer = false; private ByteBufferPool pool;
protected DataFrame(byte opcode) protected DataFrame(byte opcode)
{ {
@ -78,12 +78,13 @@ public class DataFrame extends WebSocketFrame
return true; return true;
} }
/** public void releaseBuffer()
* @return true if payload buffer is from a {@link ByteBufferPool} and can be released when appropriate to do so
*/
public boolean isPooledBuffer()
{ {
return isPooledBuffer; if (pool!=null)
{
pool.release(this.data);
this.data=null;
}
} }
/** /**
@ -95,10 +96,10 @@ public class DataFrame extends WebSocketFrame
} }
/** /**
* Sets a flag indicating that the underlying payload is from a {@link ByteBufferPool} and can be released when appropriate to do so * Sets the buffer pool used for the payload
*/ */
public void setPooledBuffer(boolean isPooledBuffer) public void setBufferPool(ByteBufferPool pool)
{ {
this.isPooledBuffer = isPooledBuffer; this.pool = pool;
} }
} }

View File

@ -34,8 +34,6 @@ import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.ForkInvoker;
import org.eclipse.jetty.util.StringUtil; import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
@ -44,7 +42,6 @@ import org.eclipse.jetty.websocket.api.CloseException;
import org.eclipse.jetty.websocket.api.CloseStatus; import org.eclipse.jetty.websocket.api.CloseStatus;
import org.eclipse.jetty.websocket.api.StatusCode; import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.api.SuspendToken; import org.eclipse.jetty.websocket.api.SuspendToken;
import org.eclipse.jetty.websocket.api.WebSocketException;
import org.eclipse.jetty.websocket.api.WebSocketPolicy; import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.WriteCallback; import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig; import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
@ -62,13 +59,15 @@ import org.eclipse.jetty.websocket.common.io.IOState.ConnectionStateListener;
*/ */
public abstract class AbstractWebSocketConnection extends AbstractConnection implements LogicalConnection, ConnectionStateListener public abstract class AbstractWebSocketConnection extends AbstractConnection implements LogicalConnection, ConnectionStateListener
{ {
private class FlushCallback implements Callback private class Flusher extends FrameFlusher
{ {
/** private Flusher(Generator generator, EndPoint endpoint)
* The Endpoint.write() failure path {
*/ super(generator,endpoint);
}
@Override @Override
public void failed(Throwable x) protected void onFailure(Throwable x)
{ {
if (ioState.wasAbnormalClose()) if (ioState.wasAbnormalClose())
{ {
@ -106,45 +105,6 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
disconnect(); // disconnect endpoint & connection disconnect(); // disconnect endpoint & connection
} }
@Override
public void succeeded()
{
AbstractWebSocketConnection.this.complete(writeBytes);
}
}
private class FlushInvoker extends ForkInvoker<Callback>
{
private FlushInvoker()
{
super(4);
}
@Override
public void call(Callback callback)
{
flush();
}
@Override
public void fork(final Callback callback)
{
execute(new Runnable()
{
@Override
public void run()
{
flush();
}
});
}
@Override
public String toString()
{
return String.format("%s@%x",FlushInvoker.class.getSimpleName(),hashCode());
}
} }
public class OnDisconnectCallback implements WriteCallback public class OnDisconnectCallback implements WriteCallback
@ -191,17 +151,15 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
*/ */
private static final int MIN_BUFFER_SIZE = Generator.OVERHEAD; private static final int MIN_BUFFER_SIZE = Generator.OVERHEAD;
private final ForkInvoker<Callback> invoker = new FlushInvoker();
private final ByteBufferPool bufferPool; private final ByteBufferPool bufferPool;
private final Scheduler scheduler; private final Scheduler scheduler;
private final Generator generator; private final Generator generator;
private final Parser parser; private final Parser parser;
private final WebSocketPolicy policy; private final WebSocketPolicy policy;
private final WriteBytesProvider writeBytes;
private final AtomicBoolean suspendToken; private final AtomicBoolean suspendToken;
private final FrameFlusher flusher;
private WebSocketSession session; private WebSocketSession session;
private List<ExtensionConfig> extensions; private List<ExtensionConfig> extensions;
private boolean flushing;
private boolean isFilling; private boolean isFilling;
private IOState ioState; private IOState ioState;
private Stats stats = new Stats(); private Stats stats = new Stats();
@ -218,7 +176,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
this.suspendToken = new AtomicBoolean(false); this.suspendToken = new AtomicBoolean(false);
this.ioState = new IOState(); this.ioState = new IOState();
this.ioState.addListener(this); this.ioState.addListener(this);
this.writeBytes = new WriteBytesProvider(generator,new FlushCallback()); this.flusher = new Flusher(generator,endp);
this.setInputBufferSize(policy.getInputBufferSize()); this.setInputBufferSize(policy.getInputBufferSize());
} }
@ -251,6 +209,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
CloseInfo close = new CloseInfo(statusCode,reason); CloseInfo close = new CloseInfo(statusCode,reason);
if (statusCode == StatusCode.ABNORMAL) if (statusCode == StatusCode.ABNORMAL)
{ {
flusher.close(); // TODO this makes the IdleTimeoutTest pass, but I'm dubious it is the correct way
ioState.onAbnormalClose(close); ioState.onAbnormalClose(close);
} }
else else
@ -259,33 +218,12 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
} }
} }
public void complete(final Callback callback)
{
LOG.debug("complete({})",callback);
synchronized (writeBytes)
{
flushing = false;
}
if (!ioState.isOpen() || (callback == null))
{
return;
}
invoker.invoke(callback);
}
@Override @Override
public void disconnect() public void disconnect()
{ {
LOG.debug("{} disconnect()",policy.getBehavior()); LOG.debug("{} disconnect()",policy.getBehavior());
synchronized (writeBytes) flusher.close();
{
if (!writeBytes.isClosed())
{
writeBytes.close();
}
}
disconnect(false); disconnect(false);
} }
@ -324,39 +262,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
public void flush() public void flush()
{ {
List<ByteBuffer> buffers = null; flusher.flush();
synchronized (writeBytes)
{
if (flushing)
{
LOG.debug("Actively flushing");
return;
}
if (LOG.isDebugEnabled())
{
LOG.debug(".flush() - flushing={} - writeBytes={}",flushing,writeBytes);
}
if (!isOpen())
{
// No longer have an open connection, drop them all.
writeBytes.failAll(new WebSocketException("Connection closed"));
return;
}
buffers = writeBytes.getByteBuffers();
if ((buffers == null) || (buffers.size() <= 0))
{
return;
}
flushing = true;
}
write(buffers);
} }
@Override @Override
@ -454,7 +360,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
public void onClose() public void onClose()
{ {
super.onClose(); super.onClose();
writeBytes.close(); flusher.close();
} }
@Override @Override
@ -495,7 +401,6 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
LOG.debug("{} onFillable()",policy.getBehavior()); LOG.debug("{} onFillable()",policy.getBehavior());
stats.countOnFillableEvents.incrementAndGet(); stats.countOnFillableEvents.incrementAndGet();
ByteBuffer buffer = bufferPool.acquire(getInputBufferSize(),true); ByteBuffer buffer = bufferPool.acquire(getInputBufferSize(),true);
BufferUtil.clear(buffer);
boolean readMore = false; boolean readMore = false;
try try
{ {
@ -564,9 +469,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
LOG.debug("outgoingFrame({}, {})",frame,callback); LOG.debug("outgoingFrame({}, {})",frame,callback);
} }
writeBytes.enqueue(frame,callback); flusher.enqueue(frame,callback);
flush();
} }
private int read(ByteBuffer buffer) private int read(ByteBuffer buffer)
@ -669,28 +572,4 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
return String.format("%s{g=%s,p=%s}",super.toString(),generator,parser); return String.format("%s{g=%s,p=%s}",super.toString(),generator,parser);
} }
private <C> void write(List<ByteBuffer> buffer)
{
EndPoint endpoint = getEndPoint();
try
{
int bufsize = buffer.size();
if (bufsize == 1)
{
// simple case
endpoint.write(writeBytes,buffer.get(0));
}
else
{
// gathered writes case
ByteBuffer bbarr[] = buffer.toArray(new ByteBuffer[bufsize]);
endpoint.write(writeBytes,bbarr);
}
}
catch (Throwable t)
{
writeBytes.failed(t);
}
}
} }

View File

@ -0,0 +1,381 @@
//
// ========================================================================
// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.common.io;
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.ArrayQueue;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.common.Generator;
import org.eclipse.jetty.websocket.common.OpCode;
import org.eclipse.jetty.websocket.common.frames.DataFrame;
/**
* Interface for working with bytes destined for {@link EndPoint#write(Callback, ByteBuffer...)}
*/
public class FrameFlusher
{
private static final Logger LOG = Log.getLogger(FrameFlusher.class);
/** The endpoint to flush to */
private final EndPoint endpoint;
/** The websocket generator */
private final Generator generator;
private final Object lock = new Object();
/** Backlog of frames */
private final ArrayQueue<FrameEntry> queue = new ArrayQueue<>(lock);
private final FlusherCB flusherCB = new FlusherCB();
/** the buffer input size */
private int bufferSize = 2048;
/** the gathered write bytebuffer array limit */
private int gatheredBufferLimit = 10;
/** Tracking for failure */
private Throwable failure;
/** Is WriteBytesProvider closed to more WriteBytes being enqueued? */
private boolean closed;
/**
* Create a WriteBytesProvider with specified Generator and "flush" Callback.
*
* @param generator
* the generator to use for converting {@link Frame} objects to network {@link ByteBuffer}s
* @param endpoint
* the endpoint to flush to.
*/
public FrameFlusher(Generator generator, EndPoint endpoint)
{
this.endpoint=endpoint;
this.generator = Objects.requireNonNull(generator);
}
/**
* Set the buffer size used for generating ByteBuffers from the frames.
* <p>
* Value usually obtained from {@link AbstractConnection#getInputBufferSize()}
*
* @param bufferSize
* the buffer size to use
*/
public void setBufferSize(int bufferSize)
{
this.bufferSize = bufferSize;
}
public int getBufferSize()
{
return bufferSize;
}
/**
* Force closure of write bytes
*/
public void close()
{
synchronized (lock)
{
if (!closed)
{
closed=true;
EOFException eof = new EOFException("Connection has been disconnected");
flusherCB.failed(eof);
for (FrameEntry frame : queue)
frame.notifyFailed(eof);
queue.clear();
}
}
}
/**
* Used to test for the final frame possible to be enqueued, the CLOSE frame.
*
* @return true if close frame has been enqueued already.
*/
public boolean isClosed()
{
synchronized (lock)
{
return closed;
}
}
public void enqueue(Frame frame, WriteCallback callback)
{
Objects.requireNonNull(frame);
FrameEntry entry = new FrameEntry(frame,callback);
LOG.debug("enqueue({})",entry);
Throwable failure=null;
synchronized (lock)
{
if (closed)
{
// Closed for more frames.
LOG.debug("Write is closed: {} {}",frame,callback);
failure=new IOException("Write is closed");
}
else if (this.failure!=null)
{
failure=this.failure;
}
switch (frame.getOpCode())
{
case OpCode.PING:
queue.add(0,entry);
break;
case OpCode.CLOSE:
closed=true;
queue.add(entry);
break;
default:
queue.add(entry);
}
}
if (failure != null)
{
// no changes when failed
LOG.debug("Write is in failure: {} {}",frame,callback);
entry.notifyFailed(failure);
return;
}
flush();
}
void flush()
{
flusherCB.iterate();
}
protected void onFailure(Throwable x)
{
LOG.warn(x);
}
@Override
public String toString()
{
StringBuilder b = new StringBuilder();
b.append("WriteBytesProvider[");
if (failure != null)
{
b.append("failure=").append(failure.getClass().getName());
b.append(":").append(failure.getMessage()).append(',');
}
else
{
b.append("queue.size=").append(queue.size());
}
b.append(']');
return b.toString();
}
private class FlusherCB extends IteratingCallback
{
private final ArrayQueue<FrameEntry> active = new ArrayQueue<>(lock);
private final List<ByteBuffer> buffers = new ArrayList<>(gatheredBufferLimit*2);
private final List<FrameEntry> succeeded = new ArrayList<>(gatheredBufferLimit+1);
@Override
protected void completed()
{
// will never be called as process always returns SCHEDULED or IDLE
throw new IllegalStateException();
}
@Override
protected State process() throws Exception
{
synchronized (lock)
{
succeeded.clear();
// If we exited the loop above without hitting the gatheredBufferLimit
// then all the active frames are done, so we can add some more.
while (buffers.size()<gatheredBufferLimit && !queue.isEmpty())
{
FrameEntry frame = queue.remove(0);
active.add(frame);
buffers.add(frame.getHeaderBytes());
ByteBuffer payload = frame.getPayload();
if (payload!=null)
buffers.add(payload);
}
if (LOG.isDebugEnabled())
LOG.debug("process {} active={} buffers={}",FrameFlusher.this,active,buffers);
}
if (buffers.size()==0)
return State.IDLE;
endpoint.write(this,buffers.toArray(new ByteBuffer[buffers.size()]));
buffers.clear();
return State.SCHEDULED;
}
@Override
public void succeeded()
{
synchronized (lock)
{
succeeded.addAll(active);
active.clear();
}
for (FrameEntry frame:succeeded)
{
frame.notifySucceeded();
frame.freeBuffers();
}
super.succeeded();
}
@Override
public void failed(Throwable x)
{
synchronized (lock)
{
succeeded.addAll(active);
active.clear();
}
for (FrameEntry frame : succeeded)
{
frame.notifyFailed(x);
frame.freeBuffers();
}
succeeded.clear();
super.failed(x);
onFailure(x);
}
}
private class FrameEntry
{
protected final AtomicBoolean failed = new AtomicBoolean(false);
protected final Frame frame;
protected final WriteCallback callback;
/** holds reference to header ByteBuffer, as it needs to be released on success/failure */
private ByteBuffer headerBuffer;
public FrameEntry(Frame frame, WriteCallback callback)
{
this.frame = frame;
this.callback = callback;
}
public ByteBuffer getHeaderBytes()
{
ByteBuffer buf = generator.generateHeaderBytes(frame);
headerBuffer = buf;
return buf;
}
public ByteBuffer getPayload()
{
// There is no need to release this ByteBuffer, as it is just a slice of the user provided payload
return frame.getPayload();
}
public void notifyFailed(Throwable t)
{
freeBuffers();
if (failed.getAndSet(true) == false)
{
try
{
if (callback!=null)
callback.writeFailed(t);
}
catch (Throwable e)
{
LOG.warn("Uncaught exception",e);
}
}
}
public void notifySucceeded()
{
freeBuffers();
if (callback == null)
{
return;
}
try
{
callback.writeSuccess();
}
catch (Throwable t)
{
LOG.debug(t);
}
}
public void freeBuffers()
{
if (headerBuffer != null)
{
generator.getBufferPool().release(headerBuffer);
headerBuffer = null;
}
if (!frame.hasPayload())
{
return;
}
if (frame instanceof DataFrame)
{
// TODO null payload within frame
DataFrame data = (DataFrame)frame;
data.releaseBuffer();
}
}
public String toString()
{
return "["+callback+","+frame+","+failure+"]";
}
}
}

View File

@ -1,432 +0,0 @@
//
// ========================================================================
// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.common.io;
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.common.Generator;
import org.eclipse.jetty.websocket.common.OpCode;
import org.eclipse.jetty.websocket.common.frames.DataFrame;
/**
* Interface for working with bytes destined for {@link EndPoint#write(Callback, ByteBuffer...)}
*/
public class WriteBytesProvider implements Callback
{
private class FrameEntry
{
protected final AtomicBoolean failed = new AtomicBoolean(false);
protected final Frame frame;
protected final WriteCallback callback;
/** holds reference to header ByteBuffer, as it needs to be released on success/failure */
private ByteBuffer headerBuffer;
public FrameEntry(Frame frame, WriteCallback callback)
{
this.frame = frame;
this.callback = callback;
}
public ByteBuffer getHeaderBytes()
{
ByteBuffer buf = generator.generateHeaderBytes(frame);
headerBuffer = buf;
return buf;
}
public ByteBuffer getPayloadWindow()
{
// There is no need to release this ByteBuffer, as it is just a slice of the user provided payload
return generator.getPayloadWindow(bufferSize,frame);
}
public void notifyFailed(Throwable t)
{
freeBuffers();
if (failed.getAndSet(true) == false)
{
notifySafeFailure(callback,t);
}
}
public void notifySucceeded()
{
freeBuffers();
if (callback == null)
{
return;
}
try
{
callback.writeSuccess();
}
catch (Throwable t)
{
LOG.debug(t);
}
}
public void freeBuffers()
{
if (headerBuffer != null)
{
generator.getBufferPool().release(headerBuffer);
headerBuffer = null;
}
releasePayloadBuffer(frame);
}
/**
* Indicate that the frame entry is done generating
*/
public boolean isDone()
{
return frame.remaining() <= 0;
}
}
private static final Logger LOG = Log.getLogger(WriteBytesProvider.class);
/** The websocket generator */
private final Generator generator;
/** Flush callback, for notifying when a flush should be performed */
private final Callback flushCallback;
/** Backlog of frames */
private final LinkedList<FrameEntry> queue;
/** the buffer input size */
private int bufferSize = 2048;
/** the gathered write bytebuffer array limit */
private int gatheredBufferLimit = 10;
/** Past Frames, not yet notified (from gathered generation/write) */
private final LinkedList<FrameEntry> past;
/** Currently active frame */
private FrameEntry active;
/** Tracking for failure */
private Throwable failure;
/** Is WriteBytesProvider closed to more WriteBytes being enqueued? */
private final AtomicBoolean closed;
/**
* Create a WriteBytesProvider with specified Generator and "flush" Callback.
*
* @param generator
* the generator to use for converting {@link Frame} objects to network {@link ByteBuffer}s
* @param flushCallback
* the flush callback to call, on a write event, after the write event has been processed by this {@link WriteBytesProvider}.
* <p>
* Used to trigger another flush of the next set of bytes.
*/
public WriteBytesProvider(Generator generator, Callback flushCallback)
{
this.generator = Objects.requireNonNull(generator);
this.flushCallback = Objects.requireNonNull(flushCallback);
this.queue = new LinkedList<>();
this.past = new LinkedList<>();
this.closed = new AtomicBoolean(false);
}
/**
* Force closure of write bytes
*/
public void close()
{
LOG.debug(".close()");
// Set queue closed, no new enqueue allowed.
this.closed.set(true);
// flush out backlog in queue
failAll(new EOFException("Connection has been disconnected"));
}
public void enqueue(Frame frame, WriteCallback callback)
{
Objects.requireNonNull(frame);
LOG.debug("enqueue({}, {})",frame,callback);
synchronized (this)
{
if (closed.get())
{
// Closed for more frames.
LOG.debug("Write is closed: {} {}",frame,callback);
if (callback != null)
{
callback.writeFailed(new IOException("Write is closed"));
}
return;
}
if (failure != null)
{
// no changes when failed
LOG.debug("Write is in failure: {} {}",frame,callback);
notifySafeFailure(callback,failure);
return;
}
FrameEntry entry = new FrameEntry(frame,callback);
switch (frame.getOpCode())
{
case OpCode.PING:
queue.addFirst(entry);
break;
case OpCode.CLOSE:
closed.set(true);
// drop the rest of the queue?
queue.addLast(entry);
break;
default:
queue.addLast(entry);
}
}
}
public void failAll(Throwable t)
{
// Collect entries for callback
List<FrameEntry> callbacks = new ArrayList<>();
synchronized (this)
{
// fail active (if set)
if (active != null)
{
FrameEntry entry = active;
active = null;
callbacks.add(entry);
}
callbacks.addAll(past);
callbacks.addAll(queue);
past.clear();
queue.clear();
}
// notify flush callback
if (!callbacks.isEmpty())
{
// TODO: always notify instead?
flushCallback.failed(t);
// notify entry callbacks
for (FrameEntry entry : callbacks)
{
entry.notifyFailed(t);
}
}
}
/**
* Callback failure.
* <p>
* Conditions: for Endpoint.write() failure.
*
* @param cause
* the cause of the failure
*/
@Override
public void failed(Throwable cause)
{
failAll(cause);
}
public int getBufferSize()
{
return bufferSize;
}
/**
* Get the next set of ByteBuffers to write.
*
* @return the next set of ByteBuffers to write
*/
public List<ByteBuffer> getByteBuffers()
{
List<ByteBuffer> bufs = null;
int count = 0;
synchronized (this)
{
for (; count < gatheredBufferLimit; count++)
{
if (active == null)
{
if (queue.isEmpty())
{
// nothing in queue
return bufs;
}
// get current topmost entry
active = queue.pop();
// generate header
if (bufs == null)
{
bufs = new ArrayList<>();
}
bufs.add(active.getHeaderBytes());
count++;
}
// collect payload window
if (bufs == null)
{
bufs = new ArrayList<>();
}
bufs.add(active.getPayloadWindow());
if (active.isDone())
{
past.add(active);
active = null;
}
}
}
LOG.debug("Collected {} ByteBuffers",bufs.size());
return bufs;
}
/**
* Used to test for the final frame possible to be enqueued, the CLOSE frame.
*
* @return true if close frame has been enqueued already.
*/
public boolean isClosed()
{
synchronized (this)
{
return closed.get();
}
}
private void notifySafeFailure(WriteCallback callback, Throwable t)
{
if (callback == null)
{
return;
}
try
{
callback.writeFailed(t);
}
catch (Throwable e)
{
LOG.warn("Uncaught exception",e);
}
}
public void releasePayloadBuffer(Frame frame)
{
if (!frame.hasPayload())
{
return;
}
if (frame instanceof DataFrame)
{
DataFrame data = (DataFrame)frame;
if (data.isPooledBuffer())
{
ByteBuffer payload = frame.getPayload();
generator.getBufferPool().release(payload);
}
}
}
/**
* Set the buffer size used for generating ByteBuffers from the frames.
* <p>
* Value usually obtained from {@link AbstractConnection#getInputBufferSize()}
*
* @param bufferSize
* the buffer size to use
*/
public void setBufferSize(int bufferSize)
{
this.bufferSize = bufferSize;
}
/**
* Write of ByteBuffer succeeded.
*/
@Override
public void succeeded()
{
// Collect entries for callback
List<FrameEntry> callbacks = new ArrayList<>();
synchronized (this)
{
if ((active != null) && (active.frame.remaining() <= 0))
{
// All done with active FrameEntry
FrameEntry entry = active;
active = null;
callbacks.add(entry);
}
callbacks.addAll(past);
past.clear();
}
// notify flush callback
flushCallback.succeeded();
// notify entry callbacks outside of synchronize
for (FrameEntry entry : callbacks)
{
entry.notifySucceeded();
}
}
@Override
public String toString()
{
StringBuilder b = new StringBuilder();
b.append("WriteBytesProvider[");
b.append("flushCallback=").append(flushCallback);
if (failure != null)
{
b.append(",failure=").append(failure.getClass().getName());
b.append(":").append(failure.getMessage());
}
else
{
b.append(",active=").append(active);
b.append(",queue.size=").append(queue.size());
b.append(",past.size=").append(past.size());
}
b.append(']');
return b.toString();
}
}

View File

@ -29,6 +29,7 @@ import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.WriteCallback; import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames; import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
import org.eclipse.jetty.websocket.common.BlockingWriteCallback;
import org.eclipse.jetty.websocket.common.WebSocketSession; import org.eclipse.jetty.websocket.common.WebSocketSession;
import org.eclipse.jetty.websocket.common.frames.BinaryFrame; import org.eclipse.jetty.websocket.common.frames.BinaryFrame;
import org.eclipse.jetty.websocket.common.io.FutureWriteCallback; import org.eclipse.jetty.websocket.common.io.FutureWriteCallback;
@ -41,10 +42,10 @@ public class MessageOutputStream extends OutputStream
private static final Logger LOG = Log.getLogger(MessageOutputStream.class); private static final Logger LOG = Log.getLogger(MessageOutputStream.class);
private final OutgoingFrames outgoing; private final OutgoingFrames outgoing;
private final ByteBufferPool bufferPool; private final ByteBufferPool bufferPool;
private final BlockingWriteCallback blocker;
private long frameCount = 0; private long frameCount = 0;
private BinaryFrame frame; private BinaryFrame frame;
private ByteBuffer buffer; private ByteBuffer buffer;
private FutureWriteCallback blocker;
private WriteCallback callback; private WriteCallback callback;
private boolean closed = false; private boolean closed = false;
@ -52,6 +53,7 @@ public class MessageOutputStream extends OutputStream
{ {
this.outgoing = outgoing; this.outgoing = outgoing;
this.bufferPool = bufferPool; this.bufferPool = bufferPool;
this.blocker = new BlockingWriteCallback();
this.buffer = bufferPool.acquire(bufferSize,true); this.buffer = bufferPool.acquire(bufferSize,true);
BufferUtil.flipToFill(buffer); BufferUtil.flipToFill(buffer);
this.frame = new BinaryFrame(); this.frame = new BinaryFrame();
@ -137,36 +139,12 @@ public class MessageOutputStream extends OutputStream
try try
{ {
blocker = new FutureWriteCallback();
outgoing.outgoingFrame(frame,blocker); outgoing.outgoingFrame(frame,blocker);
try // block on write
{ blocker.block();
// block on write // block success
blocker.get(); frameCount++;
// block success frame.setIsContinuation();
frameCount++;
frame.setIsContinuation();
}
catch (ExecutionException e)
{
Throwable cause = e.getCause();
if (cause != null)
{
if (cause instanceof IOException)
{
throw (IOException)cause;
}
else
{
throw new IOException(cause);
}
}
throw new IOException("Failed to flush",e);
}
catch (InterruptedException e)
{
throw new IOException("Failed to flush",e);
}
} }
catch (IOException e) catch (IOException e)
{ {

View File

@ -21,7 +21,6 @@ package org.eclipse.jetty.websocket.common.message;
import java.io.IOException; import java.io.IOException;
import java.io.Writer; import java.io.Writer;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.concurrent.ExecutionException;
import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
@ -29,9 +28,9 @@ import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.WriteCallback; import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames; import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
import org.eclipse.jetty.websocket.common.BlockingWriteCallback;
import org.eclipse.jetty.websocket.common.WebSocketSession; import org.eclipse.jetty.websocket.common.WebSocketSession;
import org.eclipse.jetty.websocket.common.frames.TextFrame; import org.eclipse.jetty.websocket.common.frames.TextFrame;
import org.eclipse.jetty.websocket.common.io.FutureWriteCallback;
/** /**
* Support for writing a single WebSocket TEXT message via a {@link Writer} * Support for writing a single WebSocket TEXT message via a {@link Writer}
@ -43,11 +42,11 @@ public class MessageWriter extends Writer
private static final Logger LOG = Log.getLogger(MessageWriter.class); private static final Logger LOG = Log.getLogger(MessageWriter.class);
private final OutgoingFrames outgoing; private final OutgoingFrames outgoing;
private final ByteBufferPool bufferPool; private final ByteBufferPool bufferPool;
private final BlockingWriteCallback blocker;
private long frameCount = 0; private long frameCount = 0;
private TextFrame frame; private TextFrame frame;
private ByteBuffer buffer; private ByteBuffer buffer;
private Utf8CharBuffer utf; private Utf8CharBuffer utf;
private FutureWriteCallback blocker;
private WriteCallback callback; private WriteCallback callback;
private boolean closed = false; private boolean closed = false;
@ -55,6 +54,7 @@ public class MessageWriter extends Writer
{ {
this.outgoing = outgoing; this.outgoing = outgoing;
this.bufferPool = bufferPool; this.bufferPool = bufferPool;
this.blocker = new BlockingWriteCallback();
this.buffer = bufferPool.acquire(bufferSize,true); this.buffer = bufferPool.acquire(bufferSize,true);
BufferUtil.flipToFill(buffer); BufferUtil.flipToFill(buffer);
this.utf = Utf8CharBuffer.wrap(buffer); this.utf = Utf8CharBuffer.wrap(buffer);
@ -118,38 +118,14 @@ public class MessageWriter extends Writer
try try
{ {
blocker = new FutureWriteCallback();
outgoing.outgoingFrame(frame,blocker); outgoing.outgoingFrame(frame,blocker);
try // block on write
{ blocker.block();
// block on write // write success
blocker.get(); // clear utf buffer
// write success utf.clear();
// clear utf buffer frameCount++;
utf.clear(); frame.setIsContinuation();
frameCount++;
frame.setIsContinuation();
}
catch (ExecutionException e)
{
Throwable cause = e.getCause();
if (cause != null)
{
if (cause instanceof IOException)
{
throw (IOException)cause;
}
else
{
throw new IOException(cause);
}
}
throw new IOException("Failed to flush",e);
}
catch (InterruptedException e)
{
throw new IOException("Failed to flush",e);
}
} }
catch (IOException e) catch (IOException e)
{ {

View File

@ -52,7 +52,7 @@ public class GeneratorParserRoundtripTest
BufferUtil.flipToFill(out); BufferUtil.flipToFill(out);
WebSocketFrame frame = new TextFrame().setPayload(message); WebSocketFrame frame = new TextFrame().setPayload(message);
ByteBuffer header = gen.generateHeaderBytes(frame); ByteBuffer header = gen.generateHeaderBytes(frame);
ByteBuffer payload = gen.getPayloadWindow(frame.getPayloadLength(),frame); ByteBuffer payload = frame.getPayload();
out.put(header); out.put(header);
out.put(payload); out.put(payload);
@ -99,7 +99,7 @@ public class GeneratorParserRoundtripTest
// Generate Buffer // Generate Buffer
ByteBuffer header = gen.generateHeaderBytes(frame); ByteBuffer header = gen.generateHeaderBytes(frame);
ByteBuffer payload = gen.getPayloadWindow(8192,frame); ByteBuffer payload = frame.getPayload();
out.put(header); out.put(header);
out.put(payload); out.put(payload);

View File

@ -75,18 +75,12 @@ public class GeneratorTest
ByteBuffer header = generator.generateHeaderBytes(f); ByteBuffer header = generator.generateHeaderBytes(f);
totalBytes += BufferUtil.put(header,completeBuf); totalBytes += BufferUtil.put(header,completeBuf);
// Generate using windowing if (f.hasPayload())
boolean done = false;
while (!done)
{ {
ByteBuffer window = generator.getPayloadWindow(windowSize,f); ByteBuffer payload=f.getPayload();
Assert.assertThat("Generated should not exceed window size",window.remaining(),lessThanOrEqualTo(windowSize)); totalBytes += payload.remaining();
totalBytes += window.remaining();
completeBuf.put(window);
totalParts++; totalParts++;
completeBuf.put(payload.slice());
done = (f.remaining() <= 0);
} }
} }
@ -262,7 +256,7 @@ public class GeneratorTest
// Validate // Validate
int expectedHeaderSize = 4; int expectedHeaderSize = 4;
int expectedSize = payload.length + expectedHeaderSize; int expectedSize = payload.length + expectedHeaderSize;
int expectedParts = (int)Math.ceil((double)(payload.length) / windowSize); int expectedParts = 1;
helper.assertTotalParts(expectedParts); helper.assertTotalParts(expectedParts);
helper.assertTotalBytes(payload.length + expectedHeaderSize); helper.assertTotalBytes(payload.length + expectedHeaderSize);
@ -291,7 +285,7 @@ public class GeneratorTest
// Validate // Validate
int expectedHeaderSize = 8; int expectedHeaderSize = 8;
int expectedSize = payload.length + expectedHeaderSize; int expectedSize = payload.length + expectedHeaderSize;
int expectedParts = (int)Math.ceil((double)(payload.length) / windowSize); int expectedParts = 1;
helper.assertTotalParts(expectedParts); helper.assertTotalParts(expectedParts);
helper.assertTotalBytes(payload.length + expectedHeaderSize); helper.assertTotalBytes(payload.length + expectedHeaderSize);

View File

@ -103,7 +103,7 @@ public class UnitGenerator extends Generator
{ {
f.setMask(MASK); // make sure we have the test mask set f.setMask(MASK); // make sure we have the test mask set
BufferUtil.put(generator.generateHeaderBytes(f),completeBuf); BufferUtil.put(generator.generateHeaderBytes(f),completeBuf);
ByteBuffer window = generator.getPayloadWindow(f.getPayloadLength(),f); ByteBuffer window = f.getPayload();
if (BufferUtil.hasContent(window)) if (BufferUtil.hasContent(window))
{ {
BufferUtil.put(window,completeBuf); BufferUtil.put(window,completeBuf);

View File

@ -1,191 +0,0 @@
//
// ========================================================================
// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.common.io;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.common.CloseInfo;
import org.eclipse.jetty.websocket.common.Hex;
import org.eclipse.jetty.websocket.common.UnitGenerator;
import org.eclipse.jetty.websocket.common.frames.BinaryFrame;
import org.eclipse.jetty.websocket.common.frames.TextFrame;
import org.junit.Assert;
import org.junit.Test;
public class WriteBytesProviderTest
{
private static final Logger LOG = Log.getLogger(WriteBytesProviderTest.class);
private WriteBytesProvider bytesProvider;
private void assertCallbackSuccessCount(TrackingCallback callback, int expectedSuccsesCount)
{
Assert.assertThat("Callback was called",callback.isCalled(),is(true));
Assert.assertThat("No Failed Callbacks",callback.getFailure(),nullValue());
Assert.assertThat("# of Success Callbacks",callback.getCallCount(),is(expectedSuccsesCount));
}
@Test
public void testSingleFrame()
{
UnitGenerator generator = new UnitGenerator();
TrackingCallback flushCallback = new TrackingCallback();
bytesProvider = new WriteBytesProvider(generator,flushCallback);
TrackingCallback frameCallback = new TrackingCallback();
Frame frame = new TextFrame().setPayload("Test");
// place in to bytes provider
bytesProvider.enqueue(frame,frameCallback);
// get bytes out
List<ByteBuffer> bytes = bytesProvider.getByteBuffers();
Assert.assertThat("Number of buffers",bytes.size(),is(2));
// Test byte values
assertExpectedBytes(bytes,"810454657374");
// Trigger success
bytesProvider.succeeded();
// Validate success
assertCallbackSuccessCount(flushCallback,1);
assertCallbackSuccessCount(frameCallback,1);
}
@Test
public void testTextClose()
{
UnitGenerator generator = new UnitGenerator();
TrackingCallback flushCallback = new TrackingCallback();
bytesProvider = new WriteBytesProvider(generator,flushCallback);
// Create frames for provider
TrackingCallback textCallback = new TrackingCallback();
TrackingCallback closeCallback = new TrackingCallback();
bytesProvider.enqueue(new TextFrame().setPayload("Bye"),textCallback);
bytesProvider.enqueue(new CloseInfo().asFrame(),closeCallback);
// get bytes out
List<ByteBuffer> bytes = bytesProvider.getByteBuffers();
Assert.assertThat("Number of buffers",bytes.size(),is(4));
// Test byte values
StringBuilder expected = new StringBuilder();
expected.append("8103427965"); // text frame
expected.append("8800"); // (empty) close frame
assertExpectedBytes(bytes,expected.toString());
// Trigger success
bytesProvider.succeeded();
// Validate success
assertCallbackSuccessCount(flushCallback,1);
assertCallbackSuccessCount(textCallback,1);
assertCallbackSuccessCount(closeCallback,1);
}
@Test
public void testTinyBufferSizeFrame()
{
UnitGenerator generator = new UnitGenerator();
TrackingCallback flushCallback = new TrackingCallback();
bytesProvider = new WriteBytesProvider(generator,flushCallback);
int bufferSize = 30;
bytesProvider.setBufferSize(bufferSize);
// Create frames for provider
TrackingCallback binCallback = new TrackingCallback();
TrackingCallback closeCallback = new TrackingCallback();
int binPayloadSize = 50;
byte bin[] = new byte[binPayloadSize];
Arrays.fill(bin,(byte)0x00);
BinaryFrame binFrame = new BinaryFrame().setPayload(bin);
byte maskingKey[] = Hex.asByteArray("11223344");
binFrame.setMask(maskingKey);
bytesProvider.enqueue(binFrame,binCallback);
bytesProvider.enqueue(new CloseInfo().asFrame(),closeCallback);
// get bytes out
List<ByteBuffer> bytes = bytesProvider.getByteBuffers();
Assert.assertThat("Number of buffers",bytes.size(),is(5));
assertBufferLengths(bytes,6,bufferSize,binPayloadSize-bufferSize,2,0);
// Test byte values
StringBuilder expected = new StringBuilder();
expected.append("82B2").append("11223344"); // bin frame
// build up masked bytes
byte masked[] = new byte[binPayloadSize];
Arrays.fill(masked,(byte)0x00);
for (int i = 0; i < binPayloadSize; i++)
{
masked[i] ^= maskingKey[i % 4];
}
expected.append(Hex.asHex(masked));
expected.append("8800"); // (empty) close frame
assertExpectedBytes(bytes,expected.toString());
// Trigger success
bytesProvider.succeeded();
// Validate success
assertCallbackSuccessCount(flushCallback,1);
assertCallbackSuccessCount(binCallback,1);
assertCallbackSuccessCount(closeCallback,1);
}
private void assertBufferLengths(List<ByteBuffer> bytes, int... expectedLengths)
{
for (int i = 0; i < expectedLengths.length; i++)
{
Assert.assertThat("Buffer[" + i + "].remaining",bytes.get(i).remaining(),is(expectedLengths[i]));
}
}
private void assertExpectedBytes(List<ByteBuffer> bytes, String expected)
{
String actual = gatheredHex(bytes);
Assert.assertThat("Expected Bytes",actual,is(expected));
}
private String gatheredHex(List<ByteBuffer> bytes)
{
int len = 0;
for (ByteBuffer buf : bytes)
{
LOG.debug("buffer[] {}", BufferUtil.toDetailString(buf));
len += buf.remaining();
}
len = len * 2;
StringBuilder ret = new StringBuilder(len);
for (ByteBuffer buf : bytes)
{
ret.append(Hex.asHex(buf));
}
return ret.toString();
}
}

View File

@ -20,12 +20,9 @@ package org.eclipse.jetty.websocket.mux;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.StatusCode; import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.api.SuspendToken; import org.eclipse.jetty.websocket.api.SuspendToken;
import org.eclipse.jetty.websocket.api.WebSocketPolicy; import org.eclipse.jetty.websocket.api.WebSocketPolicy;
@ -35,9 +32,7 @@ import org.eclipse.jetty.websocket.api.extensions.IncomingFrames;
import org.eclipse.jetty.websocket.common.CloseInfo; import org.eclipse.jetty.websocket.common.CloseInfo;
import org.eclipse.jetty.websocket.common.ConnectionState; import org.eclipse.jetty.websocket.common.ConnectionState;
import org.eclipse.jetty.websocket.common.LogicalConnection; import org.eclipse.jetty.websocket.common.LogicalConnection;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.eclipse.jetty.websocket.common.WebSocketSession; import org.eclipse.jetty.websocket.common.WebSocketSession;
import org.eclipse.jetty.websocket.common.io.FutureWriteCallback;
import org.eclipse.jetty.websocket.common.io.IOState; import org.eclipse.jetty.websocket.common.io.IOState;
import org.eclipse.jetty.websocket.common.io.IOState.ConnectionStateListener; import org.eclipse.jetty.websocket.common.io.IOState.ConnectionStateListener;
@ -46,18 +41,13 @@ import org.eclipse.jetty.websocket.common.io.IOState.ConnectionStateListener;
*/ */
public class MuxChannel implements LogicalConnection, IncomingFrames, SuspendToken, ConnectionStateListener public class MuxChannel implements LogicalConnection, IncomingFrames, SuspendToken, ConnectionStateListener
{ {
private static final Logger LOG = Log.getLogger(MuxChannel.class);
private final long channelId; private final long channelId;
private final Muxer muxer; private final Muxer muxer;
private final AtomicBoolean inputClosed;
private final AtomicBoolean outputClosed;
private final AtomicBoolean suspendToken; private final AtomicBoolean suspendToken;
private IOState ioState; private IOState ioState;
private WebSocketPolicy policy; private WebSocketPolicy policy;
private WebSocketSession session; private WebSocketSession session;
private IncomingFrames incoming; private IncomingFrames incoming;
private String subProtocol;
public MuxChannel(long channelId, Muxer muxer) public MuxChannel(long channelId, Muxer muxer)
{ {
@ -68,9 +58,6 @@ public class MuxChannel implements LogicalConnection, IncomingFrames, SuspendTok
this.suspendToken = new AtomicBoolean(false); this.suspendToken = new AtomicBoolean(false);
this.ioState = new IOState(); this.ioState = new IOState();
this.ioState.addListener(this); this.ioState.addListener(this);
this.inputClosed = new AtomicBoolean(false);
this.outputClosed = new AtomicBoolean(false);
} }
@Override @Override
@ -209,19 +196,6 @@ public class MuxChannel implements LogicalConnection, IncomingFrames, SuspendTok
this.ioState.onOpened(); this.ioState.onOpened();
} }
/**
* Internal
*
* @param frame the frame to write
* @return the future for the network write of the frame
*/
private Future<Void> outgoingAsyncFrame(WebSocketFrame frame)
{
FutureWriteCallback future = new FutureWriteCallback();
outgoingFrame(frame,future);
return future;
}
/** /**
* Frames destined for the Muxer * Frames destined for the Muxer
*/ */
@ -262,7 +236,6 @@ public class MuxChannel implements LogicalConnection, IncomingFrames, SuspendTok
public void setSubProtocol(String subProtocol) public void setSubProtocol(String subProtocol)
{ {
this.subProtocol = subProtocol;
} }
@Override @Override

View File

@ -2,7 +2,7 @@ org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog
org.eclipse.jetty.LEVEL=WARN org.eclipse.jetty.LEVEL=WARN
# org.eclipse.jetty.io.WriteFlusher.LEVEL=DEBUG # org.eclipse.jetty.io.WriteFlusher.LEVEL=DEBUG
org.eclipse.jetty.websocket.LEVEL=DEBUG org.eclipse.jetty.websocket.LEVEL=WARN
# org.eclipse.jetty.websocket.LEVEL=WARN # org.eclipse.jetty.websocket.LEVEL=WARN
# org.eclipse.jetty.websocket.common.io.LEVEL=DEBUG # org.eclipse.jetty.websocket.common.io.LEVEL=DEBUG
# org.eclipse.jetty.websocket.server.ab.LEVEL=DEBUG # org.eclipse.jetty.websocket.server.ab.LEVEL=DEBUG
@ -14,7 +14,7 @@ org.eclipse.jetty.websocket.LEVEL=DEBUG
### Show state changes on BrowserDebugTool ### Show state changes on BrowserDebugTool
# -- LEAVE THIS AT DEBUG LEVEL -- # -- LEAVE THIS AT DEBUG LEVEL --
org.eclipse.jetty.websocket.server.browser.LEVEL=DEBUG org.eclipse.jetty.websocket.server.browser.LEVEL=WARN
### Disabling intentional error out of RFCSocket ### Disabling intentional error out of RFCSocket
org.eclipse.jetty.websocket.server.helper.RFCSocket.LEVEL=OFF org.eclipse.jetty.websocket.server.helper.RFCSocket.LEVEL=OFF