Issue #4919 - make the SessionTracker the one to implement Graceful shutdown

Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
This commit is contained in:
Lachlan Roberts 2020-07-28 17:38:41 +10:00
parent 9e383f0891
commit 1be0220988
7 changed files with 121 additions and 169 deletions

View File

@ -139,4 +139,54 @@ public interface Graceful
return CompletableFuture.allOf(gracefuls.stream().map(Graceful::shutdown).toArray(CompletableFuture[]::new));
}
/**
* Utility method to execute a {@link ThrowingRunnable} in a new daemon thread and
* be notified of the result in a {@link CompletableFuture}.
* @param runnable the ThrowingRunnable to run.
* @return the CompletableFuture to be notified when the runnable either completes or fails.
*/
static CompletableFuture<Void> shutdown(ThrowingRunnable runnable)
{
AtomicReference<Thread> stopThreadReference = new AtomicReference<>();
CompletableFuture<Void> shutdown = new CompletableFuture<>()
{
@Override
public boolean cancel(boolean mayInterruptIfRunning)
{
boolean canceled = super.cancel(mayInterruptIfRunning);
if (canceled && mayInterruptIfRunning)
{
Thread thread = stopThreadReference.get();
if (thread != null)
thread.interrupt();
}
return canceled;
}
};
Thread stopThread = new Thread(() ->
{
try
{
runnable.run();
shutdown.complete(null);
}
catch (Throwable t)
{
shutdown.completeExceptionally(t);
}
});
stopThread.setDaemon(true);
stopThreadReference.set(stopThread);
stopThread.start();
return shutdown;
}
@FunctionalInterface
interface ThrowingRunnable
{
void run() throws Exception;
}
}

View File

@ -23,7 +23,6 @@ import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import javax.websocket.Extension;
@ -32,15 +31,13 @@ import javax.websocket.WebSocketContainer;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.DecoratedObjectFactory;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.component.Graceful;
import org.eclipse.jetty.websocket.core.Configuration;
import org.eclipse.jetty.websocket.core.WebSocketComponents;
import org.eclipse.jetty.websocket.core.WebSocketExtensionRegistry;
import org.eclipse.jetty.websocket.util.ShutdownUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public abstract class JavaxWebSocketContainer extends ContainerLifeCycle implements javax.websocket.WebSocketContainer, Graceful
public abstract class JavaxWebSocketContainer extends ContainerLifeCycle implements javax.websocket.WebSocketContainer
{
private static final Logger LOG = LoggerFactory.getLogger(JavaxWebSocketContainer.class);
private final List<JavaxWebSocketSessionListener> sessionListeners = new ArrayList<>();
@ -52,6 +49,7 @@ public abstract class JavaxWebSocketContainer extends ContainerLifeCycle impleme
{
this.components = components;
addSessionListener(sessionTracker);
addBean(sessionTracker);
}
public abstract Executor getExecutor();
@ -200,23 +198,4 @@ public abstract class JavaxWebSocketContainer extends ContainerLifeCycle impleme
}
}
}
@Override
protected void doStart() throws Exception
{
sessionTracker.start();
super.doStart();
}
@Override
public CompletableFuture<Void> shutdown()
{
return ShutdownUtil.stop(sessionTracker);
}
@Override
public boolean isShutdown()
{
return sessionTracker.isStopped();
}
}

View File

@ -18,22 +18,20 @@
package org.eclipse.jetty.websocket.javax.common;
import java.io.IOException;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArraySet;
import javax.websocket.CloseReason;
import javax.websocket.Session;
import org.eclipse.jetty.util.component.AbstractLifeCycle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.eclipse.jetty.util.component.Graceful;
public class SessionTracker extends AbstractLifeCycle implements JavaxWebSocketSessionListener
public class SessionTracker extends AbstractLifeCycle implements JavaxWebSocketSessionListener, Graceful
{
private static final Logger LOG = LoggerFactory.getLogger(SessionTracker.class);
private final CopyOnWriteArraySet<JavaxWebSocketSession> sessions = new CopyOnWriteArraySet<>();
private boolean isShutdown = false;
public Set<Session> getSessions()
{
@ -52,25 +50,40 @@ public class SessionTracker extends AbstractLifeCycle implements JavaxWebSocketS
sessions.remove(session);
}
@Override
protected void doStart() throws Exception
{
isShutdown = false;
super.doStart();
}
@Override
protected void doStop() throws Exception
{
for (Session session : sessions)
{
if (Thread.interrupted())
break;
sessions.clear();
super.doStop();
}
try
@Override
public CompletableFuture<Void> shutdown()
{
isShutdown = true;
return Graceful.shutdown(() ->
{
for (Session session : sessions)
{
if (Thread.interrupted())
break;
// GOING_AWAY is abnormal close status so it will hard close connection after sent.
session.close(new CloseReason(CloseReason.CloseCodes.GOING_AWAY, "Container being shut down"));
}
catch (IOException e)
{
LOG.trace("IGNORED", e);
}
}
});
}
super.doStop();
@Override
public boolean isShutdown()
{
return isShutdown;
}
}

View File

@ -56,11 +56,10 @@ import org.eclipse.jetty.websocket.core.CoreSession;
import org.eclipse.jetty.websocket.core.WebSocketComponents;
import org.eclipse.jetty.websocket.core.client.UpgradeListener;
import org.eclipse.jetty.websocket.core.client.WebSocketCoreClient;
import org.eclipse.jetty.websocket.util.ShutdownUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class WebSocketClient extends ContainerLifeCycle implements WebSocketPolicy, WebSocketContainer, Graceful
public class WebSocketClient extends ContainerLifeCycle implements WebSocketPolicy, WebSocketContainer
{
private static final Logger LOG = LoggerFactory.getLogger(WebSocketClient.class);
private final WebSocketCoreClient coreClient;
@ -96,6 +95,7 @@ public class WebSocketClient extends ContainerLifeCycle implements WebSocketPoli
frameHandlerFactory = new JettyWebSocketFrameHandlerFactory(this);
sessionListeners.add(sessionTracker);
addBean(sessionTracker);
}
public CompletableFuture<Session> connect(Object websocket, URI toUri) throws IOException
@ -410,13 +410,6 @@ public class WebSocketClient extends ContainerLifeCycle implements WebSocketPoli
return stopAtShutdown;
}
@Override
protected void doStart() throws Exception
{
sessionTracker.start();
super.doStart();
}
@Override
protected void doStop() throws Exception
{
@ -425,18 +418,6 @@ public class WebSocketClient extends ContainerLifeCycle implements WebSocketPoli
super.doStop();
}
@Override
public CompletableFuture<Void> shutdown()
{
return ShutdownUtil.stop(sessionTracker);
}
@Override
public boolean isShutdown()
{
return sessionTracker.isStopped();
}
@Override
public String toString()
{

View File

@ -20,16 +20,19 @@ package org.eclipse.jetty.websocket.common;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import org.eclipse.jetty.util.component.AbstractLifeCycle;
import org.eclipse.jetty.util.component.Graceful;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.api.WebSocketSessionListener;
public class SessionTracker extends AbstractLifeCycle implements WebSocketSessionListener
public class SessionTracker extends AbstractLifeCycle implements WebSocketSessionListener, Graceful
{
private final List<Session> sessions = new CopyOnWriteArrayList<>();
private boolean isShutdown = false;
public Collection<Session> getSessions()
{
@ -48,18 +51,40 @@ public class SessionTracker extends AbstractLifeCycle implements WebSocketSessio
sessions.remove(session);
}
@Override
protected void doStart() throws Exception
{
isShutdown = false;
super.doStart();
}
@Override
protected void doStop() throws Exception
{
for (Session session : sessions)
{
if (Thread.interrupted())
break;
// SHUTDOWN is abnormal close status so it will hard close connection after sent.
session.close(StatusCode.SHUTDOWN, "Container being shut down");
}
sessions.clear();
super.doStop();
}
@Override
public CompletableFuture<Void> shutdown()
{
isShutdown = true;
return Graceful.shutdown(() ->
{
for (Session session : sessions)
{
if (Thread.interrupted())
break;
// SHUTDOWN is abnormal close status so it will hard close connection after sent.
session.close(StatusCode.SHUTDOWN, "Container being shut down");
}
});
}
@Override
public boolean isShutdown()
{
return isShutdown;
}
}

View File

@ -22,7 +22,6 @@ import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import javax.servlet.ServletContext;
@ -30,7 +29,6 @@ import javax.servlet.ServletContext;
import org.eclipse.jetty.http.pathmap.PathSpec;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.component.Graceful;
import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.WebSocketBehavior;
@ -45,13 +43,12 @@ import org.eclipse.jetty.websocket.core.server.WebSocketServerComponents;
import org.eclipse.jetty.websocket.server.config.JettyWebSocketServletContainerInitializer;
import org.eclipse.jetty.websocket.server.internal.JettyServerFrameHandlerFactory;
import org.eclipse.jetty.websocket.util.ReflectUtils;
import org.eclipse.jetty.websocket.util.ShutdownUtil;
import org.eclipse.jetty.websocket.util.server.internal.FrameHandlerFactory;
import org.eclipse.jetty.websocket.util.server.internal.WebSocketMapping;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class JettyWebSocketServerContainer extends ContainerLifeCycle implements WebSocketContainer, WebSocketPolicy, LifeCycle.Listener, Graceful
public class JettyWebSocketServerContainer extends ContainerLifeCycle implements WebSocketContainer, WebSocketPolicy, LifeCycle.Listener
{
public static final String JETTY_WEBSOCKET_CONTAINER_ATTRIBUTE = WebSocketContainer.class.getName();
@ -122,6 +119,7 @@ public class JettyWebSocketServerContainer extends ContainerLifeCycle implements
frameHandlerFactory = factory;
addSessionListener(sessionTracker);
addBean(sessionTracker);
}
public void addMapping(String pathSpec, JettyWebSocketCreator creator)
@ -282,23 +280,4 @@ public class JettyWebSocketServerContainer extends ContainerLifeCycle implements
{
customizer.setAutoFragment(autoFragment);
}
@Override
protected void doStart() throws Exception
{
sessionTracker.start();
super.doStart();
}
@Override
public CompletableFuture<Void> shutdown()
{
return ShutdownUtil.stop(sessionTracker);
}
@Override
public boolean isShutdown()
{
return sessionTracker.isStopped();
}
}

View File

@ -1,75 +0,0 @@
//
// ========================================================================
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under
// the terms of the Eclipse Public License 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0
//
// This Source Code may also be made available under the following
// Secondary Licenses when the conditions for such availability set
// forth in the Eclipse Public License, v. 2.0 are satisfied:
// the Apache License v2.0 which is available at
// https://www.apache.org/licenses/LICENSE-2.0
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.websocket.util;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.util.component.LifeCycle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ShutdownUtil
{
private static final Logger LOG = LoggerFactory.getLogger(ShutdownUtil.class);
/**
* Stop a {@link LifeCycle} in a new daemon thread and be notified of the result in a {@link CompletableFuture}.
* @param lifeCycle the LifeCycle to stop.
* @return the CompletableFuture to be notified when the stop either completes or fails.
*/
public static CompletableFuture<Void> stop(LifeCycle lifeCycle)
{
AtomicReference<Thread> stopThreadReference = new AtomicReference<>();
CompletableFuture<Void> shutdown = new CompletableFuture<>()
{
@Override
public boolean cancel(boolean mayInterruptIfRunning)
{
boolean canceled = super.cancel(mayInterruptIfRunning);
if (canceled && mayInterruptIfRunning)
{
Thread thread = stopThreadReference.get();
if (thread != null)
thread.interrupt();
}
return canceled;
}
};
Thread stopThread = new Thread(() ->
{
try
{
lifeCycle.stop();
shutdown.complete(null);
}
catch (Throwable t)
{
LOG.warn("Error while stopping {}", lifeCycle, t);
shutdown.completeExceptionally(t);
}
});
stopThread.setDaemon(true);
stopThreadReference.set(stopThread);
stopThread.start();
return shutdown;
}
}