430654 430242 - Closeable SharedBlockingCallback

This commit is contained in:
Greg Wilkins 2014-03-22 09:36:09 +11:00
parent 5d83a43cce
commit 710ea48244
9 changed files with 53 additions and 9 deletions

View File

@ -731,7 +731,9 @@ public class HttpChannel<T> implements HttpParser.RequestHandler<T>, Runnable
{ {
try(Blocker blocker = _response.getHttpOutput().acquireWriteBlockingCallback()) try(Blocker blocker = _response.getHttpOutput().acquireWriteBlockingCallback())
{ {
return sendResponse(info,content,complete,blocker); boolean committing = sendResponse(info,content,complete,blocker);
blocker.block();
return committing;
} }
} }

View File

@ -62,6 +62,7 @@ public class HttpInputOverHTTP extends HttpInput<ByteBuffer> implements Callback
{ {
_httpConnection.fillInterested(blocker); _httpConnection.fillInterested(blocker);
LOG.debug("{} block readable on {}",this,blocker); LOG.debug("{} block readable on {}",this,blocker);
blocker.block();
} }
Object content=getNextContent(); Object content=getNextContent();

View File

@ -127,6 +127,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
try (Blocker blocker=_writeblock.acquire()) try (Blocker blocker=_writeblock.acquire())
{ {
write(content,complete,blocker); write(content,complete,blocker);
blocker.block();
} }
} }
@ -443,6 +444,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
try(Blocker blocker=_writeblock.acquire()) try(Blocker blocker=_writeblock.acquire())
{ {
write(_aggregate, complete, blocker); write(_aggregate, complete, blocker);
blocker.block();
} }
if (complete) if (complete)
closed(); closed();
@ -502,6 +504,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
try(Blocker blocker=_writeblock.acquire()) try(Blocker blocker=_writeblock.acquire())
{ {
write(content,true,blocker); write(content,true,blocker);
blocker.block();
} }
} }
@ -515,6 +518,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
try(Blocker blocker=_writeblock.acquire()) try(Blocker blocker=_writeblock.acquire())
{ {
new InputStreamWritingCB(in,blocker).iterate(); new InputStreamWritingCB(in,blocker).iterate();
blocker.block();
} }
} }
@ -528,6 +532,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
try(Blocker blocker=_writeblock.acquire()) try(Blocker blocker=_writeblock.acquire())
{ {
new ReadableByteChannelWritingCB(in,blocker).iterate(); new ReadableByteChannelWritingCB(in,blocker).iterate();
blocker.block();
} }
} }
@ -542,6 +547,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
try(Blocker blocker=_writeblock.acquire()) try(Blocker blocker=_writeblock.acquire())
{ {
sendContent(content,blocker); sendContent(content,blocker);
blocker.block();
} }
} }

View File

@ -26,7 +26,7 @@ import java.util.concurrent.atomic.AtomicReference;
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/** /**
* TODO * An implementation of Callback that blocks until success or failure.
*/ */
public class BlockingCallback implements Callback public class BlockingCallback implements Callback
{ {

View File

@ -25,6 +25,9 @@ import java.util.concurrent.CancellationException;
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/** Provides a reusable BlockingCallback. /** Provides a reusable BlockingCallback.
@ -35,16 +38,16 @@ import java.util.concurrent.locks.ReentrantLock;
* try(Blocker blocker=sharedBlockingCallback.acquire()) * try(Blocker blocker=sharedBlockingCallback.acquire())
* { * {
* someAsyncCall(args,blocker); * someAsyncCall(args,blocker);
* } * blocker.block();
* catch(Throwable e)
* {
* blocker.fail(e);
* } * }
* } * }
* </pre> * </pre>
*/ */
public class SharedBlockingCallback public class SharedBlockingCallback
{ {
private static final Logger LOG = Log.getLogger(SharedBlockingCallback.class);
private static Throwable IDLE = new Throwable() private static Throwable IDLE = new Throwable()
{ {
@Override @Override
@ -103,7 +106,7 @@ public class SharedBlockingCallback
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/** A Closeable Callback. /** A Closeable Callback.
* Uses the auto close mechanism to block until the collback is complete. * Uses the auto close mechanism to check block has been called OK.
*/ */
public static class Blocker implements Callback, Closeable public static class Blocker implements Callback, Closeable
{ {
@ -163,8 +166,7 @@ public class SharedBlockingCallback
* @throws IOException * @throws IOException
* if exception was caught during blocking, or callback was cancelled * if exception was caught during blocking, or callback was cancelled
*/ */
@Override public void block() throws IOException
public void close() throws IOException
{ {
_lock.lock(); _lock.lock();
try try
@ -196,6 +198,29 @@ public class SharedBlockingCallback
}; };
} }
finally finally
{
_lock.unlock();
}
}
/**
* Check the Callback has succeeded or failed and after the return leave in the state to allow reuse.
*
* @throws IOException
* if exception was caught during blocking, or callback was cancelled
*/
@Override
public void close() throws IOException
{
_lock.lock();
try
{
if (_state == IDLE)
throw new IllegalStateException("IDLE");
if (_state == null)
LOG.warn(new Throwable());
}
finally
{ {
_state = IDLE; _state = IDLE;
_idle.signalAll(); _idle.signalAll();

View File

@ -44,6 +44,7 @@ public class SharedBlockingCallbackTest
{ {
blocker.succeeded(); blocker.succeeded();
start=System.currentTimeMillis(); start=System.currentTimeMillis();
blocker.block();
} }
Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(500L)); Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(500L));
} }
@ -69,6 +70,7 @@ public class SharedBlockingCallbackTest
latch.await(); latch.await();
start=System.currentTimeMillis(); start=System.currentTimeMillis();
blocker.block();
} }
Assert.assertThat(System.currentTimeMillis()-start,Matchers.greaterThan(10L)); Assert.assertThat(System.currentTimeMillis()-start,Matchers.greaterThan(10L));
Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(1000L)); Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(1000L));
@ -84,6 +86,7 @@ public class SharedBlockingCallbackTest
try (final Blocker blocker=sbcb.acquire()) try (final Blocker blocker=sbcb.acquire())
{ {
blocker.failed(ex); blocker.failed(ex);
blocker.block();
} }
Assert.fail(); Assert.fail();
} }
@ -120,6 +123,7 @@ public class SharedBlockingCallbackTest
latch.await(); latch.await();
start=System.currentTimeMillis(); start=System.currentTimeMillis();
blocker.block();
} }
Assert.fail(); Assert.fail();
} }
@ -149,6 +153,7 @@ public class SharedBlockingCallbackTest
latch.countDown(); latch.countDown();
TimeUnit.MILLISECONDS.sleep(100); TimeUnit.MILLISECONDS.sleep(100);
blocker.succeeded(); blocker.succeeded();
blocker.block();
} }
} }
catch(Exception e) catch(Exception e)
@ -167,6 +172,7 @@ public class SharedBlockingCallbackTest
Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(500L)); Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(500L));
blocker.succeeded(); blocker.succeeded();
blocker.block();
}; };
Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(600L)); Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(600L));
} }

View File

@ -104,6 +104,7 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint
try(WriteBlocker b=blocker.acquireWriteBlocker()) try(WriteBlocker b=blocker.acquireWriteBlocker())
{ {
uncheckedSendFrame(frame, b); uncheckedSendFrame(frame, b);
b.block();
} }
} }
@ -450,6 +451,7 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint
try (WriteBlocker b = blocker.acquireWriteBlocker()) try (WriteBlocker b = blocker.acquireWriteBlocker())
{ {
uncheckedSendFrame(FrameFlusher.FLUSH_FRAME, b); uncheckedSendFrame(FrameFlusher.FLUSH_FRAME, b);
b.block();
} }
finally finally
{ {

View File

@ -146,6 +146,7 @@ public class MessageOutputStream extends OutputStream
try(WriteBlocker b=blocker.acquireWriteBlocker()) try(WriteBlocker b=blocker.acquireWriteBlocker())
{ {
outgoing.outgoingFrame(frame, b, BatchMode.OFF); outgoing.outgoingFrame(frame, b, BatchMode.OFF);
b.block();
} }
++frameCount; ++frameCount;

View File

@ -150,6 +150,7 @@ public class MessageWriter extends Writer
try (WriteBlocker b = blocker.acquireWriteBlocker()) try (WriteBlocker b = blocker.acquireWriteBlocker())
{ {
outgoing.outgoingFrame(frame, b, BatchMode.OFF); outgoing.outgoingFrame(frame, b, BatchMode.OFF);
b.block();
} }
++frameCount; ++frameCount;