Issue #4747 - Throwing in onClose now calls onError

- This applies to the javax onClose method not the core onClosed method.
- Also fixes a bug where onClosed may never be called if onFrame throws.

Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
This commit is contained in:
Lachlan Roberts 2020-04-06 20:28:57 +10:00
parent 2028b99e83
commit 54ee3f3939
8 changed files with 226 additions and 240 deletions

View File

@ -55,7 +55,7 @@ public class CloseStatus
private final int code;
private final String reason;
private final Throwable cause;
private Throwable cause;
/**
* Creates a reason for closing a web socket connection with the no given status code.
@ -211,6 +211,11 @@ public class CloseStatus
return !isOrdinary(code);
}
public void initCause(Throwable cause)
{
this.cause = cause;
}
public Throwable getCause()
{
return cause;

View File

@ -679,6 +679,7 @@ public class WebSocketCoreSession implements IncomingFrames, CoreSession, Dumpab
@Override
public void onFrame(Frame frame, final Callback callback)
{
Callback closeCallback = null;
try
{
if (LOG.isDebugEnabled())
@ -695,11 +696,13 @@ public class WebSocketCoreSession implements IncomingFrames, CoreSession, Dumpab
// Handle inbound CLOSE
connection.cancelDemand();
Callback closeCallback;
if (closeConnection)
{
closeCallback = Callback.from(() -> closeConnection(sessionState.getCloseStatus(), callback));
closeCallback = Callback.from(() -> closeConnection(sessionState.getCloseStatus(), callback), t ->
{
sessionState.onError(t);
closeConnection(sessionState.getCloseStatus(), callback);
});
}
else
{
@ -725,7 +728,10 @@ public class WebSocketCoreSession implements IncomingFrames, CoreSession, Dumpab
}
catch (Throwable t)
{
callback.failed(t);
if (closeCallback != null)
closeCallback.failed(t);
else
callback.failed(t);
}
}
}

View File

@ -123,6 +123,28 @@ public class WebSocketSessionState
}
}
/**
* This should only be called directly before closing the connection when we are in {@link State#CLOSED} state.
* This ensures an abnormal close status and if we have no error in the CloseStatus we will set one.
* @param t the error which occurred.
*/
public void onError(Throwable t)
{
synchronized (this)
{
if (_sessionState != State.CLOSED || _closeStatus == null)
throw new IllegalArgumentException();
// Override any normal close status.
if (!_closeStatus.isAbnormal())
_closeStatus = new CloseStatus(CloseStatus.SERVER_ERROR, t);
// Otherwise set the error if it wasn't already set to notify onError as well as onClose.
if (_closeStatus.getCause() == null)
_closeStatus.initCause(t);
}
}
public boolean onEof()
{
synchronized (this)

View File

@ -28,20 +28,10 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.logging.StacklessLogging;
import org.eclipse.jetty.server.NetworkConnector;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.util.BlockingArrayQueue;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.component.AbstractLifeCycle;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.websocket.core.internal.WebSocketCoreSession;
import org.eclipse.jetty.websocket.core.server.WebSocketNegotiator;
import org.eclipse.jetty.websocket.core.server.WebSocketUpgradeHandler;
import org.eclipse.jetty.websocket.core.server.internal.RFC6455Handshaker;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
@ -49,6 +39,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.eclipse.jetty.util.Callback.NOOP;
import static org.eclipse.jetty.websocket.core.OpCode.CLOSE;
import static org.eclipse.jetty.websocket.core.OpCode.TEXT;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
@ -71,6 +63,7 @@ public class WebSocketCloseTest extends WebSocketTester
private static final String WSS_SCHEME = "wss";
private WebSocketServer server;
private DemandingTestFrameHandler serverHandler;
private Socket client;
enum State
@ -102,17 +95,16 @@ public class WebSocketCloseTest extends WebSocketTester
throw new IllegalStateException();
}
DemandingTestFrameHandler serverHandler = new DemandingTestFrameHandler();
server = new WebSocketServer(0, serverHandler, tls);
serverHandler = new DemandingTestFrameHandler();
server = new WebSocketServer(serverHandler, tls);
server.start();
client = newClient(server.getLocalPort(), tls);
assertTrue(server.handler.opened.await(5, TimeUnit.SECONDS));
assertThat(server.handler.state, containsString("CONNECTED"));
while (true)
assertTrue(serverHandler.opened.await(5, TimeUnit.SECONDS));
assertThat(serverHandler.state, containsString("CONNECTED"));
while (!serverHandler.coreSession.toString().contains("OPEN"))
{
Thread.yield();
if (server.handler.getCoreSession().toString().contains("OPEN"))
break;
}
switch (state)
@ -125,19 +117,19 @@ public class WebSocketCloseTest extends WebSocketTester
case ISHUT:
{
client.getOutputStream().write(RawFrameBuilder.buildClose(new CloseStatus(CloseStatus.NORMAL), true));
server.handler.getCoreSession().demand(1);
serverHandler.coreSession.demand(1);
Frame frame = serverHandler.receivedFrames.poll(5, TimeUnit.SECONDS);
assertThat(new CloseStatus(frame.getPayload()).getCode(), is(CloseStatus.NORMAL));
assertThat(server.handler.getCoreSession().toString(), containsString("ISHUT"));
assertThat(serverHandler.coreSession.toString(), containsString("ISHUT"));
LOG.info("Server: ISHUT");
break;
}
case OSHUT:
{
server.sendFrame(CloseStatus.toFrame(CloseStatus.NORMAL));
serverHandler.coreSession.sendFrame(CloseStatus.toFrame(CloseStatus.NORMAL), NOOP, false);
CloseStatus closeStatus = new CloseStatus(receiveFrame(client.getInputStream()));
assertThat(closeStatus.getCode(), is(CloseStatus.NORMAL));
assertThat(server.handler.getCoreSession().toString(), containsString("OSHUT"));
assertThat(serverHandler.coreSession.toString(), containsString("OSHUT"));
LOG.info("Server: OSHUT");
break;
}
@ -150,12 +142,12 @@ public class WebSocketCloseTest extends WebSocketTester
{
setup(State.ISHUT, scheme);
server.handler.receivedCallback.poll().succeeded();
serverHandler.receivedCallback.poll().succeeded();
Frame frame = receiveFrame(client.getInputStream());
assertThat(new CloseStatus(frame.getPayload()).getCode(), is(CloseStatus.NORMAL));
assertTrue(server.handler.closed.await(10, TimeUnit.SECONDS));
assertThat(server.handler.closeStatus.getCode(), is(CloseStatus.NORMAL));
assertTrue(serverHandler.closed.await(10, TimeUnit.SECONDS));
assertThat(serverHandler.closeStatus.getCode(), is(CloseStatus.NORMAL));
}
@ParameterizedTest
@ -164,13 +156,13 @@ public class WebSocketCloseTest extends WebSocketTester
{
setup(State.ISHUT, scheme);
server.sendFrame(CloseStatus.toFrame(CloseStatus.SHUTDOWN));
server.handler.receivedCallback.poll().succeeded();
serverHandler.coreSession.sendFrame(CloseStatus.toFrame(CloseStatus.SHUTDOWN), NOOP, false);
serverHandler.receivedCallback.poll().succeeded();
Frame frame = receiveFrame(client.getInputStream());
assertThat(new CloseStatus(frame.getPayload()).getCode(), is(CloseStatus.SHUTDOWN));
assertTrue(server.handler.closed.await(10, TimeUnit.SECONDS));
assertThat(server.handler.closeStatus.getCode(), is(CloseStatus.SHUTDOWN));
assertTrue(serverHandler.closed.await(10, TimeUnit.SECONDS));
assertThat(serverHandler.closeStatus.getCode(), is(CloseStatus.SHUTDOWN));
}
@ParameterizedTest
@ -178,14 +170,14 @@ public class WebSocketCloseTest extends WebSocketTester
public void testServerFailCloseISHUT(String scheme) throws Exception
{
setup(State.ISHUT, scheme);
server.handler.receivedCallback.poll().failed(new Exception("test failure"));
serverHandler.receivedCallback.poll().failed(new Exception("test failure"));
CloseStatus closeStatus = new CloseStatus(receiveFrame(client.getInputStream()));
assertThat(closeStatus.getCode(), is(CloseStatus.SERVER_ERROR));
assertThat(closeStatus.getReason(), is("test failure"));
assertTrue(server.handler.closed.await(10, TimeUnit.SECONDS));
assertThat(server.handler.closeStatus.getCode(), is(CloseStatus.SERVER_ERROR));
assertTrue(serverHandler.closed.await(10, TimeUnit.SECONDS));
assertThat(serverHandler.closeStatus.getCode(), is(CloseStatus.SERVER_ERROR));
}
@ParameterizedTest
@ -195,8 +187,8 @@ public class WebSocketCloseTest extends WebSocketTester
setup(State.ISHUT, scheme);
client.shutdownOutput();
assertFalse(server.handler.closed.await(250, TimeUnit.MILLISECONDS));
server.handler.receivedCallback.poll().succeeded();
assertFalse(serverHandler.closed.await(250, TimeUnit.MILLISECONDS));
serverHandler.receivedCallback.poll().succeeded();
CloseStatus closeStatus = new CloseStatus(receiveFrame(client.getInputStream()));
assertThat(closeStatus.getCode(), is(CloseStatus.NORMAL));
@ -207,13 +199,13 @@ public class WebSocketCloseTest extends WebSocketTester
public void testClientCloseOSHUT(String scheme) throws Exception
{
setup(State.OSHUT, scheme);
server.handler.getCoreSession().demand(1);
serverHandler.coreSession.demand(1);
client.getOutputStream().write(RawFrameBuilder.buildClose(new CloseStatus(CloseStatus.NORMAL), true));
assertNotNull(server.handler.receivedFrames.poll(10, TimeUnit.SECONDS));
server.handler.receivedCallback.poll().succeeded();
assertNotNull(serverHandler.receivedFrames.poll(10, TimeUnit.SECONDS));
serverHandler.receivedCallback.poll().succeeded();
assertTrue(server.handler.closed.await(10, TimeUnit.SECONDS));
assertThat(server.handler.closeStatus.getCode(), is(CloseStatus.NORMAL));
assertTrue(serverHandler.closed.await(10, TimeUnit.SECONDS));
assertThat(serverHandler.closeStatus.getCode(), is(CloseStatus.NORMAL));
assertNull(receiveFrame(client.getInputStream()));
}
@ -223,13 +215,13 @@ public class WebSocketCloseTest extends WebSocketTester
public void testClientDifferentCloseOSHUT(String scheme) throws Exception
{
setup(State.OSHUT, scheme);
server.handler.getCoreSession().demand(1);
serverHandler.coreSession.demand(1);
client.getOutputStream().write(RawFrameBuilder.buildClose(new CloseStatus(CloseStatus.BAD_PAYLOAD), true));
assertNotNull(server.handler.receivedFrames.poll(10, TimeUnit.SECONDS));
server.handler.receivedCallback.poll().succeeded();
assertNotNull(serverHandler.receivedFrames.poll(10, TimeUnit.SECONDS));
serverHandler.receivedCallback.poll().succeeded();
assertTrue(server.handler.closed.await(10, TimeUnit.SECONDS));
assertThat(server.handler.closeStatus.getCode(), is(CloseStatus.BAD_PAYLOAD));
assertTrue(serverHandler.closed.await(10, TimeUnit.SECONDS));
assertThat(serverHandler.closeStatus.getCode(), is(CloseStatus.BAD_PAYLOAD));
assertNull(receiveFrame(client.getInputStream()));
}
@ -241,13 +233,13 @@ public class WebSocketCloseTest extends WebSocketTester
try (StacklessLogging ignored = new StacklessLogging(WebSocketCoreSession.class))
{
setup(State.OSHUT, scheme);
server.handler.getCoreSession().demand(1);
serverHandler.coreSession.demand(1);
client.getOutputStream().write(RawFrameBuilder.buildClose(new CloseStatus(CloseStatus.NORMAL), true));
assertNotNull(server.handler.receivedFrames.poll(10, TimeUnit.SECONDS));
server.handler.receivedCallback.poll().failed(new Exception("Test"));
assertNotNull(serverHandler.receivedFrames.poll(10, TimeUnit.SECONDS));
serverHandler.receivedCallback.poll().failed(new Exception("Test"));
assertTrue(server.handler.closed.await(10, TimeUnit.SECONDS));
assertThat(server.handler.closeStatus.getCode(), is(CloseStatus.NORMAL));
assertTrue(serverHandler.closed.await(10, TimeUnit.SECONDS));
assertThat(serverHandler.closeStatus.getCode(), is(CloseStatus.SERVER_ERROR));
assertNull(receiveFrame(client.getInputStream()));
}
@ -260,10 +252,10 @@ public class WebSocketCloseTest extends WebSocketTester
setup(State.OPEN, scheme);
client.getOutputStream().write(RawFrameBuilder.buildFrame(OpCode.PONG, "pong frame not masked", false));
server.handler.getCoreSession().demand(1);
assertTrue(server.handler.closed.await(5, TimeUnit.SECONDS));
assertThat(server.handler.closeStatus.getCode(), is(CloseStatus.PROTOCOL));
assertThat(server.handler.closeStatus.getReason(), containsString("Client MUST mask all frames"));
serverHandler.coreSession.demand(1);
assertTrue(serverHandler.closed.await(5, TimeUnit.SECONDS));
assertThat(serverHandler.closeStatus.getCode(), is(CloseStatus.PROTOCOL));
assertThat(serverHandler.closeStatus.getReason(), containsString("Client MUST mask all frames"));
}
@ParameterizedTest
@ -273,10 +265,10 @@ public class WebSocketCloseTest extends WebSocketTester
setup(State.OSHUT, scheme);
client.getOutputStream().write(RawFrameBuilder.buildFrame(OpCode.PONG, "pong frame not masked", false));
server.handler.getCoreSession().demand(1);
assertTrue(server.handler.closed.await(5, TimeUnit.SECONDS));
assertThat(server.handler.closeStatus.getCode(), is(CloseStatus.PROTOCOL));
assertThat(server.handler.closeStatus.getReason(), containsString("Client MUST mask all frames"));
serverHandler.coreSession.demand(1);
assertTrue(serverHandler.closed.await(5, TimeUnit.SECONDS));
assertThat(serverHandler.closeStatus.getCode(), is(CloseStatus.PROTOCOL));
assertThat(serverHandler.closeStatus.getReason(), containsString("Client MUST mask all frames"));
}
@ParameterizedTest
@ -286,8 +278,8 @@ public class WebSocketCloseTest extends WebSocketTester
setup(State.ISHUT, scheme);
client.getOutputStream().write(RawFrameBuilder.buildFrame(OpCode.PONG, "pong frame not masked", false));
assertTrue(server.handler.closed.await(5, TimeUnit.SECONDS));
assertThat(server.handler.closeStatus.getCode(), is(CloseStatus.PROTOCOL));
assertTrue(serverHandler.closed.await(5, TimeUnit.SECONDS));
assertThat(serverHandler.closeStatus.getCode(), is(CloseStatus.PROTOCOL));
Frame frame = receiveFrame(client.getInputStream());
assertThat(CloseStatus.getCloseStatus(frame).getCode(), is(CloseStatus.PROTOCOL));
@ -301,12 +293,12 @@ public class WebSocketCloseTest extends WebSocketTester
setup(State.ISHUT, scheme);
client.shutdownOutput();
assertFalse(server.handler.closed.await(250, TimeUnit.MILLISECONDS));
Callback callback = server.handler.receivedCallback.poll(5, TimeUnit.SECONDS);
assertFalse(serverHandler.closed.await(250, TimeUnit.MILLISECONDS));
Callback callback = serverHandler.receivedCallback.poll(5, TimeUnit.SECONDS);
callback.succeeded();
assertTrue(server.handler.closed.await(5, TimeUnit.SECONDS));
assertThat(server.handler.closeStatus.getCode(), is(CloseStatus.NORMAL));
assertTrue(serverHandler.closed.await(5, TimeUnit.SECONDS));
assertThat(serverHandler.closeStatus.getCode(), is(CloseStatus.NORMAL));
Frame frame = receiveFrame(client.getInputStream());
assertThat(CloseStatus.getCloseStatus(frame).getCode(), is(CloseStatus.NORMAL));
@ -320,24 +312,24 @@ public class WebSocketCloseTest extends WebSocketTester
setup(State.ISHUT, scheme);
client.close();
assertFalse(server.handler.closed.await(250, TimeUnit.MILLISECONDS));
assertFalse(serverHandler.closed.await(250, TimeUnit.MILLISECONDS));
assertTimeoutPreemptively(Duration.ofSeconds(1), () ->
{
while (true)
{
if (!server.isOpen())
if (!serverHandler.coreSession.isOutputOpen())
break;
server.sendFrame(new Frame(OpCode.TEXT, BufferUtil.toBuffer("frame after close")), Callback.NOOP);
serverHandler.coreSession.sendFrame(new Frame(TEXT, BufferUtil.toBuffer("frame after close")), Callback.NOOP, false);
Thread.sleep(100);
}
});
assertTrue(server.handler.closed.await(5, TimeUnit.SECONDS));
assertNotNull(server.handler.error);
assertThat(server.handler.closeStatus.getCode(), is(CloseStatus.NO_CLOSE));
assertTrue(serverHandler.closed.await(5, TimeUnit.SECONDS));
assertNotNull(serverHandler.error);
assertThat(serverHandler.closeStatus.getCode(), is(CloseStatus.NO_CLOSE));
Callback callback = server.handler.receivedCallback.poll(5, TimeUnit.SECONDS);
Callback callback = serverHandler.receivedCallback.poll(5, TimeUnit.SECONDS);
callback.succeeded();
}
@ -348,10 +340,10 @@ public class WebSocketCloseTest extends WebSocketTester
setup(State.OPEN, scheme);
client.close();
assertFalse(server.handler.closed.await(250, TimeUnit.MILLISECONDS));
server.handler.getCoreSession().demand(1);
assertTrue(server.handler.closed.await(5, TimeUnit.SECONDS));
assertThat(server.handler.closeStatus.getCode(), is(CloseStatus.NO_CLOSE));
assertFalse(serverHandler.closed.await(250, TimeUnit.MILLISECONDS));
serverHandler.coreSession.demand(1);
assertTrue(serverHandler.closed.await(5, TimeUnit.SECONDS));
assertThat(serverHandler.closeStatus.getCode(), is(CloseStatus.NO_CLOSE));
}
@ParameterizedTest
@ -361,10 +353,10 @@ public class WebSocketCloseTest extends WebSocketTester
setup(State.OSHUT, scheme);
client.close();
assertFalse(server.handler.closed.await(250, TimeUnit.MILLISECONDS));
server.handler.getCoreSession().demand(1);
assertTrue(server.handler.closed.await(5, TimeUnit.SECONDS));
assertThat(server.handler.closeStatus.getCode(), is(CloseStatus.NO_CLOSE));
assertFalse(serverHandler.closed.await(250, TimeUnit.MILLISECONDS));
serverHandler.coreSession.demand(1);
assertTrue(serverHandler.closed.await(5, TimeUnit.SECONDS));
assertThat(serverHandler.closeStatus.getCode(), is(CloseStatus.NO_CLOSE));
}
@ParameterizedTest
@ -374,10 +366,10 @@ public class WebSocketCloseTest extends WebSocketTester
setup(State.ISHUT, scheme);
client.close();
assertFalse(server.handler.closed.await(250, TimeUnit.MILLISECONDS));
server.close();
assertTrue(server.handler.closed.await(5, TimeUnit.SECONDS));
assertThat(server.handler.closeStatus.getCode(), is(CloseStatus.NORMAL));
assertFalse(serverHandler.closed.await(250, TimeUnit.MILLISECONDS));
serverHandler.coreSession.close(CloseStatus.NORMAL, "", NOOP);
assertTrue(serverHandler.closed.await(5, TimeUnit.SECONDS));
assertThat(serverHandler.closeStatus.getCode(), is(CloseStatus.NORMAL));
}
@ParameterizedTest
@ -386,16 +378,16 @@ public class WebSocketCloseTest extends WebSocketTester
{
setup(State.OPEN, scheme);
client.getOutputStream().write(RawFrameBuilder.buildFrame(OpCode.BINARY, "binary", true));
client.getOutputStream().write(RawFrameBuilder.buildFrame(OpCode.TEXT, "throw from onFrame", true));
try (StacklessLogging stacklessLogging = new StacklessLogging(WebSocketCoreSession.class))
{
server.handler.getCoreSession().demand(1);
assertTrue(server.handler.closed.await(5, TimeUnit.SECONDS));
serverHandler.coreSession.demand(1);
assertTrue(serverHandler.closed.await(5, TimeUnit.SECONDS));
}
assertThat(server.handler.closeStatus.getCode(), is(CloseStatus.SERVER_ERROR));
assertThat(server.handler.closeStatus.getReason(), containsString("onReceiveFrame throws for binary frames"));
assertThat(serverHandler.closeStatus.getCode(), is(CloseStatus.SERVER_ERROR));
assertThat(serverHandler.closeStatus.getReason(), containsString("deliberately throwing from onFrame"));
}
@ParameterizedTest
@ -404,16 +396,16 @@ public class WebSocketCloseTest extends WebSocketTester
{
setup(State.OSHUT, scheme);
client.getOutputStream().write(RawFrameBuilder.buildFrame(OpCode.BINARY, "binary", true));
client.getOutputStream().write(RawFrameBuilder.buildFrame(OpCode.TEXT, "throw from onFrame", true));
try (StacklessLogging stacklessLogging = new StacklessLogging(WebSocketCoreSession.class))
{
server.handler.getCoreSession().demand(1);
assertTrue(server.handler.closed.await(5, TimeUnit.SECONDS));
serverHandler.coreSession.demand(1);
assertTrue(serverHandler.closed.await(5, TimeUnit.SECONDS));
}
assertThat(server.handler.closeStatus.getCode(), is(CloseStatus.SERVER_ERROR));
assertThat(server.handler.closeStatus.getReason(), containsString("onReceiveFrame throws for binary frames"));
assertThat(serverHandler.closeStatus.getCode(), is(CloseStatus.SERVER_ERROR));
assertThat(serverHandler.closeStatus.getReason(), containsString("deliberately throwing from onFrame"));
}
@ParameterizedTest
@ -422,10 +414,10 @@ public class WebSocketCloseTest extends WebSocketTester
{
setup(State.OPEN, scheme);
server.handler.getCoreSession().close(CloseStatus.SERVER_ERROR, "manually sent server error", Callback.NOOP);
assertTrue(server.handler.closed.await(5, TimeUnit.SECONDS));
assertThat(server.handler.closeStatus.getCode(), is(CloseStatus.SERVER_ERROR));
assertThat(server.handler.closeStatus.getReason(), containsString("manually sent server error"));
serverHandler.coreSession.close(CloseStatus.SERVER_ERROR, "manually sent server error", Callback.NOOP);
assertTrue(serverHandler.closed.await(5, TimeUnit.SECONDS));
assertThat(serverHandler.closeStatus.getCode(), is(CloseStatus.SERVER_ERROR));
assertThat(serverHandler.closeStatus.getReason(), containsString("manually sent server error"));
Frame frame = receiveFrame(client.getInputStream());
assertThat(CloseStatus.getCloseStatus(frame).getCode(), is(CloseStatus.SERVER_ERROR));
@ -438,9 +430,9 @@ public class WebSocketCloseTest extends WebSocketTester
setup(State.OPEN, scheme);
Callback.Completable callback1 = new Callback.Completable();
server.handler.getCoreSession().close(CloseStatus.NORMAL, "normal 1", callback1);
serverHandler.coreSession.close(CloseStatus.NORMAL, "normal 1", callback1);
Callback.Completable callback2 = new Callback.Completable();
server.handler.getCoreSession().close(CloseStatus.NORMAL, "normal 2", callback2);
serverHandler.coreSession.close(CloseStatus.NORMAL, "normal 2", callback2);
// First Callback Succeeded
assertDoesNotThrow(() -> callback1.get(5, TimeUnit.SECONDS));
@ -451,7 +443,7 @@ public class WebSocketCloseTest extends WebSocketTester
// Normal close frame received on client.
Frame closeFrame = receiveFrame(client.getInputStream());
assertThat(closeFrame.getOpCode(), is(OpCode.CLOSE));
assertThat(closeFrame.getOpCode(), is(CLOSE));
CloseStatus closeStatus = CloseStatus.getCloseStatus(closeFrame);
assertThat(closeStatus.getCode(), is(CloseStatus.NORMAL));
assertThat(closeStatus.getReason(), is("normal 1"));
@ -460,14 +452,14 @@ public class WebSocketCloseTest extends WebSocketTester
client.getOutputStream().write(RawFrameBuilder.buildClose(
new CloseStatus(CloseStatus.NORMAL, "normal response 1"), true));
server.handler.getCoreSession().demand(1);
assertNotNull(server.handler.receivedFrames.poll(10, TimeUnit.SECONDS));
Callback closeFrameCallback = Objects.requireNonNull(server.handler.receivedCallback.poll());
serverHandler.coreSession.demand(1);
assertNotNull(serverHandler.receivedFrames.poll(10, TimeUnit.SECONDS));
Callback closeFrameCallback = Objects.requireNonNull(serverHandler.receivedCallback.poll());
closeFrameCallback.succeeded();
assertTrue(server.handler.closed.await(5, TimeUnit.SECONDS));
assertThat(server.handler.closeStatus.getCode(), is(CloseStatus.NORMAL));
assertThat(server.handler.closeStatus.getReason(), is("normal response 1"));
assertTrue(serverHandler.closed.await(5, TimeUnit.SECONDS));
assertThat(serverHandler.closeStatus.getCode(), is(CloseStatus.NORMAL));
assertThat(serverHandler.closeStatus.getReason(), is("normal response 1"));
}
@ParameterizedTest
@ -477,9 +469,9 @@ public class WebSocketCloseTest extends WebSocketTester
setup(State.OPEN, scheme);
Callback.Completable callback1 = new Callback.Completable();
server.handler.getCoreSession().close(CloseStatus.SERVER_ERROR, "server error should succeed", callback1);
serverHandler.coreSession.close(CloseStatus.SERVER_ERROR, "server error should succeed", callback1);
Callback.Completable callback2 = new Callback.Completable();
server.handler.getCoreSession().close(CloseStatus.PROTOCOL, "protocol error should fail", callback2);
serverHandler.coreSession.close(CloseStatus.PROTOCOL, "protocol error should fail", callback2);
// First Callback Succeeded
assertDoesNotThrow(() -> callback1.get(5, TimeUnit.SECONDS));
@ -488,9 +480,9 @@ public class WebSocketCloseTest extends WebSocketTester
ExecutionException error = assertThrows(ExecutionException.class, () -> callback2.get(5, TimeUnit.SECONDS));
assertThat(error.getCause(), instanceOf(ClosedChannelException.class));
assertTrue(server.handler.closed.await(5, TimeUnit.SECONDS));
assertThat(server.handler.closeStatus.getCode(), is(CloseStatus.SERVER_ERROR));
assertThat(server.handler.closeStatus.getReason(), containsString("server error should succeed"));
assertTrue(serverHandler.closed.await(5, TimeUnit.SECONDS));
assertThat(serverHandler.closeStatus.getCode(), is(CloseStatus.SERVER_ERROR));
assertThat(serverHandler.closeStatus.getReason(), containsString("server error should succeed"));
Frame frame = receiveFrame(client.getInputStream());
assertThat(CloseStatus.getCloseStatus(frame).getCode(), is(CloseStatus.SERVER_ERROR));
@ -503,9 +495,9 @@ public class WebSocketCloseTest extends WebSocketTester
setup(State.OPEN, scheme);
Callback.Completable callback1 = new Callback.Completable();
server.handler.getCoreSession().close(CloseStatus.NORMAL, "normal close (client does not complete close handshake)", callback1);
serverHandler.coreSession.close(CloseStatus.NORMAL, "normal close (client does not complete close handshake)", callback1);
Callback.Completable callback2 = new Callback.Completable();
server.handler.getCoreSession().close(CloseStatus.SERVER_ERROR, "error close should overtake normal close", callback2);
serverHandler.coreSession.close(CloseStatus.SERVER_ERROR, "error close should overtake normal close", callback2);
// First Callback Succeeded
assertDoesNotThrow(() -> callback1.get(5, TimeUnit.SECONDS));
@ -514,18 +506,33 @@ public class WebSocketCloseTest extends WebSocketTester
ExecutionException error = assertThrows(ExecutionException.class, () -> callback2.get(5, TimeUnit.SECONDS));
assertThat(error.getCause(), instanceOf(ClosedChannelException.class));
assertTrue(server.handler.closed.await(5, TimeUnit.SECONDS));
assertThat(server.handler.closeStatus.getCode(), is(CloseStatus.SERVER_ERROR));
assertThat(server.handler.closeStatus.getReason(), containsString("error close should overtake normal close"));
assertTrue(serverHandler.closed.await(5, TimeUnit.SECONDS));
assertThat(serverHandler.closeStatus.getCode(), is(CloseStatus.SERVER_ERROR));
assertThat(serverHandler.closeStatus.getReason(), containsString("error close should overtake normal close"));
Frame frame = receiveFrame(client.getInputStream());
assertThat(CloseStatus.getCloseStatus(frame).getCode(), is(CloseStatus.NORMAL));
}
static class DemandingTestFrameHandler implements SynchronousFrameHandler
@ParameterizedTest
@ValueSource(strings = {WS_SCHEME, WSS_SCHEME})
public void testThrowFromOnCloseFrame(String scheme) throws Exception
{
setup(State.OSHUT, scheme);
CloseStatus closeStatus = new CloseStatus(CloseStatus.NORMAL, "throw from onFrame");
client.getOutputStream().write(RawFrameBuilder.buildClose(closeStatus, true));
serverHandler.coreSession.demand(1);
assertTrue(serverHandler.closed.await(5, TimeUnit.SECONDS));
assertThat(serverHandler.closeStatus.getCode(), is(CloseStatus.SERVER_ERROR));
assertThat(serverHandler.closeStatus.getReason(), containsString("deliberately throwing from onFrame"));
}
private static class DemandingTestFrameHandler implements FrameHandler
{
private CoreSession coreSession;
String state;
private String state;
protected BlockingQueue<Frame> receivedFrames = new BlockingArrayQueue<>();
protected BlockingQueue<Callback> receivedCallback = new BlockingArrayQueue<>();
@ -534,23 +541,19 @@ public class WebSocketCloseTest extends WebSocketTester
protected CountDownLatch closed = new CountDownLatch(1);
protected CloseStatus closeStatus = null;
public CoreSession getCoreSession()
{
return coreSession;
}
public BlockingQueue<Frame> getFrames()
{
return receivedFrames;
}
@Override
public void onOpen(CoreSession coreSession)
public void onOpen(CoreSession coreSession, Callback callback)
{
LOG.debug("onOpen {}", coreSession);
this.coreSession = coreSession;
state = this.coreSession.toString();
opened.countDown();
callback.succeeded();
}
@Override
@ -561,25 +564,29 @@ public class WebSocketCloseTest extends WebSocketTester
receivedCallback.offer(callback);
receivedFrames.offer(Frame.copy(frame));
if (frame.getOpCode() == OpCode.BINARY)
throw new IllegalArgumentException("onReceiveFrame throws for binary frames");
byte opCode = frame.getOpCode();
if ((opCode == TEXT && "throw from onFrame".equals(frame.getPayloadAsUTF8())) ||
(opCode == CLOSE && "throw from onFrame".equals(CloseStatus.getCloseStatus(frame).getReason())))
throw new RuntimeException("deliberately throwing from onFrame");
}
@Override
public void onClosed(CloseStatus closeStatus)
public void onClosed(CloseStatus closeStatus, Callback callback)
{
LOG.debug("onClosed {}", closeStatus);
state = coreSession.toString();
this.closeStatus = closeStatus;
closed.countDown();
callback.succeeded();
}
@Override
public void onError(Throwable cause)
public void onError(Throwable cause, Callback callback)
{
LOG.debug("onError {} ", cause);
LOG.debug("onError", cause);
error = cause;
state = coreSession.toString();
callback.succeeded();
}
@Override
@ -587,101 +594,5 @@ public class WebSocketCloseTest extends WebSocketTester
{
return true;
}
public void sendText(String text)
{
Frame frame = new Frame(OpCode.TEXT);
frame.setFin(true);
frame.setPayload(text);
getCoreSession().sendFrame(frame, NOOP, false);
state = coreSession.toString();
}
}
static class WebSocketServer extends AbstractLifeCycle
{
private static Logger LOG = LoggerFactory.getLogger(WebSocketServer.class);
private final Server server;
private final DemandingTestFrameHandler handler;
public void doStart() throws Exception
{
server.start();
}
public void doStop() throws Exception
{
server.stop();
}
public int getLocalPort()
{
return server.getBean(NetworkConnector.class).getLocalPort();
}
private SslContextFactory.Server createServerSslContextFactory()
{
SslContextFactory.Server sslContextFactory = new SslContextFactory.Server();
sslContextFactory.setKeyStorePath("src/test/resources/keystore.p12");
sslContextFactory.setKeyStorePassword("storepwd");
return sslContextFactory;
}
public WebSocketServer(int port, DemandingTestFrameHandler frameHandler, boolean tls)
{
this.handler = frameHandler;
server = new Server();
server.getBean(QueuedThreadPool.class).setName("WSCoreServer");
ServerConnector connector;
if (tls)
connector = new ServerConnector(server, createServerSslContextFactory());
else
connector = new ServerConnector(server);
connector.addBean(new RFC6455Handshaker());
connector.setPort(port);
connector.setIdleTimeout(1000000);
server.addConnector(connector);
ContextHandler context = new ContextHandler("/");
server.setHandler(context);
WebSocketNegotiator negotiator = new TestWebSocketNegotiator(frameHandler);
WebSocketUpgradeHandler upgradeHandler = new TestWebSocketUpgradeHandler(negotiator);
context.setHandler(upgradeHandler);
}
public void sendFrame(Frame frame)
{
handler.getCoreSession().sendFrame(frame, NOOP, false);
}
public void sendFrame(Frame frame, Callback callback)
{
handler.getCoreSession().sendFrame(frame, callback, false);
}
public void sendText(String line)
{
LOG.info("sending {}...", line);
handler.sendText(line);
}
public BlockingQueue<Frame> getFrames()
{
return handler.getFrames();
}
public void close()
{
handler.getCoreSession().close(CloseStatus.NORMAL, "WebSocketServer Initiated Close", Callback.NOOP);
}
public boolean isOpen()
{
return handler.getCoreSession().isOutputOpen();
}
}
}

View File

@ -26,13 +26,14 @@ import org.eclipse.jetty.server.NetworkConnector;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.websocket.core.server.Negotiation;
import org.eclipse.jetty.websocket.core.server.WebSocketNegotiator;
import org.eclipse.jetty.websocket.core.server.WebSocketUpgradeHandler;
public class WebSocketServer
{
private final Server server;
private final Server server = new Server();
private URI serverUri;
public void start() throws Exception
@ -58,13 +59,26 @@ public class WebSocketServer
public WebSocketServer(FrameHandler frameHandler)
{
this(new DefaultNegotiator(frameHandler));
this(new DefaultNegotiator(frameHandler), false);
}
public WebSocketServer(WebSocketNegotiator negotiator)
{
server = new Server();
ServerConnector connector = new ServerConnector(server);
this(negotiator, false);
}
public WebSocketServer(FrameHandler frameHandler, boolean tls)
{
this(new DefaultNegotiator(frameHandler), tls);
}
public WebSocketServer(WebSocketNegotiator negotiator, boolean tls)
{
ServerConnector connector;
if (tls)
connector = new ServerConnector(server, createServerSslContextFactory());
else
connector = new ServerConnector(server);
server.addConnector(connector);
ContextHandler context = new ContextHandler("/");
@ -74,6 +88,14 @@ public class WebSocketServer
context.setHandler(upgradeHandler);
}
private SslContextFactory.Server createServerSslContextFactory()
{
SslContextFactory.Server sslContextFactory = new SslContextFactory.Server();
sslContextFactory.setKeyStorePath("src/test/resources/keystore.p12");
sslContextFactory.setKeyStorePassword("storepwd");
return sslContextFactory;
}
public URI getUri()
{
return serverUri;

View File

@ -1,5 +1,5 @@
# Jetty Logging using jetty-slf4j-impl
# org.eclipse.jetty.LEVEL=DEBUG
org.eclipse.jetty.LEVEL=INFO
# org.eclipse.jetty.io.LEVEL=DEBUG
# org.eclipse.jetty.websocket.core.LEVEL=DEBUG
# org.eclipse.jetty.websocket.core.TestFrameHandler.LEVEL=DEBUG

View File

@ -188,10 +188,21 @@ public class JavaxOnCloseTest
assertThat(clientEndpoint.closeReason.getReasonPhrase(), is("abnormal close 1"));
}
@ClientEndpoint
public class ThrowOnCloseSocket extends EventSocket
{
@Override
public void onClose(CloseReason reason)
{
super.onClose(reason);
throw new RuntimeException("trigger onError from client onClose");
}
}
@Test
public void onErrorOccurringAfterOnClose() throws Exception
{
EventSocket clientEndpoint = new EventSocket();
EventSocket clientEndpoint = new ThrowOnCloseSocket();
URI uri = new URI("ws://localhost:" + connector.getLocalPort() + "/");
client.connectToServer(clientEndpoint, uri);
@ -199,16 +210,25 @@ public class JavaxOnCloseTest
assertTrue(serverEndpoint.openLatch.await(5, TimeUnit.SECONDS));
serverEndpoint.setOnClose((session) ->
{
throw new RuntimeException("trigger onError from onClose");
throw new RuntimeException("trigger onError from server onClose");
});
// Initiate close on client to cause the server to throw in onClose.
clientEndpoint.session.close();
assertTrue(clientEndpoint.closeLatch.await(5, TimeUnit.SECONDS));
assertThat(clientEndpoint.closeReason.getCloseCode(), is(CloseCodes.UNEXPECTED_CONDITION));
assertThat(clientEndpoint.closeReason.getReasonPhrase(), containsString("trigger onError from onClose"));
// Test the receives the normal close, and throws in onClose.
assertTrue(serverEndpoint.closeLatch.await(5, TimeUnit.SECONDS));
assertThat(serverEndpoint.closeReason.getCloseCode(), is(CloseCodes.NORMAL_CLOSURE));
assertTrue(serverEndpoint.errorLatch.await(5, TimeUnit.SECONDS));
assertThat(serverEndpoint.error, instanceOf(RuntimeException.class));
assertThat(serverEndpoint.error.getMessage(), containsString("trigger onError from onClose"));
assertThat(serverEndpoint.error.getMessage(), containsString("trigger onError from server onClose"));
assertTrue(clientEndpoint.closeLatch.await(5, TimeUnit.SECONDS));
assertThat(clientEndpoint.closeReason.getCloseCode(), is(CloseCodes.UNEXPECTED_CONDITION));
assertThat(clientEndpoint.closeReason.getReasonPhrase(), containsString("trigger onError from server onClose"));
assertTrue(clientEndpoint.errorLatch.await(5, TimeUnit.SECONDS));
assertThat(clientEndpoint.error, instanceOf(RuntimeException.class));
assertThat(clientEndpoint.error.getMessage(), containsString("trigger onError from client onClose"));
}
}

View File

@ -1,5 +1,5 @@
# Jetty Logging using jetty-slf4j-impl
# org.eclipse.jetty.LEVEL=DEBUG
org.eclipse.jetty.LEVEL=INFO
# org.eclipse.jetty.util.log.stderr.LONG=true
# org.eclipse.jetty.server.AbstractConnector.LEVEL=DEBUG
# org.eclipse.jetty.io.WriteFlusher.LEVEL=DEBUG