diff --git a/jetty-websocket/websocket-javax-common/src/main/java/org/eclipse/jetty/websocket/javax/common/messages/MessageOutputStream.java b/jetty-websocket/websocket-javax-common/src/main/java/org/eclipse/jetty/websocket/javax/common/messages/MessageOutputStream.java index 21fa8b91471..bbf4a4688f8 100644 --- a/jetty-websocket/websocket-javax-common/src/main/java/org/eclipse/jetty/websocket/javax/common/messages/MessageOutputStream.java +++ b/jetty-websocket/websocket-javax-common/src/main/java/org/eclipse/jetty/websocket/javax/common/messages/MessageOutputStream.java @@ -25,7 +25,7 @@ import java.nio.ByteBuffer; import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; -import org.eclipse.jetty.util.SharedBlockingCallback; +import org.eclipse.jetty.util.FutureCallback; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.websocket.core.CoreSession; @@ -41,7 +41,6 @@ public class MessageOutputStream extends OutputStream private final CoreSession coreSession; private final ByteBufferPool bufferPool; - private final SharedBlockingCallback blocker; private long frameCount; private long bytesSent; private Frame frame; @@ -53,7 +52,6 @@ public class MessageOutputStream extends OutputStream { this.coreSession = coreSession; this.bufferPool = bufferPool; - this.blocker = new SharedBlockingCallback(); this.buffer = bufferPool.acquire(bufferSize, true); BufferUtil.flipToFill(buffer); this.frame = new Frame(OpCode.BINARY); @@ -118,11 +116,9 @@ public class MessageOutputStream extends OutputStream frame.setFin(fin); int initialBufferSize = buffer.remaining(); - try (SharedBlockingCallback.Blocker b = blocker.acquire()) - { - coreSession.sendFrame(frame, b, false); - b.block(); - } + FutureCallback b = new FutureCallback(); + coreSession.sendFrame(frame, b, false); + b.block(); ++frameCount; // Any flush after the first will be a CONTINUATION frame. diff --git a/jetty-websocket/websocket-javax-common/src/main/java/org/eclipse/jetty/websocket/javax/common/messages/MessageWriter.java b/jetty-websocket/websocket-javax-common/src/main/java/org/eclipse/jetty/websocket/javax/common/messages/MessageWriter.java index 989141599b6..a69f68ba6a9 100644 --- a/jetty-websocket/websocket-javax-common/src/main/java/org/eclipse/jetty/websocket/javax/common/messages/MessageWriter.java +++ b/jetty-websocket/websocket-javax-common/src/main/java/org/eclipse/jetty/websocket/javax/common/messages/MessageWriter.java @@ -27,7 +27,7 @@ import java.nio.charset.CodingErrorAction; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; -import org.eclipse.jetty.util.SharedBlockingCallback; +import org.eclipse.jetty.util.FutureCallback; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.websocket.core.CoreSession; @@ -50,7 +50,6 @@ public class MessageWriter extends Writer .onMalformedInput(CodingErrorAction.REPORT); private final CoreSession coreSession; - private final SharedBlockingCallback blocker; private long frameCount; private Frame frame; private CharBuffer buffer; @@ -60,7 +59,6 @@ public class MessageWriter extends Writer public MessageWriter(CoreSession coreSession, int bufferSize) { this.coreSession = coreSession; - this.blocker = new SharedBlockingCallback(); this.buffer = CharBuffer.allocate(bufferSize); this.frame = new Frame(OpCode.TEXT); } @@ -128,11 +126,9 @@ public class MessageWriter extends Writer frame.setPayload(payload); frame.setFin(fin); - try (SharedBlockingCallback.Blocker b = blocker.acquire()) - { - coreSession.sendFrame(frame, b, false); - b.block(); - } + FutureCallback b = new FutureCallback(); + coreSession.sendFrame(frame, b, false); + b.block(); ++frameCount; // Any flush after the first will be a CONTINUATION frame. diff --git a/jetty-websocket/websocket-javax-tests/src/main/java/org/eclipse/jetty/websocket/javax/tests/NetworkFuzzer.java b/jetty-websocket/websocket-javax-tests/src/main/java/org/eclipse/jetty/websocket/javax/tests/NetworkFuzzer.java index 0c219d2dc6d..a82972a61a7 100644 --- a/jetty-websocket/websocket-javax-tests/src/main/java/org/eclipse/jetty/websocket/javax/tests/NetworkFuzzer.java +++ b/jetty-websocket/websocket-javax-tests/src/main/java/org/eclipse/jetty/websocket/javax/tests/NetworkFuzzer.java @@ -21,6 +21,7 @@ package org.eclipse.jetty.websocket.javax.tests; import java.io.IOException; import java.net.URI; import java.nio.ByteBuffer; +import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.concurrent.BlockingQueue; @@ -33,7 +34,7 @@ import org.eclipse.jetty.http.HttpFields; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; -import org.eclipse.jetty.util.SharedBlockingCallback; +import org.eclipse.jetty.util.FutureCallback; import org.eclipse.jetty.websocket.core.Behavior; import org.eclipse.jetty.websocket.core.CloseStatus; import org.eclipse.jetty.websocket.core.CoreSession; @@ -52,7 +53,6 @@ public class NetworkFuzzer extends Fuzzer.Adapter implements Fuzzer, AutoCloseab private final RawUpgradeRequest upgradeRequest; private final UnitGenerator generator; private final FrameCapture frameCapture; - private SharedBlockingCallback sharedBlockingCallback = new SharedBlockingCallback(); public NetworkFuzzer(LocalServer server) throws Exception { @@ -155,23 +155,16 @@ public class NetworkFuzzer extends Fuzzer.Adapter implements Fuzzer, AutoCloseab { for (Frame f : frames) { - try (SharedBlockingCallback.Blocker blocker = sharedBlockingCallback.acquire()) - { - frameCapture.coreSession.sendFrame(f, blocker, false); - } + FutureCallback callback = new FutureCallback(); + frameCapture.coreSession.sendFrame(f, callback, false); + callback.block(); } } @Override public void sendFrames(Frame... frames) throws IOException { - for (Frame f : frames) - { - try (SharedBlockingCallback.Blocker blocker = sharedBlockingCallback.acquire()) - { - frameCapture.coreSession.sendFrame(f, blocker, false); - } - } + sendFrames(Arrays.asList(frames)); } @Override @@ -227,7 +220,6 @@ public class NetworkFuzzer extends Fuzzer.Adapter implements Fuzzer, AutoCloseab private final BlockingQueue receivedFrames = new LinkedBlockingQueue<>(); private EndPoint endPoint; private CountDownLatch openLatch = new CountDownLatch(1); - private final SharedBlockingCallback blockingCallback = new SharedBlockingCallback(); private CoreSession coreSession; public void setEndPoint(EndPoint endpoint) @@ -275,10 +267,9 @@ public class NetworkFuzzer extends Fuzzer.Adapter implements Fuzzer, AutoCloseab synchronized (this) { - try (SharedBlockingCallback.Blocker blocker = blockingCallback.acquire()) - { - endPoint.write(blocker, buffer); - } + FutureCallback callback = new FutureCallback(); + endPoint.write(callback, buffer); + callback.block(); } } } diff --git a/jetty-websocket/websocket-javax-tests/src/test/java/org/eclipse/jetty/websocket/javax/tests/server/IdleTimeoutTest.java b/jetty-websocket/websocket-javax-tests/src/test/java/org/eclipse/jetty/websocket/javax/tests/server/IdleTimeoutTest.java index bc1b2cbc143..9dc378e99ea 100644 --- a/jetty-websocket/websocket-javax-tests/src/test/java/org/eclipse/jetty/websocket/javax/tests/server/IdleTimeoutTest.java +++ b/jetty-websocket/websocket-javax-tests/src/test/java/org/eclipse/jetty/websocket/javax/tests/server/IdleTimeoutTest.java @@ -18,6 +18,8 @@ package org.eclipse.jetty.websocket.javax.tests.server; +import java.io.IOException; +import java.nio.channels.ClosedChannelException; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; @@ -38,7 +40,9 @@ import org.junit.jupiter.api.Test; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; +import static org.junit.jupiter.api.Assertions.assertThrows; public class IdleTimeoutTest { @@ -76,7 +80,9 @@ public class IdleTimeoutTest // wait 1 second to allow timeout to fire off TimeUnit.SECONDS.sleep(1); - session.sendFrames(new Frame(OpCode.TEXT).setPayload("You shouldn't be there")); + IOException error = assertThrows(IOException.class, + () -> session.sendFrames(new Frame(OpCode.TEXT).setPayload("You shouldn't be there"))); + assertThat(error.getCause(), instanceOf(ClosedChannelException.class)); BlockingQueue framesQueue = session.getOutputFrames(); Frame frame = framesQueue.poll(1, TimeUnit.SECONDS); diff --git a/jetty-websocket/websocket-jetty-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageOutputStream.java b/jetty-websocket/websocket-jetty-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageOutputStream.java index 065ebe1ff7f..e86619f52c4 100644 --- a/jetty-websocket/websocket-jetty-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageOutputStream.java +++ b/jetty-websocket/websocket-jetty-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageOutputStream.java @@ -25,7 +25,7 @@ import java.nio.ByteBuffer; import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; -import org.eclipse.jetty.util.SharedBlockingCallback; +import org.eclipse.jetty.util.FutureCallback; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.websocket.core.CoreSession; @@ -41,7 +41,6 @@ public class MessageOutputStream extends OutputStream private final CoreSession coreSession; private final ByteBufferPool bufferPool; - private final SharedBlockingCallback blocker; private long frameCount; private long bytesSent; private Frame frame; @@ -53,7 +52,6 @@ public class MessageOutputStream extends OutputStream { this.coreSession = coreSession; this.bufferPool = bufferPool; - this.blocker = new SharedBlockingCallback(); this.buffer = bufferPool.acquire(bufferSize, true); BufferUtil.flipToFill(buffer); this.frame = new Frame(OpCode.BINARY); @@ -116,8 +114,10 @@ public class MessageOutputStream extends OutputStream BufferUtil.flipToFlush(buffer, 0); frame.setPayload(buffer); frame.setFin(fin); - try (SharedBlockingCallback.Blocker b = blocker.acquire()) + + try { + FutureCallback b = new FutureCallback(); coreSession.sendFrame(frame, b, false); b.block(); assert buffer.remaining() == 0; diff --git a/jetty-websocket/websocket-jetty-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageWriter.java b/jetty-websocket/websocket-jetty-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageWriter.java index 7c1199e16e9..4bd29905ca6 100644 --- a/jetty-websocket/websocket-jetty-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageWriter.java +++ b/jetty-websocket/websocket-jetty-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageWriter.java @@ -27,7 +27,7 @@ import java.nio.charset.CodingErrorAction; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; -import org.eclipse.jetty.util.SharedBlockingCallback; +import org.eclipse.jetty.util.FutureCallback; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.websocket.core.CoreSession; @@ -50,7 +50,6 @@ public class MessageWriter extends Writer .onMalformedInput(CodingErrorAction.REPORT); private final CoreSession coreSession; - private final SharedBlockingCallback blocker; private long frameCount; private Frame frame; private CharBuffer buffer; @@ -60,7 +59,6 @@ public class MessageWriter extends Writer public MessageWriter(CoreSession coreSession, int bufferSize) { this.coreSession = coreSession; - this.blocker = new SharedBlockingCallback(); this.buffer = CharBuffer.allocate(bufferSize); this.frame = new Frame(OpCode.TEXT); } @@ -128,11 +126,9 @@ public class MessageWriter extends Writer frame.setPayload(payload); frame.setFin(fin); - try (SharedBlockingCallback.Blocker b = blocker.acquire()) - { - coreSession.sendFrame(frame, b, false); - b.block(); - } + FutureCallback b = new FutureCallback(); + coreSession.sendFrame(frame, b, false); + b.block(); ++frameCount; // Any flush after the first will be a CONTINUATION frame.