Issue #3666 - error CloseFrames skip frames in the FrameFlusher queue
Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
This commit is contained in:
parent
49356bb5c5
commit
9e09f76fda
|
@ -291,8 +291,8 @@ public class CloseStatus
|
|||
public Frame toFrame()
|
||||
{
|
||||
if (isTransmittableStatusCode(code))
|
||||
return new CloseFrame(this, OpCode.CLOSE, true, asPayloadBuffer(code, reason));
|
||||
return new CloseFrame(this, OpCode.CLOSE);
|
||||
return new CloseFrame(OpCode.CLOSE, true, asPayloadBuffer(code, reason));
|
||||
return new CloseFrame(OpCode.CLOSE);
|
||||
}
|
||||
|
||||
public static Frame toFrame(int closeStatus)
|
||||
|
@ -356,12 +356,12 @@ public class CloseStatus
|
|||
|
||||
class CloseFrame extends Frame implements CloseStatus.Supplier
|
||||
{
|
||||
public CloseFrame(CloseStatus closeStatus, byte opcode)
|
||||
public CloseFrame(byte opcode)
|
||||
{
|
||||
super(opcode);
|
||||
}
|
||||
|
||||
public CloseFrame(CloseStatus closeStatus, byte opCode, boolean fin, ByteBuffer payload)
|
||||
public CloseFrame(byte opCode, boolean fin, ByteBuffer payload)
|
||||
{
|
||||
super(opCode, fin, payload);
|
||||
}
|
||||
|
|
|
@ -38,8 +38,10 @@ import org.eclipse.jetty.util.TypeUtil;
|
|||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
import org.eclipse.jetty.util.thread.Scheduler;
|
||||
import org.eclipse.jetty.websocket.core.CloseStatus;
|
||||
import org.eclipse.jetty.websocket.core.Frame;
|
||||
import org.eclipse.jetty.websocket.core.OpCode;
|
||||
import org.eclipse.jetty.websocket.core.WebSocketException;
|
||||
import org.eclipse.jetty.websocket.core.WebSocketWriteTimeoutException;
|
||||
|
||||
public class FrameFlusher extends IteratingCallback
|
||||
|
@ -96,6 +98,8 @@ public class FrameFlusher extends IteratingCallback
|
|||
byte opCode = frame.getOpCode();
|
||||
|
||||
Throwable dead;
|
||||
List<Entry> failedEntries = null;
|
||||
CloseStatus closeStatus = null;
|
||||
|
||||
synchronized (this)
|
||||
{
|
||||
|
@ -104,10 +108,33 @@ public class FrameFlusher extends IteratingCallback
|
|||
dead = closedCause;
|
||||
if (dead == null)
|
||||
{
|
||||
if (opCode == OpCode.PING || opCode == OpCode.PONG)
|
||||
queue.offerFirst(entry);
|
||||
else
|
||||
queue.offerLast(entry);
|
||||
switch (opCode)
|
||||
{
|
||||
case OpCode.CLOSE:
|
||||
closeStatus = CloseStatus.getCloseStatus(frame);
|
||||
if (CloseStatus.isOrdinary(closeStatus))
|
||||
queue.offerLast(entry);
|
||||
else
|
||||
{
|
||||
//fail all existing entries in the queue, and enqueue the close frame
|
||||
failedEntries = new ArrayList<>();
|
||||
failedEntries.addAll(queue);
|
||||
queue.clear();
|
||||
queue.offerFirst(entry);
|
||||
}
|
||||
|
||||
this.canEnqueue = false;
|
||||
break;
|
||||
|
||||
case OpCode.PING:
|
||||
case OpCode.PONG:
|
||||
queue.offerFirst(entry);
|
||||
break;
|
||||
|
||||
default:
|
||||
queue.offerLast(entry);
|
||||
break;
|
||||
}
|
||||
|
||||
/* If the queue was empty then no timeout has been set, so we set a timeout to check the current
|
||||
entry when it expires. When the timeout expires we will go over entries in the queue and
|
||||
|
@ -115,9 +142,6 @@ public class FrameFlusher extends IteratingCallback
|
|||
with the soonest expiry time. */
|
||||
if ((idleTimeout > 0) && (queue.size()==1) && entries.isEmpty())
|
||||
timeoutScheduler.schedule(this::timeoutExpired, idleTimeout, TimeUnit.MILLISECONDS);
|
||||
|
||||
if (opCode == OpCode.CLOSE)
|
||||
this.canEnqueue = false;
|
||||
}
|
||||
}
|
||||
else
|
||||
|
@ -126,6 +150,14 @@ public class FrameFlusher extends IteratingCallback
|
|||
}
|
||||
}
|
||||
|
||||
if (failedEntries != null)
|
||||
{
|
||||
for (Entry e : failedEntries)
|
||||
{
|
||||
notifyCallbackFailure(e.callback, new WebSocketException(CloseStatus.codeString(closeStatus.getCode())));
|
||||
}
|
||||
}
|
||||
|
||||
if (dead == null)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
|
|
|
@ -189,6 +189,57 @@ public class FrameFlusherTest
|
|||
assertThat(error.get(), instanceOf(WebSocketWriteTimeoutException.class));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testErrorClose() throws Exception
|
||||
{
|
||||
Generator generator = new Generator(bufferPool);
|
||||
BlockingEndpoint endPoint = new BlockingEndpoint(bufferPool);
|
||||
endPoint.setBlockTime(100);
|
||||
int bufferSize = WebSocketConstants.DEFAULT_MAX_TEXT_MESSAGE_SIZE;
|
||||
int maxGather = 8;
|
||||
FrameFlusher frameFlusher = new FrameFlusher(bufferPool, scheduler, generator, endPoint, bufferSize, maxGather);
|
||||
|
||||
// enqueue message before the error close
|
||||
Frame frame1 = new Frame(OpCode.TEXT).setPayload("message before close").setFin(true);
|
||||
LatchCallback callback1 = new LatchCallback();
|
||||
assertTrue(frameFlusher.enqueue(frame1, callback1, false));
|
||||
|
||||
// enqueue the close frame which should fail the previous frame as it is still in the queue
|
||||
Frame closeFrame = new CloseStatus(CloseStatus.MESSAGE_TOO_LARGE).toFrame();
|
||||
LatchCallback closeCallback = new LatchCallback();
|
||||
assertTrue(frameFlusher.enqueue(closeFrame, closeCallback, false));
|
||||
assertTrue(callback1.failure.await(1, TimeUnit.SECONDS));
|
||||
|
||||
// any frames enqueued after this should fail
|
||||
Frame frame2 = new Frame(OpCode.TEXT).setPayload("message after close").setFin(true);
|
||||
LatchCallback callback2 = new LatchCallback();
|
||||
assertFalse(frameFlusher.enqueue(frame2, callback2, false));
|
||||
assertTrue(callback2.failure.await(1, TimeUnit.SECONDS));
|
||||
|
||||
// iterating should succeed the close callback
|
||||
frameFlusher.iterate();
|
||||
assertTrue(closeCallback.success.await(1, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
public static class LatchCallback implements Callback
|
||||
{
|
||||
public CountDownLatch success = new CountDownLatch(1);
|
||||
public CountDownLatch failure = new CountDownLatch(1);
|
||||
|
||||
@Override
|
||||
public void succeeded()
|
||||
{
|
||||
success.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void failed(Throwable x)
|
||||
{
|
||||
failure.countDown();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public static class CapturingEndPoint extends MockEndpoint
|
||||
{
|
||||
public Parser parser;
|
||||
|
|
Loading…
Reference in New Issue