430242 - added SharedBlockingCallback to support threadsafe blocking

This commit is contained in:
Greg Wilkins 2014-03-13 21:12:55 +11:00
parent 0330f509e6
commit 5d83a43cce
10 changed files with 314 additions and 273 deletions

View File

@ -53,6 +53,7 @@ import org.eclipse.jetty.util.BlockingCallback;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.URIUtil;
import org.eclipse.jetty.util.SharedBlockingCallback.Blocker;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Scheduler;
@ -728,10 +729,10 @@ public class HttpChannel<T> implements HttpParser.RequestHandler<T>, Runnable
protected boolean sendResponse(ResponseInfo info, ByteBuffer content, boolean complete) throws IOException
{
BlockingCallback writeBlock = _response.getHttpOutput().acquireWriteBlockingCallback();
boolean committing=sendResponse(info,content,complete,writeBlock);
writeBlock.block();
return committing;
try(Blocker blocker = _response.getHttpOutput().acquireWriteBlockingCallback())
{
return sendResponse(info,content,complete,blocker);
}
}
public boolean isCommitted()

View File

@ -24,6 +24,7 @@ import java.nio.ByteBuffer;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.SharedBlockingCallback;
import org.eclipse.jetty.util.SharedBlockingCallback.Blocker;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@ -57,10 +58,11 @@ public class HttpInputOverHTTP extends HttpInput<ByteBuffer> implements Callback
{
while(true)
{
_readBlocker.acquire();
_httpConnection.fillInterested(_readBlocker);
LOG.debug("{} block readable on {}",this,_readBlocker);
_readBlocker.block();
try (Blocker blocker=_readBlocker.acquire())
{
_httpConnection.fillInterested(blocker);
LOG.debug("{} block readable on {}",this,blocker);
}
Object content=getNextContent();
if (content!=null || isFinished())

View File

@ -35,6 +35,7 @@ import org.eclipse.jetty.http.HttpContent;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.util.BlockingCallback;
import org.eclipse.jetty.util.SharedBlockingCallback;
import org.eclipse.jetty.util.SharedBlockingCallback.Blocker;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IteratingCallback;
@ -116,17 +117,17 @@ public class HttpOutput extends ServletOutputStream implements Runnable
return _channel.getResponse().isAllContentWritten(_written);
}
protected BlockingCallback acquireWriteBlockingCallback() throws IOException
protected Blocker acquireWriteBlockingCallback() throws IOException
{
_writeblock.acquire();
return _writeblock;
return _writeblock.acquire();
}
protected void write(ByteBuffer content, boolean complete) throws IOException
{
_writeblock.acquire();
write(content,complete,_writeblock);
_writeblock.block();
try (Blocker blocker=_writeblock.acquire())
{
write(content,complete,blocker);
}
}
protected void write(ByteBuffer content, boolean complete, Callback callback)
@ -439,9 +440,10 @@ public class HttpOutput extends ServletOutputStream implements Runnable
// Check if all written or full
if (complete || BufferUtil.isFull(_aggregate))
{
_writeblock.acquire();
write(_aggregate, complete, _writeblock);
_writeblock.block();
try(Blocker blocker=_writeblock.acquire())
{
write(_aggregate, complete, blocker);
}
if (complete)
closed();
}
@ -497,9 +499,10 @@ public class HttpOutput extends ServletOutputStream implements Runnable
*/
public void sendContent(ByteBuffer content) throws IOException
{
_writeblock.acquire();
write(content,true,_writeblock);
_writeblock.block();
try(Blocker blocker=_writeblock.acquire())
{
write(content,true,blocker);
}
}
/* ------------------------------------------------------------ */
@ -509,9 +512,10 @@ public class HttpOutput extends ServletOutputStream implements Runnable
*/
public void sendContent(InputStream in) throws IOException
{
_writeblock.acquire();
new InputStreamWritingCB(in,_writeblock).iterate();
_writeblock.block();
try(Blocker blocker=_writeblock.acquire())
{
new InputStreamWritingCB(in,blocker).iterate();
}
}
/* ------------------------------------------------------------ */
@ -521,9 +525,10 @@ public class HttpOutput extends ServletOutputStream implements Runnable
*/
public void sendContent(ReadableByteChannel in) throws IOException
{
_writeblock.acquire();
new ReadableByteChannelWritingCB(in,_writeblock).iterate();
_writeblock.block();
try(Blocker blocker=_writeblock.acquire())
{
new ReadableByteChannelWritingCB(in,blocker).iterate();
}
}
@ -534,9 +539,10 @@ public class HttpOutput extends ServletOutputStream implements Runnable
*/
public void sendContent(HttpContent content) throws IOException
{
_writeblock.acquire();
sendContent(content,_writeblock);
_writeblock.block();
try(Blocker blocker=_writeblock.acquire())
{
sendContent(content,blocker);
}
}
/* ------------------------------------------------------------ */

View File

@ -26,30 +26,7 @@ import java.util.concurrent.atomic.AtomicReference;
/* ------------------------------------------------------------ */
/**
* A Callback for simple reusable conversion of an
* asynchronous API to blocking.
* <p>
* To avoid late redundant calls to {@link #succeeded()} or {@link #failed(Throwable)} from
* interfering with later reuses of this class, the callback context is used to hold pass a phase indicated
* and only a single callback per phase is allowed.
* <p>
* A typical usage pattern is:
* <pre>
* public class MyClass
* {
* BlockingCallback cb = new BlockingCallback();
*
* public void blockingMethod(Object args) throws Exception
* {
* asyncMethod(args,cb);
* cb.block();
* }
*
* public <C>void asyncMethod(Object args, Callback callback)
* {
* ...
* }
* }
* TODO
*/
public class BlockingCallback implements Callback
{

View File

@ -18,174 +18,203 @@
package org.eclipse.jetty.util;
import java.io.Closeable;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/* ------------------------------------------------------------ */
/**
* A Callback for simple reusable conversion of an
* asynchronous API to blocking.
* <p>
* To avoid late redundant calls to {@link #succeeded()} or {@link #failed(Throwable)} from
* interfering with later reuses of this class, the callback context is used to hold pass a phase indicated
* and only a single callback per phase is allowed.
* <p>
/** Provides a reusable BlockingCallback.
* A typical usage pattern is:
* <pre>
* public class MyClass
* void someBlockingCall(Object... args) throws IOException
* {
* BlockingCallback cb = new BlockingCallback();
*
* public void blockingMethod(Object args) throws Exception
* {
* asyncMethod(args,cb);
* cb.block();
* }
*
* public <C>void asyncMethod(Object args, Callback callback)
* {
* ...
* }
* }
* try(Blocker blocker=sharedBlockingCallback.acquire())
* {
* someAsyncCall(args,blocker);
* }
* catch(Throwable e)
* {
* blocker.fail(e);
* }
* }
* </pre>
*/
public class SharedBlockingCallback extends BlockingCallback
public class SharedBlockingCallback
{
private static Throwable IDLE=new Throwable()
private static Throwable IDLE = new Throwable()
{
@Override
public String toString() { return "IDLE"; }
public String toString()
{
return "IDLE";
}
};
private static Throwable SUCCEEDED=new Throwable()
{
@Override
public String toString() { return "SUCCEEDED"; }
};
final ReentrantLock _lock = new ReentrantLock();
Condition _idle = _lock.newCondition();
Condition _complete = _lock.newCondition();
Throwable _state = IDLE;
private static Throwable SUCCEEDED = new Throwable()
{
@Override
public String toString()
{
return "SUCCEEDED";
}
};
final Blocker _blocker;
public SharedBlockingCallback()
{}
public void acquire() throws IOException
{
_lock.lock();
this(new Blocker());
}
protected SharedBlockingCallback(Blocker blocker)
{
_blocker=blocker;
}
public Blocker acquire() throws IOException
{
_blocker._lock.lock();
try
{
while (_state!=IDLE)
_idle.await();
_state=null;
while (_blocker._state != IDLE)
_blocker._idle.await();
_blocker._state = null;
}
catch (final InterruptedException e)
{
throw new InterruptedIOException(){{initCause(e);}};
throw new InterruptedIOException()
{
{
initCause(e);
}
};
}
finally
{
_lock.unlock();
_blocker._lock.unlock();
}
return _blocker;
}
@Override
public void succeeded()
{
_lock.lock();
try
{
if (_state==null)
{
_state=SUCCEEDED;
_complete.signalAll();
}
else if (_state==IDLE)
throw new IllegalStateException("IDLE");
}
finally
{
_lock.unlock();
}
}
@Override
public void failed(Throwable cause)
{
_lock.lock();
try
{
if (_state==null)
{
_state=cause;
_complete.signalAll();
}
else if (_state==IDLE)
throw new IllegalStateException("IDLE");
}
finally
{
_lock.unlock();
}
}
/** Block until the Callback has succeeded or failed and
* after the return leave in the state to allow reuse.
* This is useful for code that wants to repeatable use a FutureCallback to convert
* an asynchronous API to a blocking API.
* @throws IOException if exception was caught during blocking, or callback was cancelled
/* ------------------------------------------------------------ */
/** A Closeable Callback.
* Uses the auto close mechanism to block until the collback is complete.
*/
@Override
public void block() throws IOException
public static class Blocker implements Callback, Closeable
{
_lock.lock();
try
{
while (_state==null)
_complete.await();
if (_state==SUCCEEDED)
return;
if (_state==IDLE)
throw new IllegalStateException("IDLE");
if (_state instanceof IOException)
throw (IOException) _state;
if (_state instanceof CancellationException)
throw (CancellationException) _state;
throw new IOException(_state);
}
catch (final InterruptedException e)
{
throw new InterruptedIOException(){{initCause(e);}};
}
finally
{
_state=IDLE;
_idle.signalAll();
_lock.unlock();
}
}
@Override
public String toString()
{
_lock.lock();
try
{
return String.format("%s@%x{%s}",SharedBlockingCallback.class.getSimpleName(),hashCode(),_state);
}
finally
{
_lock.unlock();
}
}
final ReentrantLock _lock = new ReentrantLock();
final Condition _idle = _lock.newCondition();
final Condition _complete = _lock.newCondition();
Throwable _state = IDLE;
public Blocker()
{
}
@Override
public void succeeded()
{
_lock.lock();
try
{
if (_state == null)
{
_state = SUCCEEDED;
_complete.signalAll();
}
else if (_state == IDLE)
throw new IllegalStateException("IDLE");
}
finally
{
_lock.unlock();
}
}
@Override
public void failed(Throwable cause)
{
_lock.lock();
try
{
if (_state == null)
{
_state = cause;
_complete.signalAll();
}
else if (_state == IDLE)
throw new IllegalStateException("IDLE");
}
finally
{
_lock.unlock();
}
}
/**
* Block until the Callback has succeeded or failed and after the return leave in the state to allow reuse. This is useful for code that wants to
* repeatable use a FutureCallback to convert an asynchronous API to a blocking API.
*
* @throws IOException
* if exception was caught during blocking, or callback was cancelled
*/
@Override
public void close() throws IOException
{
_lock.lock();
try
{
while (_state == null)
_complete.await();
if (_state == SUCCEEDED)
return;
if (_state == IDLE)
throw new IllegalStateException("IDLE");
if (_state instanceof IOException)
throw (IOException)_state;
if (_state instanceof CancellationException)
throw (CancellationException)_state;
if (_state instanceof RuntimeException)
throw (RuntimeException)_state;
if (_state instanceof Error)
throw (Error)_state;
throw new IOException(_state);
}
catch (final InterruptedException e)
{
throw new InterruptedIOException()
{
{
initCause(e);
}
};
}
finally
{
_state = IDLE;
_idle.signalAll();
_lock.unlock();
}
}
@Override
public String toString()
{
_lock.lock();
try
{
return String.format("%s@%x{%s}",SharedBlockingCallback.class.getSimpleName(),hashCode(),_state);
}
finally
{
_lock.unlock();
}
}
}
}

View File

@ -19,22 +19,17 @@
package org.eclipse.jetty.util;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.util.SharedBlockingCallback.Blocker;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
public class SharedBlockingCallbackTest
{
final SharedBlockingCallback fcb= new SharedBlockingCallback();
final SharedBlockingCallback sbcb= new SharedBlockingCallback();
public SharedBlockingCallbackTest()
{
@ -43,34 +38,38 @@ public class SharedBlockingCallbackTest
@Test
public void testDone() throws Exception
{
fcb.acquire();
fcb.succeeded();
long start=System.currentTimeMillis();
fcb.block();
{
long start;
try (Blocker blocker=sbcb.acquire())
{
blocker.succeeded();
start=System.currentTimeMillis();
}
Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(500L));
}
@Test
public void testGetDone() throws Exception
{
fcb.acquire();
final CountDownLatch latch = new CountDownLatch(1);
new Thread(new Runnable()
long start;
try (final Blocker blocker=sbcb.acquire())
{
@Override
public void run()
final CountDownLatch latch = new CountDownLatch(1);
new Thread(new Runnable()
{
latch.countDown();
try{TimeUnit.MILLISECONDS.sleep(100);}catch(Exception e){e.printStackTrace();}
fcb.succeeded();
}
}).start();
latch.await();
long start=System.currentTimeMillis();
fcb.block();
@Override
public void run()
{
latch.countDown();
try{TimeUnit.MILLISECONDS.sleep(100);}catch(Exception e){e.printStackTrace();}
blocker.succeeded();
}
}).start();
latch.await();
start=System.currentTimeMillis();
}
Assert.assertThat(System.currentTimeMillis()-start,Matchers.greaterThan(10L));
Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(1000L));
}
@ -78,18 +77,19 @@ public class SharedBlockingCallbackTest
@Test
public void testFailed() throws Exception
{
fcb.acquire();
Exception ex=new Exception("FAILED");
fcb.failed(ex);
long start=System.currentTimeMillis();
final Exception ex = new Exception("FAILED");
long start=Long.MIN_VALUE;
try
{
fcb.block();
try (final Blocker blocker=sbcb.acquire())
{
blocker.failed(ex);
}
Assert.fail();
}
catch(IOException ee)
{
start=System.currentTimeMillis();
Assert.assertEquals(ex,ee.getCause());
}
Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(100L));
@ -98,26 +98,29 @@ public class SharedBlockingCallbackTest
@Test
public void testGetFailed() throws Exception
{
fcb.acquire();
final Exception ex=new Exception("FAILED");
final Exception ex = new Exception("FAILED");
long start=Long.MIN_VALUE;
final CountDownLatch latch = new CountDownLatch(1);
new Thread(new Runnable()
{
@Override
public void run()
{
latch.countDown();
try{TimeUnit.MILLISECONDS.sleep(100);}catch(Exception e){e.printStackTrace();}
fcb.failed(ex);
}
}).start();
latch.await();
long start=System.currentTimeMillis();
try
{
fcb.block();
try (final Blocker blocker=sbcb.acquire())
{
new Thread(new Runnable()
{
@Override
public void run()
{
latch.countDown();
try{TimeUnit.MILLISECONDS.sleep(100);}catch(Exception e){e.printStackTrace();}
blocker.failed(ex);
}
}).start();
latch.await();
start=System.currentTimeMillis();
}
Assert.fail();
}
catch(IOException ee)
@ -141,11 +144,12 @@ public class SharedBlockingCallbackTest
{
try
{
fcb.acquire();
latch.countDown();
TimeUnit.MILLISECONDS.sleep(100);
fcb.succeeded();
fcb.block();
try (Blocker blocker=sbcb.acquire())
{
latch.countDown();
TimeUnit.MILLISECONDS.sleep(100);
blocker.succeeded();
}
}
catch(Exception e)
{
@ -157,12 +161,13 @@ public class SharedBlockingCallbackTest
latch.await();
long start=System.currentTimeMillis();
fcb.acquire();
Assert.assertThat(System.currentTimeMillis()-start,Matchers.greaterThan(10L));
Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(500L));
try (Blocker blocker=sbcb.acquire())
{
Assert.assertThat(System.currentTimeMillis()-start,Matchers.greaterThan(10L));
Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(500L));
fcb.succeeded();
fcb.block();
blocker.succeeded();
};
Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(600L));
}
}

View File

@ -18,23 +18,39 @@
package org.eclipse.jetty.websocket.common;
import java.io.IOException;
import org.eclipse.jetty.util.SharedBlockingCallback;
import org.eclipse.jetty.websocket.api.WriteCallback;
public class BlockingWriteCallback extends SharedBlockingCallback implements WriteCallback
/* ------------------------------------------------------------ */
/** extend a SharedlBlockingCallback to an websocket WriteCallback
*/
public class BlockingWriteCallback extends SharedBlockingCallback
{
public BlockingWriteCallback()
{}
@Override
public void writeFailed(Throwable x)
{
failed(x);
super(new WriteBlocker());
}
@Override
public void writeSuccess()
public WriteBlocker acquireWriteBlocker() throws IOException
{
succeeded();
return (WriteBlocker)acquire();
}
public static class WriteBlocker extends Blocker implements WriteCallback
{
@Override
public void writeFailed(Throwable x)
{
failed(x);
}
@Override
public void writeSuccess()
{
succeeded();
}
}
}

View File

@ -32,6 +32,7 @@ import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.RemoteEndpoint;
import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
import org.eclipse.jetty.websocket.common.BlockingWriteCallback.WriteBlocker;
import org.eclipse.jetty.websocket.common.frames.BinaryFrame;
import org.eclipse.jetty.websocket.common.frames.ContinuationFrame;
import org.eclipse.jetty.websocket.common.frames.DataFrame;
@ -100,9 +101,10 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint
private void blockingWrite(WebSocketFrame frame) throws IOException
{
blocker.acquire();
uncheckedSendFrame(frame, blocker);
blocker.block();
try(WriteBlocker b=blocker.acquireWriteBlocker())
{
uncheckedSendFrame(frame, b);
}
}
private boolean lockMsg(MsgType type)
@ -441,14 +443,13 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint
this.batchMode = batchMode;
}
@Override
public void flush() throws IOException
{
lockMsg(MsgType.ASYNC);
try
try (WriteBlocker b = blocker.acquireWriteBlocker())
{
blocker.acquire();
uncheckedSendFrame(FrameFlusher.FLUSH_FRAME, blocker);
blocker.block();
uncheckedSendFrame(FrameFlusher.FLUSH_FRAME, b);
}
finally
{

View File

@ -31,6 +31,7 @@ import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
import org.eclipse.jetty.websocket.common.BlockingWriteCallback;
import org.eclipse.jetty.websocket.common.WebSocketSession;
import org.eclipse.jetty.websocket.common.BlockingWriteCallback.WriteBlocker;
import org.eclipse.jetty.websocket.common.frames.BinaryFrame;
/**
@ -142,9 +143,10 @@ public class MessageOutputStream extends OutputStream
frame.setPayload(buffer);
frame.setFin(fin);
blocker.acquire();
outgoing.outgoingFrame(frame, blocker, BatchMode.OFF);
blocker.block();
try(WriteBlocker b=blocker.acquireWriteBlocker())
{
outgoing.outgoingFrame(frame, b, BatchMode.OFF);
}
++frameCount;
// Any flush after the first will be a CONTINUATION frame.

View File

@ -31,6 +31,7 @@ import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
import org.eclipse.jetty.websocket.common.BlockingWriteCallback;
import org.eclipse.jetty.websocket.common.WebSocketSession;
import org.eclipse.jetty.websocket.common.BlockingWriteCallback.WriteBlocker;
import org.eclipse.jetty.websocket.common.frames.TextFrame;
/**
@ -146,9 +147,10 @@ public class MessageWriter extends Writer
frame.setPayload(data);
frame.setFin(fin);
blocker.acquire();
outgoing.outgoingFrame(frame, blocker, BatchMode.OFF);
blocker.block();
try (WriteBlocker b = blocker.acquireWriteBlocker())
{
outgoing.outgoingFrame(frame, b, BatchMode.OFF);
}
++frameCount;
// Any flush after the first will be a CONTINUATION frame.