Merge pull request #4931 from eclipse/jetty-10.0.x-4919-WebSocketContainerStop

Issue #4919 - WebSocket container graceful stop
This commit is contained in:
Lachlan 2020-07-29 13:19:53 +10:00 committed by GitHub
commit fe6f0eb87d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 434 additions and 43 deletions

View File

@ -139,4 +139,54 @@ public interface Graceful
return CompletableFuture.allOf(gracefuls.stream().map(Graceful::shutdown).toArray(CompletableFuture[]::new));
}
/**
* Utility method to execute a {@link ThrowingRunnable} in a new daemon thread and
* be notified of the result in a {@link CompletableFuture}.
* @param runnable the ThrowingRunnable to run.
* @return the CompletableFuture to be notified when the runnable either completes or fails.
*/
static CompletableFuture<Void> shutdown(ThrowingRunnable runnable)
{
AtomicReference<Thread> stopThreadReference = new AtomicReference<>();
CompletableFuture<Void> shutdown = new CompletableFuture<>()
{
@Override
public boolean cancel(boolean mayInterruptIfRunning)
{
boolean canceled = super.cancel(mayInterruptIfRunning);
if (canceled && mayInterruptIfRunning)
{
Thread thread = stopThreadReference.get();
if (thread != null)
thread.interrupt();
}
return canceled;
}
};
Thread stopThread = new Thread(() ->
{
try
{
runnable.run();
shutdown.complete(null);
}
catch (Throwable t)
{
shutdown.completeExceptionally(t);
}
});
stopThread.setDaemon(true);
stopThreadReference.set(stopThread);
stopThread.start();
return shutdown;
}
@FunctionalInterface
interface ThrowingRunnable
{
void run() throws Exception;
}
}

View File

@ -40,8 +40,8 @@ import org.slf4j.LoggerFactory;
public abstract class JavaxWebSocketContainer extends ContainerLifeCycle implements javax.websocket.WebSocketContainer
{
private static final Logger LOG = LoggerFactory.getLogger(JavaxWebSocketContainer.class);
private final SessionTracker sessionTracker = new SessionTracker();
private final List<JavaxWebSocketSessionListener> sessionListeners = new ArrayList<>();
protected final SessionTracker sessionTracker = new SessionTracker();
protected final Configuration.ConfigurationCustomizer defaultCustomizer = new Configuration.ConfigurationCustomizer();
protected final WebSocketComponents components;

View File

@ -18,22 +18,20 @@
package org.eclipse.jetty.websocket.javax.common;
import java.io.IOException;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArraySet;
import javax.websocket.CloseReason;
import javax.websocket.Session;
import org.eclipse.jetty.util.component.AbstractLifeCycle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.eclipse.jetty.util.component.Graceful;
public class SessionTracker extends AbstractLifeCycle implements JavaxWebSocketSessionListener
public class SessionTracker extends AbstractLifeCycle implements JavaxWebSocketSessionListener, Graceful
{
private static final Logger LOG = LoggerFactory.getLogger(SessionTracker.class);
private final CopyOnWriteArraySet<JavaxWebSocketSession> sessions = new CopyOnWriteArraySet<>();
private boolean isShutdown = false;
public Set<Session> getSessions()
{
@ -52,22 +50,40 @@ public class SessionTracker extends AbstractLifeCycle implements JavaxWebSocketS
sessions.remove(session);
}
@Override
protected void doStart() throws Exception
{
isShutdown = false;
super.doStart();
}
@Override
protected void doStop() throws Exception
{
for (Session session : sessions)
sessions.clear();
super.doStop();
}
@Override
public CompletableFuture<Void> shutdown()
{
isShutdown = true;
return Graceful.shutdown(() ->
{
try
for (Session session : sessions)
{
if (Thread.interrupted())
break;
// GOING_AWAY is abnormal close status so it will hard close connection after sent.
session.close(new CloseReason(CloseReason.CloseCodes.GOING_AWAY, "Container being shut down"));
}
catch (IOException e)
{
LOG.trace("IGNORED", e);
}
}
});
}
super.doStop();
@Override
public boolean isShutdown()
{
return isShutdown;
}
}

View File

@ -0,0 +1,43 @@
//
// ========================================================================
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under
// the terms of the Eclipse Public License 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0
//
// This Source Code may also be made available under the following
// Secondary Licenses when the conditions for such availability set
// forth in the Eclipse Public License, v. 2.0 are satisfied:
// the Apache License v2.0 which is available at
// https://www.apache.org/licenses/LICENSE-2.0
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.websocket.javax.tests;
import java.io.IOException;
import java.nio.ByteBuffer;
import javax.websocket.ClientEndpoint;
import javax.websocket.server.ServerEndpoint;
@ServerEndpoint("/")
@ClientEndpoint
public class EchoSocket extends EventSocket
{
@Override
public void onMessage(String message) throws IOException
{
super.onMessage(message);
session.getBasicRemote().sendText(message);
}
@Override
public void onMessage(ByteBuffer message) throws IOException
{
super.onMessage(message);
session.getBasicRemote().sendBinary(message);
}
}

View File

@ -98,23 +98,4 @@ public class EventSocket
error = cause;
errorLatch.countDown();
}
@ServerEndpoint("/")
@ClientEndpoint
public static class EchoSocket extends EventSocket
{
@Override
public void onMessage(String message) throws IOException
{
super.onMessage(message);
session.getBasicRemote().sendText(message);
}
@Override
public void onMessage(ByteBuffer message) throws IOException
{
super.onMessage(message);
session.getBasicRemote().sendBinary(message);
}
}
}

View File

@ -0,0 +1,133 @@
//
// ========================================================================
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under
// the terms of the Eclipse Public License 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0
//
// This Source Code may also be made available under the following
// Secondary Licenses when the conditions for such availability set
// forth in the Eclipse Public License, v. 2.0 are satisfied:
// the Apache License v2.0 which is available at
// https://www.apache.org/licenses/LICENSE-2.0
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.websocket.javax.tests;
import java.net.URI;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import javax.websocket.CloseReason;
import javax.websocket.EndpointConfig;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.util.BlockingArrayQueue;
import org.eclipse.jetty.util.component.Graceful;
import org.eclipse.jetty.websocket.javax.client.internal.JavaxWebSocketClientContainer;
import org.eclipse.jetty.websocket.javax.server.config.JavaxWebSocketServletContainerInitializer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class GracefulCloseTest
{
private static final BlockingArrayQueue<EventSocket> serverEndpoints = new BlockingArrayQueue<>();
private Server server;
private URI serverUri;
private JavaxWebSocketClientContainer client;
@BeforeEach
public void before() throws Exception
{
server = new Server();
ServerConnector connector = new ServerConnector(server);
server.addConnector(connector);
ServletContextHandler contextHandler = new ServletContextHandler();
contextHandler.setContextPath("/");
server.setHandler(contextHandler);
JavaxWebSocketServletContainerInitializer.configure(contextHandler, (context, container) ->
container.addEndpoint(ServerSocket.class));
server.start();
serverUri = WSURI.toWebsocket(server.getURI());
// StopTimeout is necessary for the websocket server sessions to gracefully close.
server.setStopTimeout(1000);
client = new JavaxWebSocketClientContainer();
client.start();
}
@AfterEach
public void after() throws Exception
{
client.stop();
server.stop();
}
@ServerEndpoint("/")
public static class ServerSocket extends EchoSocket
{
@Override
public void onOpen(Session session, EndpointConfig endpointConfig)
{
serverEndpoints.add(this);
super.onOpen(session, endpointConfig);
}
}
@Test
public void testClientStop() throws Exception
{
EventSocket clientEndpoint = new EventSocket();
client.connectToServer(clientEndpoint, serverUri);
EventSocket serverEndpoint = Objects.requireNonNull(serverEndpoints.poll(5, TimeUnit.SECONDS));
// There is no API for a Javax WebSocketContainer stop timeout.
Graceful.shutdown(client).get(5, TimeUnit.SECONDS);
client.stop();
// Check that the client endpoint was closed with the correct status code and no error.
assertTrue(clientEndpoint.closeLatch.await(5, TimeUnit.SECONDS));
assertThat(clientEndpoint.closeReason.getCloseCode(), is(CloseReason.CloseCodes.GOING_AWAY));
assertNull(clientEndpoint.error);
// Check that the server endpoint was closed with the correct status code and no error.
assertTrue(serverEndpoint.closeLatch.await(5, TimeUnit.SECONDS));
assertThat(serverEndpoint.closeReason.getCloseCode(), is(CloseReason.CloseCodes.GOING_AWAY));
assertNull(serverEndpoint.error);
}
@Test
public void testServerStop() throws Exception
{
EventSocket clientEndpoint = new EventSocket();
client.connectToServer(clientEndpoint, serverUri);
EventSocket serverEndpoint = Objects.requireNonNull(serverEndpoints.poll(5, TimeUnit.SECONDS));
server.stop();
// Check that the client endpoint was closed with the correct status code and no error.
assertTrue(clientEndpoint.closeLatch.await(5, TimeUnit.SECONDS));
assertThat(clientEndpoint.closeReason.getCloseCode(), is(CloseReason.CloseCodes.GOING_AWAY));
assertNull(clientEndpoint.error);
// Check that the server endpoint was closed with the correct status code and no error.
assertTrue(serverEndpoint.closeLatch.await(5, TimeUnit.SECONDS));
assertThat(serverEndpoint.closeReason.getCloseCode(), is(CloseReason.CloseCodes.GOING_AWAY));
assertNull(serverEndpoint.error);
}
}

View File

@ -40,7 +40,7 @@ import org.eclipse.jetty.util.BlockingArrayQueue;
import org.eclipse.jetty.websocket.javax.common.JavaxWebSocketSession;
import org.eclipse.jetty.websocket.javax.common.encoders.AvailableEncoders;
import org.eclipse.jetty.websocket.javax.server.config.JavaxWebSocketServletContainerInitializer;
import org.eclipse.jetty.websocket.javax.tests.EventSocket.EchoSocket;
import org.eclipse.jetty.websocket.javax.tests.EchoSocket;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.params.ParameterizedTest;

View File

@ -29,6 +29,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.eclipse.jetty.client.HttpClient;
@ -38,6 +39,7 @@ import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.util.DecoratedObjectFactory;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.component.Graceful;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.ShutdownThread;
import org.eclipse.jetty.websocket.api.Session;
@ -68,6 +70,7 @@ public class WebSocketClient extends ContainerLifeCycle implements WebSocketPoli
private final Configuration.ConfigurationCustomizer configurationCustomizer = new Configuration.ConfigurationCustomizer();
private final WebSocketComponents components = new WebSocketComponents();
private boolean stopAtShutdown = false;
private long _stopTimeout = Long.MAX_VALUE;
/**
* Instantiate a WebSocketClient with defaults
@ -388,11 +391,33 @@ public class WebSocketClient extends ContainerLifeCycle implements WebSocketPoli
stopAtShutdown = stop;
}
/**
* The timeout to allow all remaining open Sessions to be closed gracefully using the close code {@link org.eclipse.jetty.websocket.api.StatusCode#SHUTDOWN}.
* @param stopTimeout the time in ms to wait for the graceful close, use a value less than or equal to 0 to not gracefully close.
*/
public void setStopTimeout(long stopTimeout)
{
_stopTimeout = stopTimeout;
}
public long getStopTimeout()
{
return _stopTimeout;
}
public boolean isStopAtShutdown()
{
return stopAtShutdown;
}
@Override
protected void doStop() throws Exception
{
if (getStopTimeout() > 0)
Graceful.shutdown(this).get(getStopTimeout(), TimeUnit.MILLISECONDS);
super.doStop();
}
@Override
public String toString()
{

View File

@ -20,16 +20,19 @@ package org.eclipse.jetty.websocket.common;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import org.eclipse.jetty.util.component.AbstractLifeCycle;
import org.eclipse.jetty.util.component.Graceful;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.api.WebSocketSessionListener;
public class SessionTracker extends AbstractLifeCycle implements WebSocketSessionListener
public class SessionTracker extends AbstractLifeCycle implements WebSocketSessionListener, Graceful
{
private List<Session> sessions = new CopyOnWriteArrayList<>();
private final List<Session> sessions = new CopyOnWriteArrayList<>();
private boolean isShutdown = false;
public Collection<Session> getSessions()
{
@ -48,15 +51,40 @@ public class SessionTracker extends AbstractLifeCycle implements WebSocketSessio
sessions.remove(session);
}
@Override
protected void doStart() throws Exception
{
isShutdown = false;
super.doStart();
}
@Override
protected void doStop() throws Exception
{
for (Session session : sessions)
{
// SHUTDOWN is abnormal close status so it will hard close connection after sent.
session.close(StatusCode.SHUTDOWN, "Container being shut down");
}
sessions.clear();
super.doStop();
}
@Override
public CompletableFuture<Void> shutdown()
{
isShutdown = true;
return Graceful.shutdown(() ->
{
for (Session session : sessions)
{
if (Thread.interrupted())
break;
// SHUTDOWN is abnormal close status so it will hard close connection after sent.
session.close(StatusCode.SHUTDOWN, "Container being shut down");
}
});
}
@Override
public boolean isShutdown()
{
return isShutdown;
}
}

View File

@ -119,6 +119,7 @@ public class JettyWebSocketServerContainer extends ContainerLifeCycle implements
frameHandlerFactory = factory;
addSessionListener(sessionTracker);
addBean(sessionTracker);
}
public void addMapping(String pathSpec, JettyWebSocketCreator creator)

View File

@ -0,0 +1,114 @@
//
// ========================================================================
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under
// the terms of the Eclipse Public License 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0
//
// This Source Code may also be made available under the following
// Secondary Licenses when the conditions for such availability set
// forth in the Eclipse Public License, v. 2.0 are satisfied:
// the Apache License v2.0 which is available at
// https://www.apache.org/licenses/LICENSE-2.0
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.websocket.tests;
import java.net.URI;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.api.util.WSURI;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.eclipse.jetty.websocket.server.config.JettyWebSocketServletContainerInitializer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class GracefulCloseTest
{
private final EventSocket serverEndpoint = new EchoSocket();
private Server server;
private URI serverUri;
private WebSocketClient client;
@BeforeEach
public void before() throws Exception
{
server = new Server();
ServerConnector connector = new ServerConnector(server);
server.addConnector(connector);
ServletContextHandler contextHandler = new ServletContextHandler();
contextHandler.setContextPath("/");
server.setHandler(contextHandler);
JettyWebSocketServletContainerInitializer.configure(contextHandler, (context, container) ->
container.addMapping("/", ((req, resp) -> serverEndpoint)));
server.start();
serverUri = WSURI.toWebsocket(server.getURI());
// StopTimeout is necessary for the websocket server sessions to gracefully close.
server.setStopTimeout(1000);
client = new WebSocketClient();
client.setStopTimeout(1000);
client.start();
}
@AfterEach
public void after() throws Exception
{
client.stop();
server.stop();
}
@Test
public void testClientStop() throws Exception
{
EventSocket clientEndpoint = new EventSocket();
client.connect(clientEndpoint, serverUri).get(5, TimeUnit.SECONDS);
client.stop();
// Check that the client endpoint was closed with the correct status code and no error.
assertTrue(clientEndpoint.closeLatch.await(5, TimeUnit.SECONDS));
assertThat(clientEndpoint.closeCode, is(StatusCode.SHUTDOWN));
assertNull(clientEndpoint.error);
// Check that the server endpoint was closed with the correct status code and no error.
assertTrue(serverEndpoint.closeLatch.await(5, TimeUnit.SECONDS));
assertThat(serverEndpoint.closeCode, is(StatusCode.SHUTDOWN));
assertNull(serverEndpoint.error);
}
@Test
public void testServerStop() throws Exception
{
EventSocket clientEndpoint = new EventSocket();
client.connect(clientEndpoint, serverUri).get(5, TimeUnit.SECONDS);
server.stop();
// Check that the client endpoint was closed with the correct status code and no error.
assertTrue(clientEndpoint.closeLatch.await(5, TimeUnit.SECONDS));
assertThat(clientEndpoint.closeCode, is(StatusCode.SHUTDOWN));
assertNull(clientEndpoint.error);
// Check that the server endpoint was closed with the correct status code and no error.
assertTrue(serverEndpoint.closeLatch.await(5, TimeUnit.SECONDS));
assertThat(serverEndpoint.closeCode, is(StatusCode.SHUTDOWN));
assertNull(serverEndpoint.error);
}
}