Issue #4919 - all websocket containers to implement Graceful shutdown interface
Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
This commit is contained in:
parent
9f7f2e3e56
commit
695d239ac5
|
@ -276,11 +276,4 @@ public class JavaxWebSocketClientContainer extends JavaxWebSocketContainer imple
|
|||
|
||||
return new AnnotatedClientEndpointConfig(anno);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doStop() throws Exception
|
||||
{
|
||||
sessionTracker.stop();
|
||||
super.doStop();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -23,6 +23,7 @@ import java.util.ArrayList;
|
|||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.function.Consumer;
|
||||
import javax.websocket.Extension;
|
||||
|
@ -31,13 +32,15 @@ import javax.websocket.WebSocketContainer;
|
|||
import org.eclipse.jetty.io.ByteBufferPool;
|
||||
import org.eclipse.jetty.util.DecoratedObjectFactory;
|
||||
import org.eclipse.jetty.util.component.ContainerLifeCycle;
|
||||
import org.eclipse.jetty.util.component.Graceful;
|
||||
import org.eclipse.jetty.websocket.core.Configuration;
|
||||
import org.eclipse.jetty.websocket.core.WebSocketComponents;
|
||||
import org.eclipse.jetty.websocket.core.WebSocketExtensionRegistry;
|
||||
import org.eclipse.jetty.websocket.util.ShutdownUtil;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public abstract class JavaxWebSocketContainer extends ContainerLifeCycle implements javax.websocket.WebSocketContainer
|
||||
public abstract class JavaxWebSocketContainer extends ContainerLifeCycle implements javax.websocket.WebSocketContainer, Graceful
|
||||
{
|
||||
private static final Logger LOG = LoggerFactory.getLogger(JavaxWebSocketContainer.class);
|
||||
private final List<JavaxWebSocketSessionListener> sessionListeners = new ArrayList<>();
|
||||
|
@ -49,7 +52,6 @@ public abstract class JavaxWebSocketContainer extends ContainerLifeCycle impleme
|
|||
{
|
||||
this.components = components;
|
||||
addSessionListener(sessionTracker);
|
||||
addBean(sessionTracker);
|
||||
}
|
||||
|
||||
public abstract Executor getExecutor();
|
||||
|
@ -198,4 +200,23 @@ public abstract class JavaxWebSocketContainer extends ContainerLifeCycle impleme
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doStart() throws Exception
|
||||
{
|
||||
sessionTracker.start();
|
||||
super.doStart();
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Void> shutdown()
|
||||
{
|
||||
return ShutdownUtil.shutdown(sessionTracker);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isShutdown()
|
||||
{
|
||||
return sessionTracker.isStopped();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,7 +21,6 @@ package org.eclipse.jetty.websocket.javax.server.internal;
|
|||
import java.lang.reflect.Modifier;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.function.Function;
|
||||
import javax.servlet.ServletContext;
|
||||
|
@ -35,7 +34,6 @@ import org.eclipse.jetty.http.pathmap.UriTemplatePathSpec;
|
|||
import org.eclipse.jetty.server.handler.ContextHandler;
|
||||
import org.eclipse.jetty.servlet.ServletContextHandler;
|
||||
import org.eclipse.jetty.util.annotation.ManagedObject;
|
||||
import org.eclipse.jetty.util.component.Graceful;
|
||||
import org.eclipse.jetty.util.component.LifeCycle;
|
||||
import org.eclipse.jetty.websocket.core.WebSocketComponents;
|
||||
import org.eclipse.jetty.websocket.core.client.WebSocketCoreClient;
|
||||
|
@ -50,7 +48,7 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@ManagedObject("JSR356 Server Container")
|
||||
public class JavaxWebSocketServerContainer extends JavaxWebSocketClientContainer implements javax.websocket.server.ServerContainer, LifeCycle.Listener, Graceful
|
||||
public class JavaxWebSocketServerContainer extends JavaxWebSocketClientContainer implements javax.websocket.server.ServerContainer, LifeCycle.Listener
|
||||
{
|
||||
public static final String JAVAX_WEBSOCKET_CONTAINER_ATTRIBUTE = javax.websocket.server.ServerContainer.class.getName();
|
||||
private static final Logger LOG = LoggerFactory.getLogger(JavaxWebSocketServerContainer.class);
|
||||
|
@ -303,32 +301,4 @@ public class JavaxWebSocketServerContainer extends JavaxWebSocketClientContainer
|
|||
deferredEndpointConfigs.clear();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Void> shutdown()
|
||||
{
|
||||
CompletableFuture<Void> shutdown = new CompletableFuture<>();
|
||||
new Thread(() ->
|
||||
{
|
||||
try
|
||||
{
|
||||
LifeCycle.stop(sessionTracker);
|
||||
}
|
||||
catch (Throwable t)
|
||||
{
|
||||
LOG.warn("Error while stopping SessionTracker", t);
|
||||
}
|
||||
finally
|
||||
{
|
||||
shutdown.complete(null);
|
||||
}
|
||||
}).start();
|
||||
return shutdown;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isShutdown()
|
||||
{
|
||||
return sessionTracker.isStopped();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -30,6 +30,7 @@ import org.eclipse.jetty.server.Server;
|
|||
import org.eclipse.jetty.server.ServerConnector;
|
||||
import org.eclipse.jetty.servlet.ServletContextHandler;
|
||||
import org.eclipse.jetty.util.BlockingArrayQueue;
|
||||
import org.eclipse.jetty.util.component.Graceful;
|
||||
import org.eclipse.jetty.websocket.javax.client.internal.JavaxWebSocketClientContainer;
|
||||
import org.eclipse.jetty.websocket.javax.server.config.JavaxWebSocketServletContainerInitializer;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
|
@ -95,6 +96,8 @@ public class GracefulCloseTest
|
|||
client.connectToServer(clientEndpoint, serverUri);
|
||||
EventSocket serverEndpoint = Objects.requireNonNull(serverEndpoints.poll(5, TimeUnit.SECONDS));
|
||||
|
||||
// There is no API for a Javax WebSocketContainer stop timeout.
|
||||
Graceful.shutdown(client).get(5, TimeUnit.SECONDS);
|
||||
client.stop();
|
||||
|
||||
// Check that the client endpoint was closed with the correct status code and no error.
|
||||
|
|
|
@ -29,6 +29,7 @@ import java.util.concurrent.CompletableFuture;
|
|||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import org.eclipse.jetty.client.HttpClient;
|
||||
|
@ -38,6 +39,7 @@ import org.eclipse.jetty.io.ByteBufferPool;
|
|||
import org.eclipse.jetty.io.Connection;
|
||||
import org.eclipse.jetty.util.DecoratedObjectFactory;
|
||||
import org.eclipse.jetty.util.component.ContainerLifeCycle;
|
||||
import org.eclipse.jetty.util.component.Graceful;
|
||||
import org.eclipse.jetty.util.ssl.SslContextFactory;
|
||||
import org.eclipse.jetty.util.thread.ShutdownThread;
|
||||
import org.eclipse.jetty.websocket.api.Session;
|
||||
|
@ -53,10 +55,11 @@ import org.eclipse.jetty.websocket.core.Configuration;
|
|||
import org.eclipse.jetty.websocket.core.WebSocketComponents;
|
||||
import org.eclipse.jetty.websocket.core.client.UpgradeListener;
|
||||
import org.eclipse.jetty.websocket.core.client.WebSocketCoreClient;
|
||||
import org.eclipse.jetty.websocket.util.ShutdownUtil;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class WebSocketClient extends ContainerLifeCycle implements WebSocketPolicy, WebSocketContainer
|
||||
public class WebSocketClient extends ContainerLifeCycle implements WebSocketPolicy, WebSocketContainer, Graceful
|
||||
{
|
||||
private static final Logger LOG = LoggerFactory.getLogger(WebSocketClient.class);
|
||||
private final WebSocketCoreClient coreClient;
|
||||
|
@ -67,6 +70,7 @@ public class WebSocketClient extends ContainerLifeCycle implements WebSocketPoli
|
|||
private final Configuration.ConfigurationCustomizer configurationCustomizer = new Configuration.ConfigurationCustomizer();
|
||||
private final WebSocketComponents components = new WebSocketComponents();
|
||||
private boolean stopAtShutdown = false;
|
||||
private long _stopTimeout = 200;
|
||||
|
||||
/**
|
||||
* Instantiate a WebSocketClient with defaults
|
||||
|
@ -91,7 +95,6 @@ public class WebSocketClient extends ContainerLifeCycle implements WebSocketPoli
|
|||
|
||||
frameHandlerFactory = new JettyWebSocketFrameHandlerFactory(this);
|
||||
sessionListeners.add(sessionTracker);
|
||||
addBean(sessionTracker);
|
||||
}
|
||||
|
||||
public CompletableFuture<Session> connect(Object websocket, URI toUri) throws IOException
|
||||
|
@ -380,18 +383,48 @@ public class WebSocketClient extends ContainerLifeCycle implements WebSocketPoli
|
|||
stopAtShutdown = stop;
|
||||
}
|
||||
|
||||
public void setStopTimeout(long stopTimeout)
|
||||
{
|
||||
_stopTimeout = stopTimeout;
|
||||
}
|
||||
|
||||
public long getStopTimeout()
|
||||
{
|
||||
return _stopTimeout;
|
||||
}
|
||||
|
||||
public boolean isStopAtShutdown()
|
||||
{
|
||||
return stopAtShutdown;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doStart() throws Exception
|
||||
{
|
||||
sessionTracker.start();
|
||||
super.doStart();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doStop() throws Exception
|
||||
{
|
||||
sessionTracker.stop();
|
||||
if (getStopTimeout() > 0)
|
||||
Graceful.shutdown(this).get(getStopTimeout(), TimeUnit.MILLISECONDS);
|
||||
super.doStop();
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Void> shutdown()
|
||||
{
|
||||
return ShutdownUtil.shutdown(sessionTracker);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isShutdown()
|
||||
{
|
||||
return sessionTracker.isStopped();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
|
|
|
@ -44,6 +44,7 @@ import org.eclipse.jetty.websocket.core.exception.WebSocketException;
|
|||
import org.eclipse.jetty.websocket.core.server.WebSocketServerComponents;
|
||||
import org.eclipse.jetty.websocket.server.config.JettyWebSocketServletContainerInitializer;
|
||||
import org.eclipse.jetty.websocket.server.internal.JettyServerFrameHandlerFactory;
|
||||
import org.eclipse.jetty.websocket.util.ShutdownUtil;
|
||||
import org.eclipse.jetty.websocket.util.server.internal.FrameHandlerFactory;
|
||||
import org.eclipse.jetty.websocket.util.server.internal.WebSocketMapping;
|
||||
import org.slf4j.Logger;
|
||||
|
@ -120,7 +121,6 @@ public class JettyWebSocketServerContainer extends ContainerLifeCycle implements
|
|||
frameHandlerFactory = factory;
|
||||
|
||||
addSessionListener(sessionTracker);
|
||||
addBean(sessionTracker);
|
||||
}
|
||||
|
||||
public void addMapping(String pathSpec, JettyWebSocketCreator creator)
|
||||
|
@ -264,26 +264,17 @@ public class JettyWebSocketServerContainer extends ContainerLifeCycle implements
|
|||
customizer.setAutoFragment(autoFragment);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doStart() throws Exception
|
||||
{
|
||||
sessionTracker.start();
|
||||
super.doStart();
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Void> shutdown()
|
||||
{
|
||||
CompletableFuture<Void> shutdown = new CompletableFuture<>();
|
||||
new Thread(() ->
|
||||
{
|
||||
try
|
||||
{
|
||||
LifeCycle.stop(sessionTracker);
|
||||
}
|
||||
catch (Throwable t)
|
||||
{
|
||||
LOG.warn("Error while stopping SessionTracker", t);
|
||||
}
|
||||
finally
|
||||
{
|
||||
shutdown.complete(null);
|
||||
}
|
||||
}).start();
|
||||
return shutdown;
|
||||
return ShutdownUtil.shutdown(sessionTracker);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -63,6 +63,7 @@ public class GracefulCloseTest
|
|||
server.setStopTimeout(1000);
|
||||
|
||||
client = new WebSocketClient();
|
||||
client.setStopTimeout(1000);
|
||||
client.start();
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,75 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
|
||||
//
|
||||
// This program and the accompanying materials are made available under
|
||||
// the terms of the Eclipse Public License 2.0 which is available at
|
||||
// https://www.eclipse.org/legal/epl-2.0
|
||||
//
|
||||
// This Source Code may also be made available under the following
|
||||
// Secondary Licenses when the conditions for such availability set
|
||||
// forth in the Eclipse Public License, v. 2.0 are satisfied:
|
||||
// the Apache License v2.0 which is available at
|
||||
// https://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.websocket.util;
|
||||
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import org.eclipse.jetty.util.component.LifeCycle;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class ShutdownUtil
|
||||
{
|
||||
private static final Logger LOG = LoggerFactory.getLogger(ShutdownUtil.class);
|
||||
|
||||
/**
|
||||
* Shutdown a {@link LifeCycle} in a new daemon thread and be notified on the result in a {@link CompletableFuture}.
|
||||
* @param lifeCycle the LifeCycle to stop.
|
||||
* @return the CompletableFuture to be notified when the stop either completes or fails.
|
||||
*/
|
||||
public static CompletableFuture<Void> shutdown(LifeCycle lifeCycle)
|
||||
{
|
||||
AtomicReference<Thread> stopThreadReference = new AtomicReference<>();
|
||||
CompletableFuture<Void> shutdown = new CompletableFuture<>()
|
||||
{
|
||||
@Override
|
||||
public boolean cancel(boolean mayInterruptIfRunning)
|
||||
{
|
||||
boolean canceled = super.cancel(mayInterruptIfRunning);
|
||||
if (canceled && mayInterruptIfRunning)
|
||||
{
|
||||
Thread thread = stopThreadReference.get();
|
||||
if (thread != null)
|
||||
thread.interrupt();
|
||||
}
|
||||
|
||||
return canceled;
|
||||
}
|
||||
};
|
||||
|
||||
Thread stopThread = new Thread(() ->
|
||||
{
|
||||
try
|
||||
{
|
||||
lifeCycle.stop();
|
||||
shutdown.complete(null);
|
||||
}
|
||||
catch (Throwable t)
|
||||
{
|
||||
LOG.warn("Error while stopping {}", lifeCycle, t);
|
||||
shutdown.completeExceptionally(t);
|
||||
}
|
||||
});
|
||||
stopThread.setDaemon(true);
|
||||
stopThreadReference.set(stopThread);
|
||||
stopThread.start();
|
||||
return shutdown;
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue