From 5d83a43cce1f71ddcf8fb654b77bd02e6474c6c2 Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Thu, 13 Mar 2014 21:12:55 +1100 Subject: [PATCH] 430242 - added SharedBlockingCallback to support threadsafe blocking --- .../org/eclipse/jetty/server/HttpChannel.java | 9 +- .../jetty/server/HttpInputOverHTTP.java | 10 +- .../org/eclipse/jetty/server/HttpOutput.java | 48 +-- .../eclipse/jetty/util/BlockingCallback.java | 25 +- .../jetty/util/SharedBlockingCallback.java | 303 ++++++++++-------- .../util/SharedBlockingCallbackTest.java | 125 ++++---- .../common/BlockingWriteCallback.java | 36 ++- .../common/WebSocketRemoteEndpoint.java | 15 +- .../common/message/MessageOutputStream.java | 8 +- .../common/message/MessageWriter.java | 8 +- 10 files changed, 314 insertions(+), 273 deletions(-) diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java index 82db0178f32..fbd34cebad3 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java @@ -53,6 +53,7 @@ import org.eclipse.jetty.util.BlockingCallback; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.URIUtil; +import org.eclipse.jetty.util.SharedBlockingCallback.Blocker; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.thread.Scheduler; @@ -728,10 +729,10 @@ public class HttpChannel implements HttpParser.RequestHandler, Runnable protected boolean sendResponse(ResponseInfo info, ByteBuffer content, boolean complete) throws IOException { - BlockingCallback writeBlock = _response.getHttpOutput().acquireWriteBlockingCallback(); - boolean committing=sendResponse(info,content,complete,writeBlock); - writeBlock.block(); - return committing; + try(Blocker blocker = _response.getHttpOutput().acquireWriteBlockingCallback()) + { + return sendResponse(info,content,complete,blocker); + } } public boolean isCommitted() diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInputOverHTTP.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInputOverHTTP.java index b799b0c83e0..35943376b58 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInputOverHTTP.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInputOverHTTP.java @@ -24,6 +24,7 @@ import java.nio.ByteBuffer; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.SharedBlockingCallback; +import org.eclipse.jetty.util.SharedBlockingCallback.Blocker; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; @@ -57,10 +58,11 @@ public class HttpInputOverHTTP extends HttpInput implements Callback { while(true) { - _readBlocker.acquire(); - _httpConnection.fillInterested(_readBlocker); - LOG.debug("{} block readable on {}",this,_readBlocker); - _readBlocker.block(); + try (Blocker blocker=_readBlocker.acquire()) + { + _httpConnection.fillInterested(blocker); + LOG.debug("{} block readable on {}",this,blocker); + } Object content=getNextContent(); if (content!=null || isFinished()) diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java index 77ca11f5b19..8d36745ca04 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java @@ -35,6 +35,7 @@ import org.eclipse.jetty.http.HttpContent; import org.eclipse.jetty.io.EofException; import org.eclipse.jetty.util.BlockingCallback; import org.eclipse.jetty.util.SharedBlockingCallback; +import org.eclipse.jetty.util.SharedBlockingCallback.Blocker; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.IteratingCallback; @@ -116,17 +117,17 @@ public class HttpOutput extends ServletOutputStream implements Runnable return _channel.getResponse().isAllContentWritten(_written); } - protected BlockingCallback acquireWriteBlockingCallback() throws IOException + protected Blocker acquireWriteBlockingCallback() throws IOException { - _writeblock.acquire(); - return _writeblock; + return _writeblock.acquire(); } protected void write(ByteBuffer content, boolean complete) throws IOException { - _writeblock.acquire(); - write(content,complete,_writeblock); - _writeblock.block(); + try (Blocker blocker=_writeblock.acquire()) + { + write(content,complete,blocker); + } } protected void write(ByteBuffer content, boolean complete, Callback callback) @@ -439,9 +440,10 @@ public class HttpOutput extends ServletOutputStream implements Runnable // Check if all written or full if (complete || BufferUtil.isFull(_aggregate)) { - _writeblock.acquire(); - write(_aggregate, complete, _writeblock); - _writeblock.block(); + try(Blocker blocker=_writeblock.acquire()) + { + write(_aggregate, complete, blocker); + } if (complete) closed(); } @@ -497,9 +499,10 @@ public class HttpOutput extends ServletOutputStream implements Runnable */ public void sendContent(ByteBuffer content) throws IOException { - _writeblock.acquire(); - write(content,true,_writeblock); - _writeblock.block(); + try(Blocker blocker=_writeblock.acquire()) + { + write(content,true,blocker); + } } /* ------------------------------------------------------------ */ @@ -509,9 +512,10 @@ public class HttpOutput extends ServletOutputStream implements Runnable */ public void sendContent(InputStream in) throws IOException { - _writeblock.acquire(); - new InputStreamWritingCB(in,_writeblock).iterate(); - _writeblock.block(); + try(Blocker blocker=_writeblock.acquire()) + { + new InputStreamWritingCB(in,blocker).iterate(); + } } /* ------------------------------------------------------------ */ @@ -521,9 +525,10 @@ public class HttpOutput extends ServletOutputStream implements Runnable */ public void sendContent(ReadableByteChannel in) throws IOException { - _writeblock.acquire(); - new ReadableByteChannelWritingCB(in,_writeblock).iterate(); - _writeblock.block(); + try(Blocker blocker=_writeblock.acquire()) + { + new ReadableByteChannelWritingCB(in,blocker).iterate(); + } } @@ -534,9 +539,10 @@ public class HttpOutput extends ServletOutputStream implements Runnable */ public void sendContent(HttpContent content) throws IOException { - _writeblock.acquire(); - sendContent(content,_writeblock); - _writeblock.block(); + try(Blocker blocker=_writeblock.acquire()) + { + sendContent(content,blocker); + } } /* ------------------------------------------------------------ */ diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/BlockingCallback.java b/jetty-util/src/main/java/org/eclipse/jetty/util/BlockingCallback.java index d55836d3302..dfb4faea306 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/BlockingCallback.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/BlockingCallback.java @@ -26,30 +26,7 @@ import java.util.concurrent.atomic.AtomicReference; /* ------------------------------------------------------------ */ /** - * A Callback for simple reusable conversion of an - * asynchronous API to blocking. - *

- * To avoid late redundant calls to {@link #succeeded()} or {@link #failed(Throwable)} from - * interfering with later reuses of this class, the callback context is used to hold pass a phase indicated - * and only a single callback per phase is allowed. - *

- * A typical usage pattern is: - *

- * public class MyClass
- * {
- *     BlockingCallback cb = new BlockingCallback();
- *     
- *     public void blockingMethod(Object args) throws Exception
- *     {
- *         asyncMethod(args,cb);
- *         cb.block();
- *     }
- *     
- *     public void asyncMethod(Object args, Callback callback)
- *     {
- *         ...
- *     }
- *  }
+ * TODO
  */
 public class BlockingCallback implements Callback
 {
diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/SharedBlockingCallback.java b/jetty-util/src/main/java/org/eclipse/jetty/util/SharedBlockingCallback.java
index 0ad55c24065..02bb82b1d60 100644
--- a/jetty-util/src/main/java/org/eclipse/jetty/util/SharedBlockingCallback.java
+++ b/jetty-util/src/main/java/org/eclipse/jetty/util/SharedBlockingCallback.java
@@ -18,174 +18,203 @@
 
 package org.eclipse.jetty.util;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.concurrent.CancellationException;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 
+
 /* ------------------------------------------------------------ */
-/**
- * A Callback for simple reusable conversion of an 
- * asynchronous API to blocking.
- * 

- * To avoid late redundant calls to {@link #succeeded()} or {@link #failed(Throwable)} from - * interfering with later reuses of this class, the callback context is used to hold pass a phase indicated - * and only a single callback per phase is allowed. - *

+/** Provides a reusable BlockingCallback. * A typical usage pattern is: *

- * public class MyClass
+ * void someBlockingCall(Object... args) throws IOException
  * {
- *     BlockingCallback cb = new BlockingCallback();
- *     
- *     public void blockingMethod(Object args) throws Exception
- *     {
- *         asyncMethod(args,cb);
- *         cb.block();
- *     }
- *     
- *     public void asyncMethod(Object args, Callback callback)
- *     {
- *         ...
- *     }
- *  }
+ *   try(Blocker blocker=sharedBlockingCallback.acquire())
+ *   {
+ *     someAsyncCall(args,blocker);
+ *   }
+ *   catch(Throwable e)
+ *   {
+ *     blocker.fail(e);
+ *   }
+ * }
+ * 
*/ -public class SharedBlockingCallback extends BlockingCallback +public class SharedBlockingCallback { - private static Throwable IDLE=new Throwable() + private static Throwable IDLE = new Throwable() { @Override - public String toString() { return "IDLE"; } + public String toString() + { + return "IDLE"; + } }; - - private static Throwable SUCCEEDED=new Throwable() - { - @Override - public String toString() { return "SUCCEEDED"; } - }; - - final ReentrantLock _lock = new ReentrantLock(); - Condition _idle = _lock.newCondition(); - Condition _complete = _lock.newCondition(); - Throwable _state = IDLE; - + private static Throwable SUCCEEDED = new Throwable() + { + @Override + public String toString() + { + return "SUCCEEDED"; + } + }; + + final Blocker _blocker; public SharedBlockingCallback() - {} - - public void acquire() throws IOException { - _lock.lock(); + this(new Blocker()); + } + + protected SharedBlockingCallback(Blocker blocker) + { + _blocker=blocker; + } + + public Blocker acquire() throws IOException + { + _blocker._lock.lock(); try { - while (_state!=IDLE) - _idle.await(); - _state=null; + while (_blocker._state != IDLE) + _blocker._idle.await(); + _blocker._state = null; } catch (final InterruptedException e) { - throw new InterruptedIOException(){{initCause(e);}}; + throw new InterruptedIOException() + { + { + initCause(e); + } + }; } finally { - _lock.unlock(); + _blocker._lock.unlock(); } + return _blocker; } + - @Override - public void succeeded() - { - _lock.lock(); - try - { - if (_state==null) - { - _state=SUCCEEDED; - _complete.signalAll(); - } - else if (_state==IDLE) - throw new IllegalStateException("IDLE"); - } - finally - { - _lock.unlock(); - } - } - - @Override - public void failed(Throwable cause) - { - _lock.lock(); - try - { - if (_state==null) - { - _state=cause; - _complete.signalAll(); - } - else if (_state==IDLE) - throw new IllegalStateException("IDLE"); - } - finally - { - _lock.unlock(); - } - } - - /** Block until the Callback has succeeded or failed and - * after the return leave in the state to allow reuse. - * This is useful for code that wants to repeatable use a FutureCallback to convert - * an asynchronous API to a blocking API. - * @throws IOException if exception was caught during blocking, or callback was cancelled + /* ------------------------------------------------------------ */ + /** A Closeable Callback. + * Uses the auto close mechanism to block until the collback is complete. */ - @Override - public void block() throws IOException + public static class Blocker implements Callback, Closeable { - _lock.lock(); - try - { - while (_state==null) - _complete.await(); - - if (_state==SUCCEEDED) - return; - if (_state==IDLE) - throw new IllegalStateException("IDLE"); - if (_state instanceof IOException) - throw (IOException) _state; - if (_state instanceof CancellationException) - throw (CancellationException) _state; - throw new IOException(_state); - } - catch (final InterruptedException e) - { - throw new InterruptedIOException(){{initCause(e);}}; - } - finally - { - _state=IDLE; - _idle.signalAll(); - _lock.unlock(); - } - } - - - @Override - public String toString() - { - _lock.lock(); - try - { - return String.format("%s@%x{%s}",SharedBlockingCallback.class.getSimpleName(),hashCode(),_state); - } - finally - { - _lock.unlock(); - } - } + final ReentrantLock _lock = new ReentrantLock(); + final Condition _idle = _lock.newCondition(); + final Condition _complete = _lock.newCondition(); + Throwable _state = IDLE; + public Blocker() + { + } + + @Override + public void succeeded() + { + _lock.lock(); + try + { + if (_state == null) + { + _state = SUCCEEDED; + _complete.signalAll(); + } + else if (_state == IDLE) + throw new IllegalStateException("IDLE"); + } + finally + { + _lock.unlock(); + } + } + + @Override + public void failed(Throwable cause) + { + _lock.lock(); + try + { + if (_state == null) + { + _state = cause; + _complete.signalAll(); + } + else if (_state == IDLE) + throw new IllegalStateException("IDLE"); + } + finally + { + _lock.unlock(); + } + } + + /** + * Block until the Callback has succeeded or failed and after the return leave in the state to allow reuse. This is useful for code that wants to + * repeatable use a FutureCallback to convert an asynchronous API to a blocking API. + * + * @throws IOException + * if exception was caught during blocking, or callback was cancelled + */ + @Override + public void close() throws IOException + { + _lock.lock(); + try + { + while (_state == null) + _complete.await(); + + if (_state == SUCCEEDED) + return; + if (_state == IDLE) + throw new IllegalStateException("IDLE"); + if (_state instanceof IOException) + throw (IOException)_state; + if (_state instanceof CancellationException) + throw (CancellationException)_state; + if (_state instanceof RuntimeException) + throw (RuntimeException)_state; + if (_state instanceof Error) + throw (Error)_state; + throw new IOException(_state); + } + catch (final InterruptedException e) + { + throw new InterruptedIOException() + { + { + initCause(e); + } + }; + } + finally + { + _state = IDLE; + _idle.signalAll(); + _lock.unlock(); + } + } + + @Override + public String toString() + { + _lock.lock(); + try + { + return String.format("%s@%x{%s}",SharedBlockingCallback.class.getSimpleName(),hashCode(),_state); + } + finally + { + _lock.unlock(); + } + } + } } diff --git a/jetty-util/src/test/java/org/eclipse/jetty/util/SharedBlockingCallbackTest.java b/jetty-util/src/test/java/org/eclipse/jetty/util/SharedBlockingCallbackTest.java index 43f9a693c63..1d4894ad92e 100644 --- a/jetty-util/src/test/java/org/eclipse/jetty/util/SharedBlockingCallbackTest.java +++ b/jetty-util/src/test/java/org/eclipse/jetty/util/SharedBlockingCallbackTest.java @@ -19,22 +19,17 @@ package org.eclipse.jetty.util; import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import org.eclipse.jetty.util.SharedBlockingCallback.Blocker; import org.hamcrest.Matchers; import org.junit.Assert; import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import org.junit.runners.Parameterized.Parameters; public class SharedBlockingCallbackTest { - final SharedBlockingCallback fcb= new SharedBlockingCallback(); + final SharedBlockingCallback sbcb= new SharedBlockingCallback(); public SharedBlockingCallbackTest() { @@ -43,34 +38,38 @@ public class SharedBlockingCallbackTest @Test public void testDone() throws Exception - { - fcb.acquire(); - fcb.succeeded(); - long start=System.currentTimeMillis(); - fcb.block(); + { + long start; + try (Blocker blocker=sbcb.acquire()) + { + blocker.succeeded(); + start=System.currentTimeMillis(); + } Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(500L)); } @Test public void testGetDone() throws Exception { - fcb.acquire(); - final CountDownLatch latch = new CountDownLatch(1); - - new Thread(new Runnable() + long start; + try (final Blocker blocker=sbcb.acquire()) { - @Override - public void run() + final CountDownLatch latch = new CountDownLatch(1); + + new Thread(new Runnable() { - latch.countDown(); - try{TimeUnit.MILLISECONDS.sleep(100);}catch(Exception e){e.printStackTrace();} - fcb.succeeded(); - } - }).start(); - - latch.await(); - long start=System.currentTimeMillis(); - fcb.block(); + @Override + public void run() + { + latch.countDown(); + try{TimeUnit.MILLISECONDS.sleep(100);}catch(Exception e){e.printStackTrace();} + blocker.succeeded(); + } + }).start(); + + latch.await(); + start=System.currentTimeMillis(); + } Assert.assertThat(System.currentTimeMillis()-start,Matchers.greaterThan(10L)); Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(1000L)); } @@ -78,18 +77,19 @@ public class SharedBlockingCallbackTest @Test public void testFailed() throws Exception { - fcb.acquire(); - Exception ex=new Exception("FAILED"); - fcb.failed(ex); - - long start=System.currentTimeMillis(); + final Exception ex = new Exception("FAILED"); + long start=Long.MIN_VALUE; try { - fcb.block(); + try (final Blocker blocker=sbcb.acquire()) + { + blocker.failed(ex); + } Assert.fail(); } catch(IOException ee) { + start=System.currentTimeMillis(); Assert.assertEquals(ex,ee.getCause()); } Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(100L)); @@ -98,26 +98,29 @@ public class SharedBlockingCallbackTest @Test public void testGetFailed() throws Exception { - fcb.acquire(); - final Exception ex=new Exception("FAILED"); + final Exception ex = new Exception("FAILED"); + long start=Long.MIN_VALUE; final CountDownLatch latch = new CountDownLatch(1); - - new Thread(new Runnable() - { - @Override - public void run() - { - latch.countDown(); - try{TimeUnit.MILLISECONDS.sleep(100);}catch(Exception e){e.printStackTrace();} - fcb.failed(ex); - } - }).start(); - - latch.await(); - long start=System.currentTimeMillis(); + try { - fcb.block(); + try (final Blocker blocker=sbcb.acquire()) + { + + new Thread(new Runnable() + { + @Override + public void run() + { + latch.countDown(); + try{TimeUnit.MILLISECONDS.sleep(100);}catch(Exception e){e.printStackTrace();} + blocker.failed(ex); + } + }).start(); + + latch.await(); + start=System.currentTimeMillis(); + } Assert.fail(); } catch(IOException ee) @@ -141,11 +144,12 @@ public class SharedBlockingCallbackTest { try { - fcb.acquire(); - latch.countDown(); - TimeUnit.MILLISECONDS.sleep(100); - fcb.succeeded(); - fcb.block(); + try (Blocker blocker=sbcb.acquire()) + { + latch.countDown(); + TimeUnit.MILLISECONDS.sleep(100); + blocker.succeeded(); + } } catch(Exception e) { @@ -157,12 +161,13 @@ public class SharedBlockingCallbackTest latch.await(); long start=System.currentTimeMillis(); - fcb.acquire(); - Assert.assertThat(System.currentTimeMillis()-start,Matchers.greaterThan(10L)); - Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(500L)); + try (Blocker blocker=sbcb.acquire()) + { + Assert.assertThat(System.currentTimeMillis()-start,Matchers.greaterThan(10L)); + Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(500L)); - fcb.succeeded(); - fcb.block(); + blocker.succeeded(); + }; Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(600L)); } } diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/BlockingWriteCallback.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/BlockingWriteCallback.java index efda63020e9..4fdc68072fc 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/BlockingWriteCallback.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/BlockingWriteCallback.java @@ -18,23 +18,39 @@ package org.eclipse.jetty.websocket.common; +import java.io.IOException; + import org.eclipse.jetty.util.SharedBlockingCallback; import org.eclipse.jetty.websocket.api.WriteCallback; -public class BlockingWriteCallback extends SharedBlockingCallback implements WriteCallback + +/* ------------------------------------------------------------ */ +/** extend a SharedlBlockingCallback to an websocket WriteCallback + */ +public class BlockingWriteCallback extends SharedBlockingCallback { public BlockingWriteCallback() - {} - - @Override - public void writeFailed(Throwable x) { - failed(x); + super(new WriteBlocker()); } - - @Override - public void writeSuccess() + + public WriteBlocker acquireWriteBlocker() throws IOException { - succeeded(); + return (WriteBlocker)acquire(); + } + + public static class WriteBlocker extends Blocker implements WriteCallback + { + @Override + public void writeFailed(Throwable x) + { + failed(x); + } + + @Override + public void writeSuccess() + { + succeeded(); + } } } diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketRemoteEndpoint.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketRemoteEndpoint.java index 146641bb4e0..e0193a8986f 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketRemoteEndpoint.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketRemoteEndpoint.java @@ -32,6 +32,7 @@ import org.eclipse.jetty.websocket.api.BatchMode; import org.eclipse.jetty.websocket.api.RemoteEndpoint; import org.eclipse.jetty.websocket.api.WriteCallback; import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames; +import org.eclipse.jetty.websocket.common.BlockingWriteCallback.WriteBlocker; import org.eclipse.jetty.websocket.common.frames.BinaryFrame; import org.eclipse.jetty.websocket.common.frames.ContinuationFrame; import org.eclipse.jetty.websocket.common.frames.DataFrame; @@ -100,9 +101,10 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint private void blockingWrite(WebSocketFrame frame) throws IOException { - blocker.acquire(); - uncheckedSendFrame(frame, blocker); - blocker.block(); + try(WriteBlocker b=blocker.acquireWriteBlocker()) + { + uncheckedSendFrame(frame, b); + } } private boolean lockMsg(MsgType type) @@ -441,14 +443,13 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint this.batchMode = batchMode; } + @Override public void flush() throws IOException { lockMsg(MsgType.ASYNC); - try + try (WriteBlocker b = blocker.acquireWriteBlocker()) { - blocker.acquire(); - uncheckedSendFrame(FrameFlusher.FLUSH_FRAME, blocker); - blocker.block(); + uncheckedSendFrame(FrameFlusher.FLUSH_FRAME, b); } finally { diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageOutputStream.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageOutputStream.java index e1bb6b14902..b9249cfe396 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageOutputStream.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageOutputStream.java @@ -31,6 +31,7 @@ import org.eclipse.jetty.websocket.api.WriteCallback; import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames; import org.eclipse.jetty.websocket.common.BlockingWriteCallback; import org.eclipse.jetty.websocket.common.WebSocketSession; +import org.eclipse.jetty.websocket.common.BlockingWriteCallback.WriteBlocker; import org.eclipse.jetty.websocket.common.frames.BinaryFrame; /** @@ -142,9 +143,10 @@ public class MessageOutputStream extends OutputStream frame.setPayload(buffer); frame.setFin(fin); - blocker.acquire(); - outgoing.outgoingFrame(frame, blocker, BatchMode.OFF); - blocker.block(); + try(WriteBlocker b=blocker.acquireWriteBlocker()) + { + outgoing.outgoingFrame(frame, b, BatchMode.OFF); + } ++frameCount; // Any flush after the first will be a CONTINUATION frame. diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageWriter.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageWriter.java index 8d691217d3d..655bf386cb6 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageWriter.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageWriter.java @@ -31,6 +31,7 @@ import org.eclipse.jetty.websocket.api.WriteCallback; import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames; import org.eclipse.jetty.websocket.common.BlockingWriteCallback; import org.eclipse.jetty.websocket.common.WebSocketSession; +import org.eclipse.jetty.websocket.common.BlockingWriteCallback.WriteBlocker; import org.eclipse.jetty.websocket.common.frames.TextFrame; /** @@ -146,9 +147,10 @@ public class MessageWriter extends Writer frame.setPayload(data); frame.setFin(fin); - blocker.acquire(); - outgoing.outgoingFrame(frame, blocker, BatchMode.OFF); - blocker.block(); + try (WriteBlocker b = blocker.acquireWriteBlocker()) + { + outgoing.outgoingFrame(frame, b, BatchMode.OFF); + } ++frameCount; // Any flush after the first will be a CONTINUATION frame.