diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java index fbd34cebad3..82db0178f32 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java @@ -53,7 +53,6 @@ import org.eclipse.jetty.util.BlockingCallback; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.URIUtil; -import org.eclipse.jetty.util.SharedBlockingCallback.Blocker; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.thread.Scheduler; @@ -729,10 +728,10 @@ public class HttpChannel implements HttpParser.RequestHandler, Runnable protected boolean sendResponse(ResponseInfo info, ByteBuffer content, boolean complete) throws IOException { - try(Blocker blocker = _response.getHttpOutput().acquireWriteBlockingCallback()) - { - return sendResponse(info,content,complete,blocker); - } + BlockingCallback writeBlock = _response.getHttpOutput().acquireWriteBlockingCallback(); + boolean committing=sendResponse(info,content,complete,writeBlock); + writeBlock.block(); + return committing; } public boolean isCommitted() diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInputOverHTTP.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInputOverHTTP.java index 35943376b58..b799b0c83e0 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInputOverHTTP.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInputOverHTTP.java @@ -24,7 +24,6 @@ import java.nio.ByteBuffer; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.SharedBlockingCallback; -import org.eclipse.jetty.util.SharedBlockingCallback.Blocker; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; @@ -58,11 +57,10 @@ public class HttpInputOverHTTP extends HttpInput implements Callback { while(true) { - try (Blocker blocker=_readBlocker.acquire()) - { - _httpConnection.fillInterested(blocker); - LOG.debug("{} block readable on {}",this,blocker); - } + _readBlocker.acquire(); + _httpConnection.fillInterested(_readBlocker); + LOG.debug("{} block readable on {}",this,_readBlocker); + _readBlocker.block(); Object content=getNextContent(); if (content!=null || isFinished()) diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java index 8d36745ca04..77ca11f5b19 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java @@ -35,7 +35,6 @@ import org.eclipse.jetty.http.HttpContent; import org.eclipse.jetty.io.EofException; import org.eclipse.jetty.util.BlockingCallback; import org.eclipse.jetty.util.SharedBlockingCallback; -import org.eclipse.jetty.util.SharedBlockingCallback.Blocker; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.IteratingCallback; @@ -117,17 +116,17 @@ public class HttpOutput extends ServletOutputStream implements Runnable return _channel.getResponse().isAllContentWritten(_written); } - protected Blocker acquireWriteBlockingCallback() throws IOException + protected BlockingCallback acquireWriteBlockingCallback() throws IOException { - return _writeblock.acquire(); + _writeblock.acquire(); + return _writeblock; } protected void write(ByteBuffer content, boolean complete) throws IOException { - try (Blocker blocker=_writeblock.acquire()) - { - write(content,complete,blocker); - } + _writeblock.acquire(); + write(content,complete,_writeblock); + _writeblock.block(); } protected void write(ByteBuffer content, boolean complete, Callback callback) @@ -440,10 +439,9 @@ public class HttpOutput extends ServletOutputStream implements Runnable // Check if all written or full if (complete || BufferUtil.isFull(_aggregate)) { - try(Blocker blocker=_writeblock.acquire()) - { - write(_aggregate, complete, blocker); - } + _writeblock.acquire(); + write(_aggregate, complete, _writeblock); + _writeblock.block(); if (complete) closed(); } @@ -499,10 +497,9 @@ public class HttpOutput extends ServletOutputStream implements Runnable */ public void sendContent(ByteBuffer content) throws IOException { - try(Blocker blocker=_writeblock.acquire()) - { - write(content,true,blocker); - } + _writeblock.acquire(); + write(content,true,_writeblock); + _writeblock.block(); } /* ------------------------------------------------------------ */ @@ -512,10 +509,9 @@ public class HttpOutput extends ServletOutputStream implements Runnable */ public void sendContent(InputStream in) throws IOException { - try(Blocker blocker=_writeblock.acquire()) - { - new InputStreamWritingCB(in,blocker).iterate(); - } + _writeblock.acquire(); + new InputStreamWritingCB(in,_writeblock).iterate(); + _writeblock.block(); } /* ------------------------------------------------------------ */ @@ -525,10 +521,9 @@ public class HttpOutput extends ServletOutputStream implements Runnable */ public void sendContent(ReadableByteChannel in) throws IOException { - try(Blocker blocker=_writeblock.acquire()) - { - new ReadableByteChannelWritingCB(in,blocker).iterate(); - } + _writeblock.acquire(); + new ReadableByteChannelWritingCB(in,_writeblock).iterate(); + _writeblock.block(); } @@ -539,10 +534,9 @@ public class HttpOutput extends ServletOutputStream implements Runnable */ public void sendContent(HttpContent content) throws IOException { - try(Blocker blocker=_writeblock.acquire()) - { - sendContent(content,blocker); - } + _writeblock.acquire(); + sendContent(content,_writeblock); + _writeblock.block(); } /* ------------------------------------------------------------ */ diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/BlockingCallback.java b/jetty-util/src/main/java/org/eclipse/jetty/util/BlockingCallback.java index dfb4faea306..d55836d3302 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/BlockingCallback.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/BlockingCallback.java @@ -26,7 +26,30 @@ import java.util.concurrent.atomic.AtomicReference; /* ------------------------------------------------------------ */ /** - * TODO + * A Callback for simple reusable conversion of an + * asynchronous API to blocking. + *

+ * To avoid late redundant calls to {@link #succeeded()} or {@link #failed(Throwable)} from + * interfering with later reuses of this class, the callback context is used to hold pass a phase indicated + * and only a single callback per phase is allowed. + *

+ * A typical usage pattern is: + *

+ * public class MyClass
+ * {
+ *     BlockingCallback cb = new BlockingCallback();
+ *     
+ *     public void blockingMethod(Object args) throws Exception
+ *     {
+ *         asyncMethod(args,cb);
+ *         cb.block();
+ *     }
+ *     
+ *     public void asyncMethod(Object args, Callback callback)
+ *     {
+ *         ...
+ *     }
+ *  }
  */
 public class BlockingCallback implements Callback
 {
diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/SharedBlockingCallback.java b/jetty-util/src/main/java/org/eclipse/jetty/util/SharedBlockingCallback.java
index 02bb82b1d60..0ad55c24065 100644
--- a/jetty-util/src/main/java/org/eclipse/jetty/util/SharedBlockingCallback.java
+++ b/jetty-util/src/main/java/org/eclipse/jetty/util/SharedBlockingCallback.java
@@ -18,203 +18,174 @@
 
 package org.eclipse.jetty.util;
 
-import java.io.Closeable;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.concurrent.CancellationException;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 
-
 /* ------------------------------------------------------------ */
-/** Provides a reusable BlockingCallback.
+/**
+ * A Callback for simple reusable conversion of an 
+ * asynchronous API to blocking.
+ * 

+ * To avoid late redundant calls to {@link #succeeded()} or {@link #failed(Throwable)} from + * interfering with later reuses of this class, the callback context is used to hold pass a phase indicated + * and only a single callback per phase is allowed. + *

* A typical usage pattern is: *

- * void someBlockingCall(Object... args) throws IOException
+ * public class MyClass
  * {
- *   try(Blocker blocker=sharedBlockingCallback.acquire())
- *   {
- *     someAsyncCall(args,blocker);
- *   }
- *   catch(Throwable e)
- *   {
- *     blocker.fail(e);
- *   }
- * }
- * 
+ * BlockingCallback cb = new BlockingCallback(); + * + * public void blockingMethod(Object args) throws Exception + * { + * asyncMethod(args,cb); + * cb.block(); + * } + * + * public void asyncMethod(Object args, Callback callback) + * { + * ... + * } + * } */ -public class SharedBlockingCallback +public class SharedBlockingCallback extends BlockingCallback { - private static Throwable IDLE = new Throwable() + private static Throwable IDLE=new Throwable() { @Override - public String toString() - { - return "IDLE"; - } + public String toString() { return "IDLE"; } }; - - private static Throwable SUCCEEDED = new Throwable() + + private static Throwable SUCCEEDED=new Throwable() { @Override - public String toString() - { - return "SUCCEEDED"; - } + public String toString() { return "SUCCEEDED"; } }; + - final Blocker _blocker; + final ReentrantLock _lock = new ReentrantLock(); + Condition _idle = _lock.newCondition(); + Condition _complete = _lock.newCondition(); + Throwable _state = IDLE; + public SharedBlockingCallback() + {} + + public void acquire() throws IOException { - this(new Blocker()); - } - - protected SharedBlockingCallback(Blocker blocker) - { - _blocker=blocker; - } - - public Blocker acquire() throws IOException - { - _blocker._lock.lock(); + _lock.lock(); try { - while (_blocker._state != IDLE) - _blocker._idle.await(); - _blocker._state = null; + while (_state!=IDLE) + _idle.await(); + _state=null; } catch (final InterruptedException e) { - throw new InterruptedIOException() - { - { - initCause(e); - } - }; + throw new InterruptedIOException(){{initCause(e);}}; } finally { - _blocker._lock.unlock(); + _lock.unlock(); } - return _blocker; } - - /* ------------------------------------------------------------ */ - /** A Closeable Callback. - * Uses the auto close mechanism to block until the collback is complete. - */ - public static class Blocker implements Callback, Closeable + @Override + public void succeeded() { - final ReentrantLock _lock = new ReentrantLock(); - final Condition _idle = _lock.newCondition(); - final Condition _complete = _lock.newCondition(); - Throwable _state = IDLE; - - public Blocker() + _lock.lock(); + try { + if (_state==null) + { + _state=SUCCEEDED; + _complete.signalAll(); + } + else if (_state==IDLE) + throw new IllegalStateException("IDLE"); } - - @Override - public void succeeded() + finally { - _lock.lock(); - try - { - if (_state == null) - { - _state = SUCCEEDED; - _complete.signalAll(); - } - else if (_state == IDLE) - throw new IllegalStateException("IDLE"); - } - finally - { - _lock.unlock(); - } - } - - @Override - public void failed(Throwable cause) - { - _lock.lock(); - try - { - if (_state == null) - { - _state = cause; - _complete.signalAll(); - } - else if (_state == IDLE) - throw new IllegalStateException("IDLE"); - } - finally - { - _lock.unlock(); - } - } - - /** - * Block until the Callback has succeeded or failed and after the return leave in the state to allow reuse. This is useful for code that wants to - * repeatable use a FutureCallback to convert an asynchronous API to a blocking API. - * - * @throws IOException - * if exception was caught during blocking, or callback was cancelled - */ - @Override - public void close() throws IOException - { - _lock.lock(); - try - { - while (_state == null) - _complete.await(); - - if (_state == SUCCEEDED) - return; - if (_state == IDLE) - throw new IllegalStateException("IDLE"); - if (_state instanceof IOException) - throw (IOException)_state; - if (_state instanceof CancellationException) - throw (CancellationException)_state; - if (_state instanceof RuntimeException) - throw (RuntimeException)_state; - if (_state instanceof Error) - throw (Error)_state; - throw new IOException(_state); - } - catch (final InterruptedException e) - { - throw new InterruptedIOException() - { - { - initCause(e); - } - }; - } - finally - { - _state = IDLE; - _idle.signalAll(); - _lock.unlock(); - } - } - - @Override - public String toString() - { - _lock.lock(); - try - { - return String.format("%s@%x{%s}",SharedBlockingCallback.class.getSimpleName(),hashCode(),_state); - } - finally - { - _lock.unlock(); - } + _lock.unlock(); } } + + @Override + public void failed(Throwable cause) + { + _lock.lock(); + try + { + if (_state==null) + { + _state=cause; + _complete.signalAll(); + } + else if (_state==IDLE) + throw new IllegalStateException("IDLE"); + } + finally + { + _lock.unlock(); + } + } + + /** Block until the Callback has succeeded or failed and + * after the return leave in the state to allow reuse. + * This is useful for code that wants to repeatable use a FutureCallback to convert + * an asynchronous API to a blocking API. + * @throws IOException if exception was caught during blocking, or callback was cancelled + */ + @Override + public void block() throws IOException + { + _lock.lock(); + try + { + while (_state==null) + _complete.await(); + + if (_state==SUCCEEDED) + return; + if (_state==IDLE) + throw new IllegalStateException("IDLE"); + if (_state instanceof IOException) + throw (IOException) _state; + if (_state instanceof CancellationException) + throw (CancellationException) _state; + throw new IOException(_state); + } + catch (final InterruptedException e) + { + throw new InterruptedIOException(){{initCause(e);}}; + } + finally + { + _state=IDLE; + _idle.signalAll(); + _lock.unlock(); + } + } + + + @Override + public String toString() + { + _lock.lock(); + try + { + return String.format("%s@%x{%s}",SharedBlockingCallback.class.getSimpleName(),hashCode(),_state); + } + finally + { + _lock.unlock(); + } + } + } diff --git a/jetty-util/src/test/java/org/eclipse/jetty/util/SharedBlockingCallbackTest.java b/jetty-util/src/test/java/org/eclipse/jetty/util/SharedBlockingCallbackTest.java index 1d4894ad92e..43f9a693c63 100644 --- a/jetty-util/src/test/java/org/eclipse/jetty/util/SharedBlockingCallbackTest.java +++ b/jetty-util/src/test/java/org/eclipse/jetty/util/SharedBlockingCallbackTest.java @@ -19,17 +19,22 @@ package org.eclipse.jetty.util; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import org.eclipse.jetty.util.SharedBlockingCallback.Blocker; import org.hamcrest.Matchers; import org.junit.Assert; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; public class SharedBlockingCallbackTest { - final SharedBlockingCallback sbcb= new SharedBlockingCallback(); + final SharedBlockingCallback fcb= new SharedBlockingCallback(); public SharedBlockingCallbackTest() { @@ -38,38 +43,34 @@ public class SharedBlockingCallbackTest @Test public void testDone() throws Exception - { - long start; - try (Blocker blocker=sbcb.acquire()) - { - blocker.succeeded(); - start=System.currentTimeMillis(); - } + { + fcb.acquire(); + fcb.succeeded(); + long start=System.currentTimeMillis(); + fcb.block(); Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(500L)); } @Test public void testGetDone() throws Exception { - long start; - try (final Blocker blocker=sbcb.acquire()) + fcb.acquire(); + final CountDownLatch latch = new CountDownLatch(1); + + new Thread(new Runnable() { - final CountDownLatch latch = new CountDownLatch(1); - - new Thread(new Runnable() + @Override + public void run() { - @Override - public void run() - { - latch.countDown(); - try{TimeUnit.MILLISECONDS.sleep(100);}catch(Exception e){e.printStackTrace();} - blocker.succeeded(); - } - }).start(); - - latch.await(); - start=System.currentTimeMillis(); - } + latch.countDown(); + try{TimeUnit.MILLISECONDS.sleep(100);}catch(Exception e){e.printStackTrace();} + fcb.succeeded(); + } + }).start(); + + latch.await(); + long start=System.currentTimeMillis(); + fcb.block(); Assert.assertThat(System.currentTimeMillis()-start,Matchers.greaterThan(10L)); Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(1000L)); } @@ -77,19 +78,18 @@ public class SharedBlockingCallbackTest @Test public void testFailed() throws Exception { - final Exception ex = new Exception("FAILED"); - long start=Long.MIN_VALUE; + fcb.acquire(); + Exception ex=new Exception("FAILED"); + fcb.failed(ex); + + long start=System.currentTimeMillis(); try { - try (final Blocker blocker=sbcb.acquire()) - { - blocker.failed(ex); - } + fcb.block(); Assert.fail(); } catch(IOException ee) { - start=System.currentTimeMillis(); Assert.assertEquals(ex,ee.getCause()); } Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(100L)); @@ -98,29 +98,26 @@ public class SharedBlockingCallbackTest @Test public void testGetFailed() throws Exception { - final Exception ex = new Exception("FAILED"); - long start=Long.MIN_VALUE; + fcb.acquire(); + final Exception ex=new Exception("FAILED"); final CountDownLatch latch = new CountDownLatch(1); - + + new Thread(new Runnable() + { + @Override + public void run() + { + latch.countDown(); + try{TimeUnit.MILLISECONDS.sleep(100);}catch(Exception e){e.printStackTrace();} + fcb.failed(ex); + } + }).start(); + + latch.await(); + long start=System.currentTimeMillis(); try { - try (final Blocker blocker=sbcb.acquire()) - { - - new Thread(new Runnable() - { - @Override - public void run() - { - latch.countDown(); - try{TimeUnit.MILLISECONDS.sleep(100);}catch(Exception e){e.printStackTrace();} - blocker.failed(ex); - } - }).start(); - - latch.await(); - start=System.currentTimeMillis(); - } + fcb.block(); Assert.fail(); } catch(IOException ee) @@ -144,12 +141,11 @@ public class SharedBlockingCallbackTest { try { - try (Blocker blocker=sbcb.acquire()) - { - latch.countDown(); - TimeUnit.MILLISECONDS.sleep(100); - blocker.succeeded(); - } + fcb.acquire(); + latch.countDown(); + TimeUnit.MILLISECONDS.sleep(100); + fcb.succeeded(); + fcb.block(); } catch(Exception e) { @@ -161,13 +157,12 @@ public class SharedBlockingCallbackTest latch.await(); long start=System.currentTimeMillis(); - try (Blocker blocker=sbcb.acquire()) - { - Assert.assertThat(System.currentTimeMillis()-start,Matchers.greaterThan(10L)); - Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(500L)); + fcb.acquire(); + Assert.assertThat(System.currentTimeMillis()-start,Matchers.greaterThan(10L)); + Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(500L)); - blocker.succeeded(); - }; + fcb.succeeded(); + fcb.block(); Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(600L)); } } diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/BlockingWriteCallback.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/BlockingWriteCallback.java index 4fdc68072fc..efda63020e9 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/BlockingWriteCallback.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/BlockingWriteCallback.java @@ -18,39 +18,23 @@ package org.eclipse.jetty.websocket.common; -import java.io.IOException; - import org.eclipse.jetty.util.SharedBlockingCallback; import org.eclipse.jetty.websocket.api.WriteCallback; - -/* ------------------------------------------------------------ */ -/** extend a SharedlBlockingCallback to an websocket WriteCallback - */ -public class BlockingWriteCallback extends SharedBlockingCallback +public class BlockingWriteCallback extends SharedBlockingCallback implements WriteCallback { public BlockingWriteCallback() - { - super(new WriteBlocker()); - } - - public WriteBlocker acquireWriteBlocker() throws IOException - { - return (WriteBlocker)acquire(); - } + {} - public static class WriteBlocker extends Blocker implements WriteCallback + @Override + public void writeFailed(Throwable x) { - @Override - public void writeFailed(Throwable x) - { - failed(x); - } + failed(x); + } - @Override - public void writeSuccess() - { - succeeded(); - } + @Override + public void writeSuccess() + { + succeeded(); } } diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketRemoteEndpoint.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketRemoteEndpoint.java index e0193a8986f..146641bb4e0 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketRemoteEndpoint.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketRemoteEndpoint.java @@ -32,7 +32,6 @@ import org.eclipse.jetty.websocket.api.BatchMode; import org.eclipse.jetty.websocket.api.RemoteEndpoint; import org.eclipse.jetty.websocket.api.WriteCallback; import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames; -import org.eclipse.jetty.websocket.common.BlockingWriteCallback.WriteBlocker; import org.eclipse.jetty.websocket.common.frames.BinaryFrame; import org.eclipse.jetty.websocket.common.frames.ContinuationFrame; import org.eclipse.jetty.websocket.common.frames.DataFrame; @@ -101,10 +100,9 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint private void blockingWrite(WebSocketFrame frame) throws IOException { - try(WriteBlocker b=blocker.acquireWriteBlocker()) - { - uncheckedSendFrame(frame, b); - } + blocker.acquire(); + uncheckedSendFrame(frame, blocker); + blocker.block(); } private boolean lockMsg(MsgType type) @@ -443,13 +441,14 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint this.batchMode = batchMode; } - @Override public void flush() throws IOException { lockMsg(MsgType.ASYNC); - try (WriteBlocker b = blocker.acquireWriteBlocker()) + try { - uncheckedSendFrame(FrameFlusher.FLUSH_FRAME, b); + blocker.acquire(); + uncheckedSendFrame(FrameFlusher.FLUSH_FRAME, blocker); + blocker.block(); } finally { diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageOutputStream.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageOutputStream.java index b9249cfe396..e1bb6b14902 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageOutputStream.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageOutputStream.java @@ -31,7 +31,6 @@ import org.eclipse.jetty.websocket.api.WriteCallback; import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames; import org.eclipse.jetty.websocket.common.BlockingWriteCallback; import org.eclipse.jetty.websocket.common.WebSocketSession; -import org.eclipse.jetty.websocket.common.BlockingWriteCallback.WriteBlocker; import org.eclipse.jetty.websocket.common.frames.BinaryFrame; /** @@ -143,10 +142,9 @@ public class MessageOutputStream extends OutputStream frame.setPayload(buffer); frame.setFin(fin); - try(WriteBlocker b=blocker.acquireWriteBlocker()) - { - outgoing.outgoingFrame(frame, b, BatchMode.OFF); - } + blocker.acquire(); + outgoing.outgoingFrame(frame, blocker, BatchMode.OFF); + blocker.block(); ++frameCount; // Any flush after the first will be a CONTINUATION frame. diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageWriter.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageWriter.java index 655bf386cb6..8d691217d3d 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageWriter.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageWriter.java @@ -31,7 +31,6 @@ import org.eclipse.jetty.websocket.api.WriteCallback; import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames; import org.eclipse.jetty.websocket.common.BlockingWriteCallback; import org.eclipse.jetty.websocket.common.WebSocketSession; -import org.eclipse.jetty.websocket.common.BlockingWriteCallback.WriteBlocker; import org.eclipse.jetty.websocket.common.frames.TextFrame; /** @@ -147,10 +146,9 @@ public class MessageWriter extends Writer frame.setPayload(data); frame.setFin(fin); - try (WriteBlocker b = blocker.acquireWriteBlocker()) - { - outgoing.outgoingFrame(frame, b, BatchMode.OFF); - } + blocker.acquire(); + outgoing.outgoingFrame(frame, blocker, BatchMode.OFF); + blocker.block(); ++frameCount; // Any flush after the first will be a CONTINUATION frame.