Issue #3666 - changes from review
- code cleanups - made SHUTDOWN 1001 status an abnormal close status Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
This commit is contained in:
parent
9e09f76fda
commit
e5a6910be4
|
@ -175,17 +175,12 @@ public class CloseStatus
|
||||||
// TODO consider defining a precedence for every CloseStatus, and change SessionState only if higher precedence
|
// TODO consider defining a precedence for every CloseStatus, and change SessionState only if higher precedence
|
||||||
public static boolean isOrdinary(CloseStatus closeStatus)
|
public static boolean isOrdinary(CloseStatus closeStatus)
|
||||||
{
|
{
|
||||||
switch (closeStatus.getCode())
|
int code = closeStatus.getCode();
|
||||||
{
|
if (code == NORMAL || code == NO_CODE || code >= 3000)
|
||||||
case NORMAL:
|
|
||||||
case SHUTDOWN:
|
|
||||||
case NO_CODE:
|
|
||||||
return true;
|
return true;
|
||||||
|
else
|
||||||
default:
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
public int getCode()
|
public int getCode()
|
||||||
{
|
{
|
||||||
|
|
|
@ -44,6 +44,8 @@ import org.eclipse.jetty.websocket.core.OpCode;
|
||||||
import org.eclipse.jetty.websocket.core.WebSocketException;
|
import org.eclipse.jetty.websocket.core.WebSocketException;
|
||||||
import org.eclipse.jetty.websocket.core.WebSocketWriteTimeoutException;
|
import org.eclipse.jetty.websocket.core.WebSocketWriteTimeoutException;
|
||||||
|
|
||||||
|
import static org.eclipse.jetty.websocket.core.internal.WebSocketCoreSession.AbnormalCloseStatus;
|
||||||
|
|
||||||
public class FrameFlusher extends IteratingCallback
|
public class FrameFlusher extends IteratingCallback
|
||||||
{
|
{
|
||||||
public static final Frame FLUSH_FRAME = new Frame(OpCode.BINARY);
|
public static final Frame FLUSH_FRAME = new Frame(OpCode.BINARY);
|
||||||
|
@ -117,8 +119,7 @@ public class FrameFlusher extends IteratingCallback
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
//fail all existing entries in the queue, and enqueue the close frame
|
//fail all existing entries in the queue, and enqueue the close frame
|
||||||
failedEntries = new ArrayList<>();
|
failedEntries = new ArrayList<>(queue);
|
||||||
failedEntries.addAll(queue);
|
|
||||||
queue.clear();
|
queue.clear();
|
||||||
queue.offerFirst(entry);
|
queue.offerFirst(entry);
|
||||||
}
|
}
|
||||||
|
@ -152,9 +153,17 @@ public class FrameFlusher extends IteratingCallback
|
||||||
|
|
||||||
if (failedEntries != null)
|
if (failedEntries != null)
|
||||||
{
|
{
|
||||||
|
WebSocketException failure = new WebSocketException("Flusher received abnormal CloseFrame: " + CloseStatus.codeString(closeStatus.getCode()));
|
||||||
|
if (closeStatus instanceof AbnormalCloseStatus)
|
||||||
|
{
|
||||||
|
Throwable cause = ((AbnormalCloseStatus)closeStatus).getCause();
|
||||||
|
if (cause != null)
|
||||||
|
failure.addSuppressed(cause);
|
||||||
|
}
|
||||||
|
|
||||||
for (Entry e : failedEntries)
|
for (Entry e : failedEntries)
|
||||||
{
|
{
|
||||||
notifyCallbackFailure(e.callback, new WebSocketException(CloseStatus.codeString(closeStatus.getCode())));
|
notifyCallbackFailure(e.callback, failure);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -199,47 +199,31 @@ public class FrameFlusherTest
|
||||||
int maxGather = 8;
|
int maxGather = 8;
|
||||||
FrameFlusher frameFlusher = new FrameFlusher(bufferPool, scheduler, generator, endPoint, bufferSize, maxGather);
|
FrameFlusher frameFlusher = new FrameFlusher(bufferPool, scheduler, generator, endPoint, bufferSize, maxGather);
|
||||||
|
|
||||||
// enqueue message before the error close
|
// Enqueue message before the error close.
|
||||||
Frame frame1 = new Frame(OpCode.TEXT).setPayload("message before close").setFin(true);
|
Frame frame1 = new Frame(OpCode.TEXT).setPayload("message before close").setFin(true);
|
||||||
LatchCallback callback1 = new LatchCallback();
|
CountDownLatch failedFrame1 = new CountDownLatch(1);
|
||||||
assertTrue(frameFlusher.enqueue(frame1, callback1, false));
|
Callback callbackFrame1 = Callback.from(()->{}, t->failedFrame1.countDown());
|
||||||
|
assertTrue(frameFlusher.enqueue(frame1, callbackFrame1, false));
|
||||||
|
|
||||||
// enqueue the close frame which should fail the previous frame as it is still in the queue
|
// Enqueue the close frame which should fail the previous frame as it is still in the queue.
|
||||||
Frame closeFrame = new CloseStatus(CloseStatus.MESSAGE_TOO_LARGE).toFrame();
|
Frame closeFrame = new CloseStatus(CloseStatus.MESSAGE_TOO_LARGE).toFrame();
|
||||||
LatchCallback closeCallback = new LatchCallback();
|
CountDownLatch succeededCloseFrame = new CountDownLatch(1);
|
||||||
assertTrue(frameFlusher.enqueue(closeFrame, closeCallback, false));
|
Callback closeFrameCallback = Callback.from(succeededCloseFrame::countDown, t->{});
|
||||||
assertTrue(callback1.failure.await(1, TimeUnit.SECONDS));
|
assertTrue(frameFlusher.enqueue(closeFrame, closeFrameCallback, false));
|
||||||
|
assertTrue(failedFrame1.await(1, TimeUnit.SECONDS));
|
||||||
|
|
||||||
// any frames enqueued after this should fail
|
// Any frames enqueued after this should fail.
|
||||||
Frame frame2 = new Frame(OpCode.TEXT).setPayload("message after close").setFin(true);
|
Frame frame2 = new Frame(OpCode.TEXT).setPayload("message after close").setFin(true);
|
||||||
LatchCallback callback2 = new LatchCallback();
|
CountDownLatch failedFrame2 = new CountDownLatch(1);
|
||||||
assertFalse(frameFlusher.enqueue(frame2, callback2, false));
|
Callback callbackFrame2 = Callback.from(()->{}, t->failedFrame2.countDown());
|
||||||
assertTrue(callback2.failure.await(1, TimeUnit.SECONDS));
|
assertFalse(frameFlusher.enqueue(frame2, callbackFrame2, false));
|
||||||
|
assertTrue(failedFrame2.await(1, TimeUnit.SECONDS));
|
||||||
|
|
||||||
// iterating should succeed the close callback
|
// Iterating should succeed the close callback.
|
||||||
frameFlusher.iterate();
|
frameFlusher.iterate();
|
||||||
assertTrue(closeCallback.success.await(1, TimeUnit.SECONDS));
|
assertTrue(succeededCloseFrame.await(1, TimeUnit.SECONDS));
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class LatchCallback implements Callback
|
|
||||||
{
|
|
||||||
public CountDownLatch success = new CountDownLatch(1);
|
|
||||||
public CountDownLatch failure = new CountDownLatch(1);
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void succeeded()
|
|
||||||
{
|
|
||||||
success.countDown();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void failed(Throwable x)
|
|
||||||
{
|
|
||||||
failure.countDown();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
public static class CapturingEndPoint extends MockEndpoint
|
public static class CapturingEndPoint extends MockEndpoint
|
||||||
{
|
{
|
||||||
public Parser parser;
|
public Parser parser;
|
||||||
|
|
Loading…
Reference in New Issue