From 710ea48244cd71d0e10a5a6e33857ec84a3734b7 Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Sat, 22 Mar 2014 09:36:09 +1100 Subject: [PATCH] 430654 430242 - Closeable SharedBlockingCallback --- .../org/eclipse/jetty/server/HttpChannel.java | 4 +- .../jetty/server/HttpInputOverHTTP.java | 1 + .../org/eclipse/jetty/server/HttpOutput.java | 6 +++ .../eclipse/jetty/util/BlockingCallback.java | 2 +- .../jetty/util/SharedBlockingCallback.java | 39 +++++++++++++++---- .../util/SharedBlockingCallbackTest.java | 6 +++ .../common/WebSocketRemoteEndpoint.java | 2 + .../common/message/MessageOutputStream.java | 1 + .../common/message/MessageWriter.java | 1 + 9 files changed, 53 insertions(+), 9 deletions(-) diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java index fbd34cebad3..86f50696cb9 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java @@ -731,7 +731,9 @@ public class HttpChannel implements HttpParser.RequestHandler, Runnable { try(Blocker blocker = _response.getHttpOutput().acquireWriteBlockingCallback()) { - return sendResponse(info,content,complete,blocker); + boolean committing = sendResponse(info,content,complete,blocker); + blocker.block(); + return committing; } } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInputOverHTTP.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInputOverHTTP.java index 35943376b58..fda404c9130 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInputOverHTTP.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInputOverHTTP.java @@ -62,6 +62,7 @@ public class HttpInputOverHTTP extends HttpInput implements Callback { _httpConnection.fillInterested(blocker); LOG.debug("{} block readable on {}",this,blocker); + blocker.block(); } Object content=getNextContent(); diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java index 8d36745ca04..f22057bf701 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java @@ -127,6 +127,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable try (Blocker blocker=_writeblock.acquire()) { write(content,complete,blocker); + blocker.block(); } } @@ -443,6 +444,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable try(Blocker blocker=_writeblock.acquire()) { write(_aggregate, complete, blocker); + blocker.block(); } if (complete) closed(); @@ -502,6 +504,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable try(Blocker blocker=_writeblock.acquire()) { write(content,true,blocker); + blocker.block(); } } @@ -515,6 +518,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable try(Blocker blocker=_writeblock.acquire()) { new InputStreamWritingCB(in,blocker).iterate(); + blocker.block(); } } @@ -528,6 +532,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable try(Blocker blocker=_writeblock.acquire()) { new ReadableByteChannelWritingCB(in,blocker).iterate(); + blocker.block(); } } @@ -542,6 +547,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable try(Blocker blocker=_writeblock.acquire()) { sendContent(content,blocker); + blocker.block(); } } diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/BlockingCallback.java b/jetty-util/src/main/java/org/eclipse/jetty/util/BlockingCallback.java index dfb4faea306..94e465061a1 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/BlockingCallback.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/BlockingCallback.java @@ -26,7 +26,7 @@ import java.util.concurrent.atomic.AtomicReference; /* ------------------------------------------------------------ */ /** - * TODO + * An implementation of Callback that blocks until success or failure. */ public class BlockingCallback implements Callback { diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/SharedBlockingCallback.java b/jetty-util/src/main/java/org/eclipse/jetty/util/SharedBlockingCallback.java index 02bb82b1d60..f78cf776579 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/SharedBlockingCallback.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/SharedBlockingCallback.java @@ -25,6 +25,9 @@ import java.util.concurrent.CancellationException; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; + /* ------------------------------------------------------------ */ /** Provides a reusable BlockingCallback. @@ -35,16 +38,16 @@ import java.util.concurrent.locks.ReentrantLock; * try(Blocker blocker=sharedBlockingCallback.acquire()) * { * someAsyncCall(args,blocker); - * } - * catch(Throwable e) - * { - * blocker.fail(e); + * blocker.block(); * } * } * */ public class SharedBlockingCallback { + private static final Logger LOG = Log.getLogger(SharedBlockingCallback.class); + + private static Throwable IDLE = new Throwable() { @Override @@ -103,7 +106,7 @@ public class SharedBlockingCallback /* ------------------------------------------------------------ */ /** A Closeable Callback. - * Uses the auto close mechanism to block until the collback is complete. + * Uses the auto close mechanism to check block has been called OK. */ public static class Blocker implements Callback, Closeable { @@ -163,8 +166,7 @@ public class SharedBlockingCallback * @throws IOException * if exception was caught during blocking, or callback was cancelled */ - @Override - public void close() throws IOException + public void block() throws IOException { _lock.lock(); try @@ -196,6 +198,29 @@ public class SharedBlockingCallback }; } finally + { + _lock.unlock(); + } + } + + /** + * Check the Callback has succeeded or failed and after the return leave in the state to allow reuse. + * + * @throws IOException + * if exception was caught during blocking, or callback was cancelled + */ + @Override + public void close() throws IOException + { + _lock.lock(); + try + { + if (_state == IDLE) + throw new IllegalStateException("IDLE"); + if (_state == null) + LOG.warn(new Throwable()); + } + finally { _state = IDLE; _idle.signalAll(); diff --git a/jetty-util/src/test/java/org/eclipse/jetty/util/SharedBlockingCallbackTest.java b/jetty-util/src/test/java/org/eclipse/jetty/util/SharedBlockingCallbackTest.java index 1d4894ad92e..4ee93a6b19b 100644 --- a/jetty-util/src/test/java/org/eclipse/jetty/util/SharedBlockingCallbackTest.java +++ b/jetty-util/src/test/java/org/eclipse/jetty/util/SharedBlockingCallbackTest.java @@ -44,6 +44,7 @@ public class SharedBlockingCallbackTest { blocker.succeeded(); start=System.currentTimeMillis(); + blocker.block(); } Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(500L)); } @@ -69,6 +70,7 @@ public class SharedBlockingCallbackTest latch.await(); start=System.currentTimeMillis(); + blocker.block(); } Assert.assertThat(System.currentTimeMillis()-start,Matchers.greaterThan(10L)); Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(1000L)); @@ -84,6 +86,7 @@ public class SharedBlockingCallbackTest try (final Blocker blocker=sbcb.acquire()) { blocker.failed(ex); + blocker.block(); } Assert.fail(); } @@ -120,6 +123,7 @@ public class SharedBlockingCallbackTest latch.await(); start=System.currentTimeMillis(); + blocker.block(); } Assert.fail(); } @@ -149,6 +153,7 @@ public class SharedBlockingCallbackTest latch.countDown(); TimeUnit.MILLISECONDS.sleep(100); blocker.succeeded(); + blocker.block(); } } catch(Exception e) @@ -167,6 +172,7 @@ public class SharedBlockingCallbackTest Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(500L)); blocker.succeeded(); + blocker.block(); }; Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(600L)); } diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketRemoteEndpoint.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketRemoteEndpoint.java index e0193a8986f..a6625c2fb2d 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketRemoteEndpoint.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketRemoteEndpoint.java @@ -104,6 +104,7 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint try(WriteBlocker b=blocker.acquireWriteBlocker()) { uncheckedSendFrame(frame, b); + b.block(); } } @@ -450,6 +451,7 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint try (WriteBlocker b = blocker.acquireWriteBlocker()) { uncheckedSendFrame(FrameFlusher.FLUSH_FRAME, b); + b.block(); } finally { diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageOutputStream.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageOutputStream.java index b9249cfe396..6da8f3e9429 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageOutputStream.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageOutputStream.java @@ -146,6 +146,7 @@ public class MessageOutputStream extends OutputStream try(WriteBlocker b=blocker.acquireWriteBlocker()) { outgoing.outgoingFrame(frame, b, BatchMode.OFF); + b.block(); } ++frameCount; diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageWriter.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageWriter.java index 655bf386cb6..5afe710c1b8 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageWriter.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageWriter.java @@ -150,6 +150,7 @@ public class MessageWriter extends Writer try (WriteBlocker b = blocker.acquireWriteBlocker()) { outgoing.outgoingFrame(frame, b, BatchMode.OFF); + b.block(); } ++frameCount;