421697 - IteratingCallback improvements

avoid wrapping writeCallback
Idle state added to IteratingCallback for SPDY
This commit is contained in:
Greg Wilkins 2013-11-21 14:52:39 +11:00
parent 41f60bd152
commit 1eb2997efd
14 changed files with 615 additions and 243 deletions

View File

@ -99,9 +99,10 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
if (updateSenderState(current, SenderState.SENDING))
{
LOG.debug("Deferred content available, idle -> sending");
// TODO should just call contentCallback.iterate() here.
HttpContent content = this.content;
content.advance();
sendContent(exchange, content, contentCallback);
sendContent(exchange, content, contentCallback); // TODO old style usage!
return;
}
break;
@ -416,11 +417,13 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
if (!updateSenderState(current, SenderState.SENDING))
break;
HttpContent content = this.content;
// TODO should just call contentCallback.iterate() here.
if (content.advance())
{
// There is content to send
LOG.debug("Proceed while waiting");
sendContent(exchange, content, contentCallback);
sendContent(exchange, content, contentCallback); // TODO old style usage!
}
else
{
@ -610,16 +613,17 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
{
case SENDING:
{
// TODO should just call contentCallback.iterate() here.
// We have content to send ?
if (content.advance())
{
sendContent(exchange, content, contentCallback);
sendContent(exchange, content, contentCallback); // TODO old style usage!
}
else
{
if (content.isConsumed())
{
sendContent(exchange, content, lastCallback);
sendContent(exchange, content, lastCallback);
}
else
{
@ -664,11 +668,11 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
private class ContentCallback extends IteratingCallback
{
@Override
protected boolean process() throws Exception
protected State process() throws Exception
{
HttpExchange exchange = getHttpExchange();
if (exchange == null)
return false;
return State.IDLE;
Request request = exchange.getRequest();
HttpContent content = HttpSender.this.content;
@ -677,56 +681,54 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
if (contentBuffer != null)
{
if (!someToContent(request, contentBuffer))
return false;
return State.IDLE;
}
if (content.advance())
{
// There is more content to send
sendContent(exchange, content, this);
return State.SCHEDULED;
}
else
if (content.isConsumed())
{
if (content.isConsumed())
sendContent(exchange, content, lastCallback);
return State.SCHEDULED;
}
while (true)
{
SenderState current = senderState.get();
switch (current)
{
sendContent(exchange, content, lastCallback);
}
else
{
while (true)
case SENDING:
{
SenderState current = senderState.get();
switch (current)
if (updateSenderState(current, SenderState.IDLE))
{
case SENDING:
{
if (updateSenderState(current, SenderState.IDLE))
{
LOG.debug("Waiting for deferred content for {}", request);
return false;
}
break;
}
case SCHEDULED:
{
if (updateSenderState(current, SenderState.SENDING))
{
LOG.debug("Deferred content available for {}", request);
// TODO: this case is not covered by tests
sendContent(exchange, content, this);
return false;
}
break;
}
default:
{
throw new IllegalStateException();
}
LOG.debug("Waiting for deferred content for {}", request);
return State.IDLE;
}
break;
}
case SCHEDULED:
{
if (updateSenderState(current, SenderState.SENDING))
{
LOG.debug("Deferred content available for {}", request);
// TODO: this case is not covered by tests
sendContent(exchange, content, this);
return State.SCHEDULED;
}
break;
}
default:
{
throw new IllegalStateException();
}
}
}
return false;
}
@Override

View File

@ -482,7 +482,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
}
@Override
public boolean process() throws Exception
public State process() throws Exception
{
ByteBuffer chunk = _chunk;
while (true)
@ -558,7 +558,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
}
else
continue;
return false;
return State.SCHEDULED;
}
case SHUTDOWN_OUT:
{
@ -573,7 +573,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
if (!_lastContent || _content==null || !_content.hasArray() || !_header.hasArray() || _content.array()!=_header.array())
_bufferPool.release(_header);
}
return true;
return State.SUCCEEDED;
}
case CONTINUE:
{
@ -601,7 +601,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
}
@Override
public boolean process() throws Exception
public State process() throws Exception
{
ByteBuffer chunk = _chunk;
while (true)
@ -646,7 +646,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
}
else
continue;
return false;
return State.SCHEDULED;
}
case SHUTDOWN_OUT:
{
@ -655,7 +655,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
}
case DONE:
{
return true;
return State.SUCCEEDED;
}
case CONTINUE:
{

View File

@ -71,8 +71,8 @@ public class HttpOutput extends ServletOutputStream implements Runnable
isReady() OPEN:true READY:true READY:true UNREADY:false UNREADY:false
write completed - - - ASYNC READY->owp
*/
enum State { OPEN, ASYNC, READY, PENDING, UNREADY, CLOSED }
private final AtomicReference<State> _state=new AtomicReference<>(State.OPEN);
enum OutputState { OPEN, ASYNC, READY, PENDING, UNREADY, CLOSED }
private final AtomicReference<OutputState> _state=new AtomicReference<>(OutputState.OPEN);
public HttpOutput(HttpChannel<?> channel)
{
@ -99,7 +99,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
public void reopen()
{
_state.set(State.OPEN);
_state.set(OutputState.OPEN);
}
public boolean isAllContentWritten()
@ -110,10 +110,10 @@ public class HttpOutput extends ServletOutputStream implements Runnable
@Override
public void close()
{
State state=_state.get();
while(state!=State.CLOSED)
OutputState state=_state.get();
while(state!=OutputState.CLOSED)
{
if (_state.compareAndSet(state,State.CLOSED))
if (_state.compareAndSet(state,OutputState.CLOSED))
{
try
{
@ -137,10 +137,10 @@ public class HttpOutput extends ServletOutputStream implements Runnable
/* Called to indicated that the output is already closed and the state needs to be updated to match */
void closed()
{
State state=_state.get();
while(state!=State.CLOSED)
OutputState state=_state.get();
while(state!=OutputState.CLOSED)
{
if (_state.compareAndSet(state,State.CLOSED))
if (_state.compareAndSet(state,OutputState.CLOSED))
{
try
{
@ -169,7 +169,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
public boolean isClosed()
{
return _state.get()==State.CLOSED;
return _state.get()==OutputState.CLOSED;
}
@Override
@ -190,9 +190,9 @@ public class HttpOutput extends ServletOutputStream implements Runnable
throw new IllegalStateException("isReady() not called");
case READY:
if (!_state.compareAndSet(State.READY, State.PENDING))
if (!_state.compareAndSet(OutputState.READY, OutputState.PENDING))
continue;
new AsyncFlush().process();
new AsyncFlush().iterate();
return;
case PENDING:
@ -226,7 +226,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
throw new IllegalStateException("isReady() not called");
case READY:
if (!_state.compareAndSet(State.READY, State.PENDING))
if (!_state.compareAndSet(OutputState.READY, OutputState.PENDING))
continue;
// Should we aggregate?
@ -242,7 +242,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
// return if we are not complete, not full and filled all the content
if (filled==len && !BufferUtil.isFull(_aggregate))
{
if (!_state.compareAndSet(State.PENDING, State.ASYNC))
if (!_state.compareAndSet(OutputState.PENDING, OutputState.ASYNC))
throw new IllegalStateException();
return;
}
@ -253,7 +253,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
}
// Do the asynchronous writing from the callback
new AsyncWrite(b,off,len,complete).process();
new AsyncWrite(b,off,len,complete).iterate();
return;
case PENDING:
@ -333,11 +333,11 @@ public class HttpOutput extends ServletOutputStream implements Runnable
throw new IllegalStateException("isReady() not called");
case READY:
if (!_state.compareAndSet(State.READY, State.PENDING))
if (!_state.compareAndSet(OutputState.READY, OutputState.PENDING))
continue;
// Do the asynchronous writing from the callback
new AsyncWrite(buffer,complete).process();
new AsyncWrite(buffer,complete).iterate();
return;
case PENDING:
@ -399,7 +399,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
throw new IllegalStateException("isReady() not called");
case READY:
if (!_state.compareAndSet(State.READY, State.PENDING))
if (!_state.compareAndSet(OutputState.READY, OutputState.PENDING))
continue;
if (_aggregate == null)
@ -409,13 +409,13 @@ public class HttpOutput extends ServletOutputStream implements Runnable
// Check if all written or full
if (!complete && !BufferUtil.isFull(_aggregate))
{
if (!_state.compareAndSet(State.PENDING, State.ASYNC))
if (!_state.compareAndSet(OutputState.PENDING, OutputState.ASYNC))
throw new IllegalStateException();
return;
}
// Do the asynchronous writing from the callback
new AsyncFlush().process();
new AsyncFlush().iterate();
return;
case PENDING:
@ -556,7 +556,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
switch(_state.get())
{
case OPEN:
if (!_state.compareAndSet(State.OPEN, State.PENDING))
if (!_state.compareAndSet(OutputState.OPEN, OutputState.PENDING))
continue;
break;
case CLOSED:
@ -617,7 +617,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
if (!_channel.getState().isAsync())
throw new IllegalStateException("!ASYNC");
if (_state.compareAndSet(State.OPEN, State.READY))
if (_state.compareAndSet(OutputState.OPEN, OutputState.READY))
{
_writeListener = writeListener;
_channel.getState().onWritePossible();
@ -639,13 +639,13 @@ public class HttpOutput extends ServletOutputStream implements Runnable
case OPEN:
return true;
case ASYNC:
if (!_state.compareAndSet(State.ASYNC, State.READY))
if (!_state.compareAndSet(OutputState.ASYNC, OutputState.READY))
continue;
return true;
case READY:
return true;
case PENDING:
if (!_state.compareAndSet(State.PENDING, State.UNREADY))
if (!_state.compareAndSet(OutputState.PENDING, OutputState.UNREADY))
continue;
return false;
case UNREADY:
@ -666,7 +666,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
_writeListener.onError(th);
close();
}
if (_state.get()==State.READY)
if (_state.get()==OutputState.READY)
{
try
{
@ -701,13 +701,13 @@ public class HttpOutput extends ServletOutputStream implements Runnable
}
@Override
protected boolean process()
protected State process()
{
// flush any content from the aggregate
if (BufferUtil.hasContent(_aggregate))
{
_channel.write(_aggregate, _complete && _len==0, this);
return false;
return State.SCHEDULED;
}
if (!_complete && _len<BufferUtil.space(_aggregate) && _len<_commitSize)
@ -718,18 +718,18 @@ public class HttpOutput extends ServletOutputStream implements Runnable
{
_flushed=true;
_channel.write(_buffer, _complete, this);
return false;
return State.SCHEDULED;
}
else if (_len==0 && !_flushed)
{
_flushed=true;
_channel.write(BufferUtil.EMPTY_BUFFER, _complete, this);
return false;
return State.SCHEDULED;
}
if (_complete)
closed();
return true;
return State.SUCCEEDED;
}
}
@ -742,23 +742,23 @@ public class HttpOutput extends ServletOutputStream implements Runnable
}
@Override
protected boolean process()
protected State process()
{
if (BufferUtil.hasContent(_aggregate))
{
_flushed=true;
_channel.write(_aggregate, false, this);
return false;
return State.SCHEDULED;
}
if (!_flushed)
{
_flushed=true;
_channel.write(BufferUtil.EMPTY_BUFFER,false,this);
return false;
return State.SCHEDULED;
}
return true;
return State.SUCCEEDED;
}
@Override
@ -768,16 +768,16 @@ public class HttpOutput extends ServletOutputStream implements Runnable
{
while(true)
{
State last=_state.get();
HttpOutput.OutputState last=_state.get();
switch(last)
{
case PENDING:
if (!_state.compareAndSet(State.PENDING, State.ASYNC))
if (!_state.compareAndSet(HttpOutput.OutputState.PENDING, HttpOutput.OutputState.ASYNC))
continue;
break;
case UNREADY:
if (!_state.compareAndSet(State.UNREADY, State.READY))
if (!_state.compareAndSet(HttpOutput.OutputState.UNREADY, HttpOutput.OutputState.READY))
continue;
_channel.getState().onWritePossible();
break;
@ -833,7 +833,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
}
@Override
protected boolean process() throws Exception
protected State process() throws Exception
{
// Only return if EOF has previously been read and thus
// a write done with EOF=true
@ -843,7 +843,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
_in.close();
closed();
_channel.getByteBufferPool().release(_buffer);
return true;
return State.SUCCEEDED;
}
// Read until buffer full or EOF
@ -861,8 +861,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
_buffer.position(0);
_buffer.limit(len);
_channel.write(_buffer,_eof,this);
return false;
return State.SCHEDULED;
}
@Override
@ -905,7 +904,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
}
@Override
protected boolean process() throws Exception
protected State process() throws Exception
{
// Only return if EOF has previously been read and thus
// a write done with EOF=true
@ -914,7 +913,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
_in.close();
closed();
_channel.getByteBufferPool().release(_buffer);
return true;
return State.SUCCEEDED;
}
// Read from stream until buffer full or EOF
@ -926,7 +925,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
_buffer.flip();
_channel.write(_buffer,_eof,this);
return false;
return State.SCHEDULED;
}
@Override

View File

@ -121,10 +121,10 @@ public class SPDYConnection extends AbstractConnection implements Controller, Id
}
@Override
public void write(ByteBuffer buffer, final Callback callback)
public void write(final Callback callback, ByteBuffer... buffers)
{
EndPoint endPoint = getEndPoint();
endPoint.write(callback, buffer);
endPoint.write(callback, buffers);
}
@Override

View File

@ -24,7 +24,7 @@ import org.eclipse.jetty.util.Callback;
public interface Controller
{
public void write(ByteBuffer buffer, Callback callback);
public void write(Callback callback, ByteBuffer... buffers);
public void close(boolean onlyOutput);
}

View File

@ -21,13 +21,14 @@ package org.eclipse.jetty.spdy;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import org.eclipse.jetty.spdy.StandardSession.FrameBytes;
import org.eclipse.jetty.spdy.api.SPDYException;
import org.eclipse.jetty.spdy.api.Stream;
import org.eclipse.jetty.spdy.api.StreamStatus;
import org.eclipse.jetty.util.ArrayQueue;
import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@ -35,14 +36,13 @@ import org.eclipse.jetty.util.log.Logger;
public class Flusher
{
private static final Logger LOG = Log.getLogger(Flusher.class);
private static final int MAX_GATHER = 10;
private final IteratingCallback iteratingCallback = new SessionIteratingCallback();
private final Controller controller;
private final LinkedList<StandardSession.FrameBytes> queue = new LinkedList<>();
private final Object lock = new Object();
private final ArrayQueue<StandardSession.FrameBytes> queue = new ArrayQueue<>(lock);
private Throwable failure;
private StandardSession.FrameBytes active;
private boolean flushing;
public Flusher(Controller controller)
{
@ -51,7 +51,7 @@ public class Flusher
void removeFrameBytesFromQueue(Stream stream)
{
synchronized (queue)
synchronized (lock)
{
for (StandardSession.FrameBytes frameBytes : queue)
if (frameBytes.getStream() == stream)
@ -62,21 +62,21 @@ public class Flusher
void append(StandardSession.FrameBytes frameBytes)
{
Throwable failure;
synchronized (queue)
synchronized (lock)
{
failure = this.failure;
if (failure == null)
{
// Frames containing headers must be send in the order the headers have been generated. We don't need
// to do this check in StandardSession.prepend() as no frames containing headers will be prepended.
// Control frames are added in order
if (frameBytes instanceof StandardSession.ControlFrameBytes)
queue.addLast(frameBytes);
queue.add(frameBytes);
else
{
// Otherwise scan from the back of the queue to insert by priority
int index = queue.size();
while (index > 0)
{
StandardSession.FrameBytes element = queue.get(index - 1);
StandardSession.FrameBytes element = queue.getUnsafe(index - 1);
if (element.compareTo(frameBytes) >= 0)
break;
--index;
@ -85,8 +85,10 @@ public class Flusher
}
}
}
// If no failures make sure we are iterating
if (failure == null)
iteratingCallback.iterate();
flush();
else
frameBytes.failed(new SPDYException(failure));
}
@ -94,15 +96,17 @@ public class Flusher
void prepend(StandardSession.FrameBytes frameBytes)
{
Throwable failure;
synchronized (queue)
synchronized (lock)
{
failure = this.failure;
if (failure == null)
{
// Scan from the front of the queue looking to skip higher priority messages
int index = 0;
while (index < queue.size())
int size=queue.size();
while (index < size)
{
StandardSession.FrameBytes element = queue.get(index);
StandardSession.FrameBytes element = queue.getUnsafe(index);
if (element.compareTo(frameBytes) <= 0)
break;
++index;
@ -111,97 +115,105 @@ public class Flusher
}
}
// If no failures make sure we are iterating
if (failure == null)
iteratingCallback.iterate();
flush();
else
frameBytes.failed(new SPDYException(failure));
}
void flush()
{
StandardSession.FrameBytes frameBytes = null;
ByteBuffer buffer = null;
boolean failFrameBytes = false;
synchronized (queue)
{
if (flushing || queue.isEmpty())
return;
Set<IStream> stalledStreams = null;
for (int i = 0; i < queue.size(); ++i)
{
frameBytes = queue.get(i);
IStream stream = frameBytes.getStream();
if (stream != null && stalledStreams != null && stalledStreams.contains(stream))
continue;
buffer = frameBytes.getByteBuffer();
if (buffer != null)
{
queue.remove(i);
if (stream != null && stream.isReset() && !(frameBytes instanceof StandardSession
.ControlFrameBytes))
failFrameBytes = true;
break;
}
if (stalledStreams == null)
stalledStreams = new HashSet<>();
if (stream != null)
stalledStreams.add(stream);
LOG.debug("Flush stalled for {}, {} frame(s) in queue", frameBytes, queue.size());
}
if (buffer == null)
return;
if (!failFrameBytes)
{
flushing = true;
LOG.debug("Flushing {}, {} frame(s) in queue", frameBytes, queue.size());
}
}
if (failFrameBytes)
{
frameBytes.failed(new StreamException(frameBytes.getStream().getId(), StreamStatus.INVALID_STREAM,
"Stream: " + frameBytes.getStream() + " is reset!"));
}
else
{
write(buffer, frameBytes);
}
}
private void write(ByteBuffer buffer, StandardSession.FrameBytes frameBytes)
{
active = frameBytes;
if (controller != null)
{
LOG.debug("Writing {} frame bytes of {}", buffer.remaining(), buffer.limit());
controller.write(buffer, iteratingCallback);
}
iteratingCallback.iterate();
}
public int getQueueSize()
{
return queue.size();
synchronized (lock)
{
return queue.size();
}
}
private class SessionIteratingCallback extends IteratingCallback
{
private final List<StandardSession.FrameBytes> active = new ArrayList<>();
private final Set<IStream> stalled = new HashSet<>();
@Override
protected boolean process() throws Exception
protected State process() throws Exception
{
flush();
return false;
StandardSession.FrameBytes frameBytes = null;
synchronized (lock)
{
if (active.size()>0)
throw new IllegalStateException();
if (queue.isEmpty())
return State.IDLE;
// Scan queue for data to write from first non stalled stream.
int qs=queue.size();
for (int i = 0; i < qs && active.size()<MAX_GATHER;)
{
frameBytes = queue.getUnsafe(i);
IStream stream = frameBytes.getStream();
// Continue if this is stalled stream
if (stream!=null)
{
if (stalled.size()>0 && stalled.contains(stream))
{
i++;
continue;
}
if (stream.getWindowSize()<=0)
{
stalled.add(stream);
i++;
continue;
}
}
// we will be writing this one, so take the frame off the queue
queue.remove(i);
qs--;
// Has the stream been reset and if this not a control frame?
if (stream != null && stream.isReset() && !(frameBytes instanceof StandardSession.ControlFrameBytes))
{
frameBytes.failed(new StreamException(frameBytes.getStream().getId(), StreamStatus.INVALID_STREAM,
"Stream: " + frameBytes.getStream() + " is reset!"));
continue;
}
active.add(frameBytes);
}
stalled.clear();
if (LOG.isDebugEnabled())
LOG.debug("Flushing {} of {} frame(s) in queue", active.size(), queue.size());
}
if (active.size() == 0)
return State.IDLE;
// Get the bytes to write
ByteBuffer[] buffers = new ByteBuffer[active.size()];
for (int i=0;i<buffers.length;i++)
buffers[i]=active.get(i).getByteBuffer();
if (controller != null)
controller.write(iteratingCallback, buffers);
return State.SCHEDULED;
}
@Override
protected void completed()
{
// will never be called as process always returns false!
// will never be called as doProcess always returns WAITING or IDLE
throw new IllegalStateException();
}
@Override
@ -209,16 +221,14 @@ public class Flusher
{
if (LOG.isDebugEnabled())
{
synchronized (queue)
synchronized (lock)
{
LOG.debug("Completed write of {}, {} frame(s) in queue", active, queue.size());
}
}
active.succeeded();
synchronized (queue)
{
flushing = false;
}
for (FrameBytes frame: active)
frame.succeeded();
active.clear();
super.succeeded();
}
@ -227,7 +237,7 @@ public class Flusher
{
List<StandardSession.FrameBytes> frameBytesToFail = new ArrayList<>();
synchronized (queue)
synchronized (lock)
{
failure = x;
if (LOG.isDebugEnabled())
@ -239,7 +249,9 @@ public class Flusher
queue.clear();
}
active.failed(x);
for (FrameBytes frame: active)
frame.failed(x);
active.clear();
for (StandardSession.FrameBytes fb : frameBytesToFail)
fb.failed(x);
super.failed(x);

View File

@ -1162,8 +1162,6 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable
{
IStream stream = getStream();
int windowSize = stream.getWindowSize();
if (windowSize <= 0)
return null;
size = dataInfo.available();
if (size > windowSize)

View File

@ -146,7 +146,7 @@ public class AsyncTimeoutTest
private static class TestController implements Controller
{
@Override
public void write(ByteBuffer buffer, Callback callback)
public void write(Callback callback, ByteBuffer... buffers)
{
callback.succeeded();
}

View File

@ -132,7 +132,7 @@ public class StandardSessionTest
return null;
}
})
.when(controller).write(any(ByteBuffer.class), any(Callback.class));
.when(controller).write(any(Callback.class), any(ByteBuffer.class));
}
@Test
@ -451,7 +451,7 @@ public class StandardSessionTest
// second data frame should fail without controller.write() as the connection is expected to be broken after first controller.write() call failed.
stream.data(new StringDataInfo(5, TimeUnit.SECONDS, "data", false), callback);
verify(controller, times(1)).write(any(ByteBuffer.class), any(Callback.class));
verify(controller, times(1)).write(any(Callback.class), any(ByteBuffer.class));
assertThat("Callback.failed has been called twice", failedCalledLatch.await(5, TimeUnit.SECONDS), is(true));
}
@ -465,7 +465,7 @@ public class StandardSessionTest
session.rst(new RstInfo(stream.getId(), StreamStatus.INVALID_STREAM));
stream.headers(new HeadersInfo(headers, true));
verify(controller, times(3)).write(any(ByteBuffer.class), any(Callback.class));
verify(controller, times(3)).write(any(Callback.class), any(ByteBuffer.class));
}
@Test
@ -548,7 +548,7 @@ public class StandardSessionTest
long lastStreamId = 0;
@Override
public void write(ByteBuffer buffer, Callback callback)
public void write(Callback callback, ByteBuffer... buffers)
{
StandardSession.FrameBytes frameBytes = (StandardSession.FrameBytes)callback;

View File

@ -41,31 +41,37 @@ import java.util.concurrent.atomic.AtomicReference;
*/
public abstract class IteratingCallback implements Callback
{
private enum State { WAITING, ITERATING, SUCCEEDED, FAILED };
private final AtomicReference<State> _state = new AtomicReference<>(State.WAITING);
protected enum State { IDLE, SCHEDULED, ITERATING, SUCCEEDED, FAILED };
private final AtomicReference<State> _state = new AtomicReference<>(State.IDLE);
public IteratingCallback()
{
}
/* ------------------------------------------------------------ */
abstract protected void completed();
/**
* Process a subtask.
* <p>Called by {@link #iterate()} to process a sub task of the overall task
* <p>
* @return True if the total task is complete. If false is returned
* then this Callback must be scheduled to receive either a call to
* {@link #succeeded()} or {@link #failed(Throwable)}.
* Method called by iterate to process the task.
* @return Then next state:
* <dl>
* <dt>SUCCEEDED</dt><dd>if process returns true</dd>
* <dt>SCHEDULED</dt><dd>This callback has been scheduled and {@link #succeeded()} or {@link #failed(Throwable)} will evenutally be called (if they have not been called already!)</dd>
* <dt>IDLE</dt><dd>no progress can be made and another call to {@link #iterate()} is required in order to progress the task</dd>
* <dt>FAILED</dt><dd>processing has failed</dd>
* </dl>
*
* @throws Exception
*/
abstract protected boolean process() throws Exception;
abstract protected void completed();
abstract protected State process() throws Exception;
/* ------------------------------------------------------------ */
/** This method is called initially to start processing and
* is then called by subsequent sub task success to continue
* processing.
* processing. If {@link #process()} returns IDLE, then iterate should be called
* again to restart processing.
* It is safe to call iterate multiple times as only the first thread to move
* the state out of IDLE will actually do any iteration and processing.
*/
public void iterate()
{
@ -74,27 +80,40 @@ public abstract class IteratingCallback implements Callback
// Keep iterating as long as succeeded() is called during process()
// If we are in WAITING state, either this is the first iteration or
// succeeded()/failed() were called already.
while(_state.compareAndSet(State.WAITING,State.ITERATING))
while(_state.compareAndSet(State.IDLE,State.ITERATING))
{
// Make some progress by calling process()
if (process())
State next = process();
switch (next)
{
// A true return indicates we are finished and no further callbacks
// are scheduled. So we must still be ITERATING.
if (_state.compareAndSet(State.ITERATING,State.SUCCEEDED))
case SUCCEEDED:
// The task has complete, there should have been no callbacks
if (!_state.compareAndSet(State.ITERATING,State.SUCCEEDED))
throw new IllegalStateException("state="+_state.get());
completed();
else
throw new IllegalStateException("Already "+_state.get());
return;
return;
case SCHEDULED:
// This callback has been scheduled, so it may or may not have
// already been called back. Let's find out
if (_state.compareAndSet(State.ITERATING,State.SCHEDULED))
// not called back yet, so lets wait for it
return;
// call back must have happened, so lets iterate
continue;
case IDLE:
// No more progress can be made. Wait for another call to iterate
if (!_state.compareAndSet(State.ITERATING,State.IDLE))
throw new IllegalStateException("state="+_state.get());
return;
case FAILED:
_state.set(State.FAILED);
return;
default:
throw new IllegalStateException("state="+_state.get()+" next="+next);
}
// else a callback has been scheduled. If it has not happened yet,
// we will still be ITERATING
else if (_state.compareAndSet(State.ITERATING,State.WAITING))
// no callback yet, so break the loop and wait for it
break;
// The callback must have happened and we are either WAITING already or FAILED
// the loop test will work out which
}
}
catch(Exception e)
@ -108,7 +127,7 @@ public abstract class IteratingCallback implements Callback
public void succeeded()
{
// Try a short cut for the fast method. If we are still iterating
if (_state.compareAndSet(State.ITERATING,State.WAITING))
if (_state.compareAndSet(State.ITERATING,State.IDLE))
// then next loop will continue processing, so nothing to do here
return;
@ -118,17 +137,22 @@ public abstract class IteratingCallback implements Callback
switch(_state.get())
{
case ITERATING:
if (_state.compareAndSet(State.ITERATING,State.WAITING))
if (_state.compareAndSet(State.ITERATING,State.IDLE))
break loop;
continue;
case WAITING:
// we are really waiting, so use this callback thread to iterate some more
case SCHEDULED:
if (_state.compareAndSet(State.SCHEDULED,State.IDLE))
iterate();
break loop;
case IDLE:
// TODO - remove this once old ICB usages updated
iterate();
break loop;
default:
throw new IllegalStateException("Already "+_state.get());
throw new IllegalStateException(this+" state="+_state.get());
}
}
}
@ -151,14 +175,35 @@ public abstract class IteratingCallback implements Callback
break loop;
continue;
case WAITING:
if (_state.compareAndSet(State.WAITING,State.FAILED))
case SCHEDULED:
if (_state.compareAndSet(State.SCHEDULED,State.FAILED))
break loop;
continue;
case IDLE:
// TODO - remove this once old ICB usages updated
if (_state.compareAndSet(State.IDLE,State.FAILED))
break loop;
continue;
default:
throw new IllegalStateException("Already "+_state.get(),x);
throw new IllegalStateException("state="+_state.get(),x);
}
}
}
public boolean isIdle()
{
return _state.get()==State.IDLE;
}
public boolean isFailed()
{
return _state.get()==State.FAILED;
}
public boolean isSucceeded()
{
return _state.get()==State.SUCCEEDED;
}
}

View File

@ -0,0 +1,302 @@
//
// ========================================================================
// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.util;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler;
import org.eclipse.jetty.util.thread.Scheduler;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
public class IteratingCallbackTest
{
static Scheduler scheduler = new ScheduledExecutorScheduler();
@BeforeClass
public static void beforeClass() throws Exception
{
scheduler.start();
}
@AfterClass
public static void afterClass() throws Exception
{
scheduler.stop();
}
@Test
public void testNonWaitingProcess() throws Exception
{
TestCB cb=new TestCB()
{
int i=10;
@Override
protected State process() throws Exception
{
processed++;
if (i-->1)
{
succeeded(); // fake a completed IO operation
return State.SCHEDULED;
}
return State.SUCCEEDED;
}
};
cb.iterate();
Assert.assertTrue(cb.waitForComplete());
Assert.assertEquals(10,cb.processed);
}
@Test
public void testWaitingProcess() throws Exception
{
TestCB cb=new TestCB()
{
int i=4;
@Override
protected State process() throws Exception
{
processed++;
if (i-->1)
{
scheduler.schedule(successTask,50,TimeUnit.MILLISECONDS);
return State.SCHEDULED;
}
return State.SUCCEEDED;
}
};
cb.iterate();
Assert.assertTrue(cb.waitForComplete());
Assert.assertEquals(4,cb.processed);
}
@Test
public void testWaitingProcessSpuriousInterate() throws Exception
{
final TestCB cb=new TestCB()
{
int i=4;
@Override
protected State process() throws Exception
{
processed++;
if (i-->1)
{
scheduler.schedule(successTask,50,TimeUnit.MILLISECONDS);
return State.SCHEDULED;
}
return State.SUCCEEDED;
}
};
cb.iterate();
scheduler.schedule(new Runnable()
{
@Override
public void run()
{
cb.iterate();
if (!cb.isSucceeded())
scheduler.schedule(this,50,TimeUnit.MILLISECONDS);
}
},49,TimeUnit.MILLISECONDS);
Assert.assertTrue(cb.waitForComplete());
Assert.assertEquals(4,cb.processed);
}
@Test
public void testNonWaitingProcessFailure() throws Exception
{
TestCB cb=new TestCB()
{
int i=10;
@Override
protected State process() throws Exception
{
processed++;
if (i-->1)
{
if (i>5)
succeeded(); // fake a completed IO operation
else
failed(new Exception("testing"));
return State.SCHEDULED;
}
return State.SUCCEEDED;
}
};
cb.iterate();
Assert.assertFalse(cb.waitForComplete());
Assert.assertEquals(5,cb.processed);
}
@Test
public void testWaitingProcessFailure() throws Exception
{
TestCB cb=new TestCB()
{
int i=4;
@Override
protected State process() throws Exception
{
processed++;
if (i-->1)
{
scheduler.schedule(i>2?successTask:failTask,50,TimeUnit.MILLISECONDS);
return State.SCHEDULED;
}
return State.SUCCEEDED;
}
};
cb.iterate();
Assert.assertFalse(cb.waitForComplete());
Assert.assertEquals(2,cb.processed);
}
@Test
public void testIdleWaiting() throws Exception
{
final CountDownLatch idle = new CountDownLatch(1);
TestCB cb=new TestCB()
{
int i=5;
@Override
protected State process()
{
processed++;
switch(i--)
{
case 5:
succeeded();
return State.SCHEDULED;
case 4:
scheduler.schedule(successTask,5,TimeUnit.MILLISECONDS);
return State.SCHEDULED;
case 3:
scheduler.schedule(new Runnable()
{
@Override
public void run()
{
idle.countDown();
}
},5,TimeUnit.MILLISECONDS);
return State.IDLE;
case 2:
succeeded();
return State.SCHEDULED;
case 1:
scheduler.schedule(successTask,5,TimeUnit.MILLISECONDS);
return State.SCHEDULED;
case 0:
return State.SUCCEEDED;
default:
throw new IllegalStateException();
}
}
};
cb.iterate();
idle.await(10,TimeUnit.SECONDS);
Assert.assertTrue(cb.isIdle());
cb.iterate();
Assert.assertTrue(cb.waitForComplete());
Assert.assertEquals(6,cb.processed);
}
private abstract static class TestCB extends IteratingCallback
{
CountDownLatch completed = new CountDownLatch(1);
int processed=0;
@Override
protected void completed()
{
completed.countDown();
}
@Override
public void failed(Throwable x)
{
super.failed(x);
completed.countDown();
}
boolean waitForComplete() throws InterruptedException
{
completed.await(10,TimeUnit.SECONDS);
return isSucceeded();
}
Runnable successTask = new Runnable()
{
@Override
public void run()
{
succeeded();
}
};
Runnable failTask = new Runnable()
{
@Override
public void run()
{
failed(new Exception("testing failure"));
}
};
}
}

View File

@ -564,7 +564,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
LOG.debug("outgoingFrame({}, {})",frame,callback);
}
writeBytes.enqueue(frame,WriteCallbackWrapper.wrap(callback));
writeBytes.enqueue(frame,callback);
flush();
}

View File

@ -32,6 +32,7 @@ import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.common.Generator;
import org.eclipse.jetty.websocket.common.OpCode;
@ -42,15 +43,15 @@ import org.eclipse.jetty.websocket.common.frames.DataFrame;
*/
public class WriteBytesProvider implements Callback
{
private class FrameEntry
private class FrameEntry
{
protected final AtomicBoolean failed = new AtomicBoolean(false);
protected final Frame frame;
protected final Callback callback;
protected final WriteCallback callback;
/** holds reference to header ByteBuffer, as it needs to be released on success/failure */
private ByteBuffer headerBuffer;
public FrameEntry(Frame frame, Callback callback)
public FrameEntry(Frame frame, WriteCallback callback)
{
this.frame = frame;
this.callback = callback;
@ -69,7 +70,7 @@ public class WriteBytesProvider implements Callback
return generator.getPayloadWindow(bufferSize,frame);
}
public void notifyFailure(Throwable t)
public void notifyFailed(Throwable t)
{
freeBuffers();
if (failed.getAndSet(true) == false)
@ -87,7 +88,7 @@ public class WriteBytesProvider implements Callback
}
try
{
callback.succeeded();
callback.writeSuccess();
}
catch (Throwable t)
{
@ -166,7 +167,7 @@ public class WriteBytesProvider implements Callback
failAll(new EOFException("Connection has been disconnected"));
}
public void enqueue(Frame frame, Callback callback)
public void enqueue(Frame frame, WriteCallback callback)
{
Objects.requireNonNull(frame);
LOG.debug("enqueue({}, {})",frame,callback);
@ -178,7 +179,7 @@ public class WriteBytesProvider implements Callback
LOG.debug("Write is closed: {} {}",frame,callback);
if (callback != null)
{
callback.failed(new IOException("Write is closed"));
callback.writeFailed(new IOException("Write is closed"));
}
return;
}
@ -240,7 +241,7 @@ public class WriteBytesProvider implements Callback
// notify entry callbacks
for (FrameEntry entry : callbacks)
{
entry.notifyFailure(t);
entry.notifyFailed(t);
}
}
}
@ -328,7 +329,7 @@ public class WriteBytesProvider implements Callback
}
}
private void notifySafeFailure(Callback callback, Throwable t)
private void notifySafeFailure(WriteCallback callback, Throwable t)
{
if (callback == null)
{
@ -336,7 +337,7 @@ public class WriteBytesProvider implements Callback
}
try
{
callback.failed(t);
callback.writeFailed(t);
}
catch (Throwable e)
{

View File

@ -21,11 +21,12 @@ package org.eclipse.jetty.websocket.common.io;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.websocket.api.WriteCallback;
/**
* Tracking Callback for testing how the callbacks are used.
*/
public class TrackingCallback implements Callback
public class TrackingCallback implements Callback, WriteCallback
{
private AtomicInteger called = new AtomicInteger();
private boolean success = false;
@ -65,4 +66,16 @@ public class TrackingCallback implements Callback
{
return called.get();
}
@Override
public void writeFailed(Throwable x)
{
failed(x);
}
@Override
public void writeSuccess()
{
succeeded();
}
}