diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/CloseStatus.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/CloseStatus.java index 4c05283f3ac..375dac8ae54 100644 --- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/CloseStatus.java +++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/CloseStatus.java @@ -175,16 +175,11 @@ public class CloseStatus // TODO consider defining a precedence for every CloseStatus, and change SessionState only if higher precedence public static boolean isOrdinary(CloseStatus closeStatus) { - switch (closeStatus.getCode()) - { - case NORMAL: - case SHUTDOWN: - case NO_CODE: - return true; - - default: - return false; - } + int code = closeStatus.getCode(); + if (code == NORMAL || code == NO_CODE || code >= 3000) + return true; + else + return false; } public int getCode() diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/FrameFlusher.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/FrameFlusher.java index e69d59f9b44..e32c9f2c890 100644 --- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/FrameFlusher.java +++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/FrameFlusher.java @@ -44,6 +44,8 @@ import org.eclipse.jetty.websocket.core.OpCode; import org.eclipse.jetty.websocket.core.WebSocketException; import org.eclipse.jetty.websocket.core.WebSocketWriteTimeoutException; +import static org.eclipse.jetty.websocket.core.internal.WebSocketCoreSession.AbnormalCloseStatus; + public class FrameFlusher extends IteratingCallback { public static final Frame FLUSH_FRAME = new Frame(OpCode.BINARY); @@ -117,8 +119,7 @@ public class FrameFlusher extends IteratingCallback else { //fail all existing entries in the queue, and enqueue the close frame - failedEntries = new ArrayList<>(); - failedEntries.addAll(queue); + failedEntries = new ArrayList<>(queue); queue.clear(); queue.offerFirst(entry); } @@ -152,9 +153,17 @@ public class FrameFlusher extends IteratingCallback if (failedEntries != null) { + WebSocketException failure = new WebSocketException("Flusher received abnormal CloseFrame: " + CloseStatus.codeString(closeStatus.getCode())); + if (closeStatus instanceof AbnormalCloseStatus) + { + Throwable cause = ((AbnormalCloseStatus)closeStatus).getCause(); + if (cause != null) + failure.addSuppressed(cause); + } + for (Entry e : failedEntries) { - notifyCallbackFailure(e.callback, new WebSocketException(CloseStatus.codeString(closeStatus.getCode()))); + notifyCallbackFailure(e.callback, failure); } } diff --git a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/internal/FrameFlusherTest.java b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/internal/FrameFlusherTest.java index 2cddce089f3..dee3c6860b8 100644 --- a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/internal/FrameFlusherTest.java +++ b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/internal/FrameFlusherTest.java @@ -199,47 +199,31 @@ public class FrameFlusherTest int maxGather = 8; FrameFlusher frameFlusher = new FrameFlusher(bufferPool, scheduler, generator, endPoint, bufferSize, maxGather); - // enqueue message before the error close + // Enqueue message before the error close. Frame frame1 = new Frame(OpCode.TEXT).setPayload("message before close").setFin(true); - LatchCallback callback1 = new LatchCallback(); - assertTrue(frameFlusher.enqueue(frame1, callback1, false)); + CountDownLatch failedFrame1 = new CountDownLatch(1); + Callback callbackFrame1 = Callback.from(()->{}, t->failedFrame1.countDown()); + assertTrue(frameFlusher.enqueue(frame1, callbackFrame1, false)); - // enqueue the close frame which should fail the previous frame as it is still in the queue + // Enqueue the close frame which should fail the previous frame as it is still in the queue. Frame closeFrame = new CloseStatus(CloseStatus.MESSAGE_TOO_LARGE).toFrame(); - LatchCallback closeCallback = new LatchCallback(); - assertTrue(frameFlusher.enqueue(closeFrame, closeCallback, false)); - assertTrue(callback1.failure.await(1, TimeUnit.SECONDS)); + CountDownLatch succeededCloseFrame = new CountDownLatch(1); + Callback closeFrameCallback = Callback.from(succeededCloseFrame::countDown, t->{}); + assertTrue(frameFlusher.enqueue(closeFrame, closeFrameCallback, false)); + assertTrue(failedFrame1.await(1, TimeUnit.SECONDS)); - // any frames enqueued after this should fail + // Any frames enqueued after this should fail. Frame frame2 = new Frame(OpCode.TEXT).setPayload("message after close").setFin(true); - LatchCallback callback2 = new LatchCallback(); - assertFalse(frameFlusher.enqueue(frame2, callback2, false)); - assertTrue(callback2.failure.await(1, TimeUnit.SECONDS)); + CountDownLatch failedFrame2 = new CountDownLatch(1); + Callback callbackFrame2 = Callback.from(()->{}, t->failedFrame2.countDown()); + assertFalse(frameFlusher.enqueue(frame2, callbackFrame2, false)); + assertTrue(failedFrame2.await(1, TimeUnit.SECONDS)); - // iterating should succeed the close callback + // Iterating should succeed the close callback. frameFlusher.iterate(); - assertTrue(closeCallback.success.await(1, TimeUnit.SECONDS)); + assertTrue(succeededCloseFrame.await(1, TimeUnit.SECONDS)); } - public static class LatchCallback implements Callback - { - public CountDownLatch success = new CountDownLatch(1); - public CountDownLatch failure = new CountDownLatch(1); - - @Override - public void succeeded() - { - success.countDown(); - } - - @Override - public void failed(Throwable x) - { - failure.countDown(); - } - } - - public static class CapturingEndPoint extends MockEndpoint { public Parser parser;