Issue #4502 - add onClose tests for Jetty and Javax WS APIs

Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
This commit is contained in:
Lachlan Roberts 2020-01-29 15:20:36 +11:00
parent 4ef208e9f6
commit fdd27a9f28
5 changed files with 425 additions and 3 deletions

View File

@ -163,9 +163,7 @@ public class FutureCallback implements Future<Void>, Callback
{
Throwable cause = e.getCause();
if (cause instanceof RuntimeException)
throw (RuntimeException)cause;
else if (cause instanceof IOException)
throw (IOException)cause;
throw new RuntimeException(cause);
else
throw new IOException(cause);
}

View File

@ -50,6 +50,7 @@ public class EventSocket
public CountDownLatch openLatch = new CountDownLatch(1);
public CountDownLatch closeLatch = new CountDownLatch(1);
public CountDownLatch errorLatch = new CountDownLatch(1);
@OnOpen
public void onOpen(Session session, EndpointConfig endpointConfig)
@ -85,5 +86,6 @@ public class EventSocket
if (LOG.isDebugEnabled())
LOG.debug("{} onError(): {}", toString(), cause);
error = cause;
errorLatch.countDown();
}
}

View File

@ -0,0 +1,226 @@
//
// ========================================================================
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under
// the terms of the Eclipse Public License 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0
//
// This Source Code may also be made available under the following
// Secondary Licenses when the conditions for such availability set
// forth in the Eclipse Public License, v. 2.0 are satisfied:
// the Apache License v2.0 which is available at
// https://www.apache.org/licenses/LICENSE-2.0
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.websocket.javax.tests;
import java.io.IOException;
import java.net.URI;
import java.nio.channels.ClosedChannelException;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import javax.websocket.ClientEndpoint;
import javax.websocket.CloseReason;
import javax.websocket.CloseReason.CloseCodes;
import javax.websocket.EndpointConfig;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.util.BlockingArrayQueue;
import org.eclipse.jetty.websocket.javax.client.JavaxWebSocketClientContainer;
import org.eclipse.jetty.websocket.javax.server.config.JavaxWebSocketServletContainerInitializer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class JavaxOnCloseTest
{
private static BlockingArrayQueue<OnCloseEndpoint> serverEndpoints = new BlockingArrayQueue<>();
private Server server;
private ServerConnector connector;
private JavaxWebSocketClientContainer client = new JavaxWebSocketClientContainer();
@ServerEndpoint("/")
public static class OnCloseEndpoint extends EventSocket
{
private Consumer<Session> onClose;
public void setOnClose(Consumer<Session> onClose)
{
this.onClose = onClose;
}
@Override
public void onOpen(Session session, EndpointConfig endpointConfig)
{
super.onOpen(session, endpointConfig);
serverEndpoints.add(this);
}
@Override
public void onClose(CloseReason reason)
{
super.onClose(reason);
onClose.accept(session);
}
}
@ClientEndpoint
public static class BlockingClientEndpoint extends EventSocket
{
private CountDownLatch blockInClose = new CountDownLatch(1);
public void unBlockClose()
{
blockInClose.countDown();
}
@Override
public void onClose(CloseReason reason)
{
try
{
blockInClose.await();
super.onClose(reason);
}
catch (InterruptedException e)
{
throw new RuntimeException(e);
}
}
}
@BeforeEach
public void start() throws Exception
{
server = new Server();
connector = new ServerConnector(server);
connector.setPort(0);
server.addConnector(connector);
ServletContextHandler contextHandler = new ServletContextHandler(ServletContextHandler.SESSIONS);
contextHandler.setContextPath("/");
server.setHandler(contextHandler);
JavaxWebSocketServletContainerInitializer.configure(contextHandler, ((servletContext, container) ->
container.addEndpoint(OnCloseEndpoint.class)));
client.start();
server.start();
}
@AfterEach
public void stop() throws Exception
{
client.stop();
server.stop();
}
@Test
public void changeStatusCodeInOnClose() throws Exception
{
EventSocket clientEndpoint = new EventSocket();
URI uri = new URI("ws://localhost:" + connector.getLocalPort() + "/");
client.connectToServer(clientEndpoint, uri);
OnCloseEndpoint serverEndpoint = Objects.requireNonNull(serverEndpoints.poll(5, TimeUnit.SECONDS));
serverEndpoint.setOnClose((session) -> assertDoesNotThrow(() ->
session.close(new CloseReason(CloseCodes.SERVICE_RESTART, "custom close reason"))));
assertTrue(serverEndpoint.openLatch.await(5, TimeUnit.SECONDS));
clientEndpoint.session.close();
assertTrue(clientEndpoint.closeLatch.await(5, TimeUnit.SECONDS));
assertThat(clientEndpoint.closeReason.getCloseCode(), is(CloseCodes.SERVICE_RESTART));
assertThat(clientEndpoint.closeReason.getReasonPhrase(), is("custom close reason"));
}
@Test
public void secondCloseFromOnCloseFails() throws Exception
{
EventSocket clientEndpoint = new EventSocket();
URI uri = new URI("ws://localhost:" + connector.getLocalPort() + "/");
client.connectToServer(clientEndpoint, uri);
OnCloseEndpoint serverEndpoint = Objects.requireNonNull(serverEndpoints.poll(5, TimeUnit.SECONDS));
assertTrue(serverEndpoint.openLatch.await(5, TimeUnit.SECONDS));
serverEndpoint.setOnClose((session) -> assertThrows(ClosedChannelException.class, session::close));
serverEndpoint.session.close(new CloseReason(CloseCodes.NORMAL_CLOSURE, "first close"));
assertTrue(clientEndpoint.closeLatch.await(5, TimeUnit.SECONDS));
assertThat(clientEndpoint.closeReason.getCloseCode(), is(CloseCodes.NORMAL_CLOSURE));
assertThat(clientEndpoint.closeReason.getReasonPhrase(), is("first close"));
}
@Test
public void abnormalStatusDoesNotChange() throws Exception
{
BlockingClientEndpoint clientEndpoint = new BlockingClientEndpoint();
URI uri = new URI("ws://localhost:" + connector.getLocalPort() + "/");
client.connectToServer(clientEndpoint, uri);
OnCloseEndpoint serverEndpoint = Objects.requireNonNull(serverEndpoints.poll(5, TimeUnit.SECONDS));
assertTrue(serverEndpoint.openLatch.await(5, TimeUnit.SECONDS));
serverEndpoint.setOnClose((session) ->
{
assertThrows(ClosedChannelException.class,
() -> session.close(new CloseReason(CloseCodes.UNEXPECTED_CONDITION, "abnormal close 2")));
clientEndpoint.unBlockClose();
});
serverEndpoint.session.close(new CloseReason(CloseCodes.PROTOCOL_ERROR, "abnormal close 1"));
assertTrue(clientEndpoint.closeLatch.await(5, TimeUnit.SECONDS));
assertThat(clientEndpoint.closeReason.getCloseCode(), is(CloseCodes.PROTOCOL_ERROR));
assertThat(clientEndpoint.closeReason.getReasonPhrase(), is("abnormal close 1"));
}
@Test
public void onErrorOccurringAfterOnClose() throws Exception
{
EventSocket clientEndpoint = new EventSocket();
URI uri = new URI("ws://localhost:" + connector.getLocalPort() + "/");
client.connectToServer(clientEndpoint, uri);
OnCloseEndpoint serverEndpoint = Objects.requireNonNull(serverEndpoints.poll(5, TimeUnit.SECONDS));
assertTrue(serverEndpoint.openLatch.await(5, TimeUnit.SECONDS));
serverEndpoint.setOnClose((session) ->
{
throw new RuntimeException("trigger onError from onClose");
});
try
{
clientEndpoint.session.close();
}
catch (IOException e)
{
// Ignore. This only occurs in the rare case where the
// close response is received while we are still sending the message.
}
assertTrue(clientEndpoint.closeLatch.await(5, TimeUnit.SECONDS));
assertThat(clientEndpoint.closeReason.getCloseCode(), is(CloseCodes.UNEXPECTED_CONDITION));
assertThat(clientEndpoint.closeReason.getReasonPhrase(), containsString("trigger onError from onClose"));
assertTrue(serverEndpoint.errorLatch.await(5, TimeUnit.SECONDS));
assertThat(serverEndpoint.error, instanceOf(RuntimeException.class));
assertThat(serverEndpoint.error.getMessage(), containsString("trigger onError from onClose"));
}
}

View File

@ -0,0 +1,195 @@
//
// ========================================================================
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under
// the terms of the Eclipse Public License 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0
//
// This Source Code may also be made available under the following
// Secondary Licenses when the conditions for such availability set
// forth in the Eclipse Public License, v. 2.0 are satisfied:
// the Apache License v2.0 which is available at
// https://www.apache.org/licenses/LICENSE-2.0
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.websocket.tests;
import java.net.URI;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.eclipse.jetty.websocket.server.config.JettyWebSocketServletContainerInitializer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class JettyOnCloseTest
{
private Server server;
private ServerConnector connector;
private WebSocketClient client;
private OnCloseEndpoint serverEndpoint = new OnCloseEndpoint();
@WebSocket
public static class OnCloseEndpoint extends EventSocket
{
private Consumer<Session> onClose;
public void setOnClose(Consumer<Session> onClose)
{
this.onClose = onClose;
}
@Override
public void onClose(int statusCode, String reason)
{
onClose.accept(session);
super.onClose(statusCode, reason);
}
}
@WebSocket
public static class BlockingClientEndpoint extends EventSocket
{
private CountDownLatch blockInClose = new CountDownLatch(1);
public void unBlockClose()
{
blockInClose.countDown();
}
@Override
public void onClose(int statusCode, String reason)
{
try
{
blockInClose.await();
super.onClose(statusCode, reason);
}
catch (InterruptedException e)
{
throw new RuntimeException(e);
}
}
}
@BeforeEach
public void start() throws Exception
{
server = new Server();
connector = new ServerConnector(server);
connector.setPort(0);
server.addConnector(connector);
ServletContextHandler contextHandler = new ServletContextHandler(ServletContextHandler.SESSIONS);
contextHandler.setContextPath("/");
server.setHandler(contextHandler);
JettyWebSocketServletContainerInitializer.configure(contextHandler, ((servletContext, container) ->
container.addMapping("/", (req, resp) -> serverEndpoint)));
client = new WebSocketClient();
server.start();
client.start();
}
@AfterEach
public void stop() throws Exception
{
client.stop();
server.stop();
}
@Test
public void changeStatusCodeInOnClose() throws Exception
{
EventSocket clientEndpoint = new EventSocket();
URI uri = new URI("ws://localhost:" + connector.getLocalPort() + "/");
client.connect(clientEndpoint, uri).get(5, TimeUnit.SECONDS);
assertTrue(serverEndpoint.openLatch.await(5, TimeUnit.SECONDS));
serverEndpoint.setOnClose((session) -> session.close(StatusCode.SERVICE_RESTART, "custom close reason"));
clientEndpoint.session.close();
assertTrue(clientEndpoint.closeLatch.await(5, TimeUnit.SECONDS));
assertThat(clientEndpoint.statusCode, is(StatusCode.SERVICE_RESTART));
assertThat(clientEndpoint.reason, is("custom close reason"));
}
@Test
public void secondCloseFromOnCloseFails() throws Exception
{
EventSocket clientEndpoint = new EventSocket();
URI uri = new URI("ws://localhost:" + connector.getLocalPort() + "/");
client.connect(clientEndpoint, uri).get(5, TimeUnit.SECONDS);
assertTrue(serverEndpoint.openLatch.await(5, TimeUnit.SECONDS));
serverEndpoint.setOnClose(Session::close);
serverEndpoint.session.close(StatusCode.NORMAL, "first close");
assertTrue(clientEndpoint.closeLatch.await(5, TimeUnit.SECONDS));
assertThat(clientEndpoint.statusCode, is(StatusCode.NORMAL));
assertThat(clientEndpoint.reason, is("first close"));
}
@Test
public void abnormalStatusDoesNotChange() throws Exception
{
BlockingClientEndpoint clientEndpoint = new BlockingClientEndpoint();
URI uri = new URI("ws://localhost:" + connector.getLocalPort() + "/");
client.connect(clientEndpoint, uri).get(5, TimeUnit.SECONDS);
assertTrue(serverEndpoint.openLatch.await(5, TimeUnit.SECONDS));
serverEndpoint.setOnClose((session) ->
{
session.close(StatusCode.SERVER_ERROR, "abnormal close 2");
clientEndpoint.unBlockClose();
});
serverEndpoint.session.close(StatusCode.PROTOCOL, "abnormal close 1");
assertTrue(clientEndpoint.closeLatch.await(5, TimeUnit.SECONDS));
assertThat(clientEndpoint.statusCode, is(StatusCode.PROTOCOL));
assertThat(clientEndpoint.reason, is("abnormal close 1"));
}
@Test
public void onErrorOccurringAfterOnClose() throws Exception
{
EventSocket clientEndpoint = new EventSocket();
URI uri = new URI("ws://localhost:" + connector.getLocalPort() + "/");
client.connect(clientEndpoint, uri).get(5, TimeUnit.SECONDS);
assertTrue(serverEndpoint.openLatch.await(5, TimeUnit.SECONDS));
serverEndpoint.setOnClose((session) ->
{
throw new RuntimeException("trigger onError from onClose");
});
clientEndpoint.session.close();
assertTrue(clientEndpoint.closeLatch.await(5, TimeUnit.SECONDS));
assertThat(clientEndpoint.statusCode, is(StatusCode.SERVER_ERROR));
assertThat(clientEndpoint.reason, containsString("trigger onError from onClose"));
assertTrue(serverEndpoint.errorLatch.await(5, TimeUnit.SECONDS));
assertThat(serverEndpoint.error, instanceOf(RuntimeException.class));
assertThat(serverEndpoint.error.getMessage(), containsString("trigger onError from onClose"));
}
}

View File

@ -2,6 +2,7 @@ org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog
# org.eclipse.jetty.LEVEL=DEBUG
# org.eclipse.jetty.websocket.LEVEL=DEBUG
# org.eclipse.jetty.websocket.test.LEVEL=DEBUG
# org.eclipse.jetty.websocket.tests.EventSocket.LEVEL=DEBUG
# org.eclipse.jetty.server.AbstractConnector.LEVEL=DEBUG
# org.eclipse.jetty.io.WriteFlusher.LEVEL=DEBUG
# org.eclipse.jetty.io.FillInterest.LEVEL=DEBUG