diff --git a/common/src/main/java/io/druid/common/config/Log4jShutdown.java b/common/src/main/java/io/druid/common/config/Log4jShutdown.java index aafc8249f8b..3aa41e372ac 100644 --- a/common/src/main/java/io/druid/common/config/Log4jShutdown.java +++ b/common/src/main/java/io/druid/common/config/Log4jShutdown.java @@ -19,19 +19,21 @@ package io.druid.common.config; +import com.google.common.base.Throwables; import org.apache.logging.log4j.core.util.Cancellable; import org.apache.logging.log4j.core.util.ShutdownCallbackRegistry; +import javax.annotation.concurrent.GuardedBy; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; public class Log4jShutdown implements ShutdownCallbackRegistry, org.apache.logging.log4j.core.LifeCycle { - private final AtomicReference state = new AtomicReference<>(State.INITIALIZED); + private static final long SHUTDOWN_WAIT_TIMEOUT = 60000; + + private final SynchronizedStateHolder state = new SynchronizedStateHolder(State.INITIALIZED); private final Queue shutdownCallbacks = new ConcurrentLinkedQueue<>(); - private final AtomicBoolean callbacksRun = new AtomicBoolean(false); @Override public Cancellable addShutdownCallback(final Runnable callback) @@ -95,21 +97,23 @@ public class Log4jShutdown implements ShutdownCallbackRegistry, org.apache.loggi @Override public void stop() { - if (callbacksRun.get()) { + if (!state.compareAndSet(State.STARTED, State.STOPPING)) { + State current = state.waitForTransition(State.STOPPING, State.STOPPED, SHUTDOWN_WAIT_TIMEOUT); + if (current != State.STOPPED) { + throw new IllegalStateException(String.format("Expected state [%s] found [%s]", State.STARTED, current)); + } return; } - if (!state.compareAndSet(State.STARTED, State.STOPPED)) { - throw new IllegalStateException(String.format("Expected state [%s] found [%s]", State.STARTED, state.get())); + try { + runCallbacks(); + } + finally { + state.compareAndSet(State.STOPPING, State.STOPPED); } } - public void runCallbacks() + private void runCallbacks() { - if (!callbacksRun.compareAndSet(false, true)) { - // Already run, skip - return; - } - stop(); RuntimeException e = null; for (Runnable callback = shutdownCallbacks.poll(); callback != null; callback = shutdownCallbacks.poll()) { try { @@ -138,4 +142,60 @@ public class Log4jShutdown implements ShutdownCallbackRegistry, org.apache.loggi { return State.STOPPED.equals(getState()); } + + private static class SynchronizedStateHolder + { + @GuardedBy("this") + private State current; + + private SynchronizedStateHolder(State initial) { + current = initial; + } + + private synchronized boolean compareAndSet(State expected, State transition) + { + if (current == expected) { + return transition(transition); + } + return false; + } + + /** + * if current state is `expected`, wait it to be changed into `transition` state for `timeout` msec. + * if it's not, return current state immediately. + * + * @return current state + */ + private synchronized State waitForTransition(State expected, State transition, long timeout) + { + if (current == expected) { + long remaining = timeout; + try { + long prev = System.currentTimeMillis(); + while (current != transition && remaining > 0) { + wait(remaining); + long now = System.currentTimeMillis(); + remaining -= now - prev; + prev = now; + } + } + catch (InterruptedException e) { + throw Throwables.propagate(e); + } + } + return current; + } + + private synchronized boolean transition(State transition) + { + current = transition; + notifyAll(); + return true; + } + + private synchronized State get() + { + return current; + } + } } diff --git a/services/src/main/java/io/druid/cli/CliPeon.java b/services/src/main/java/io/druid/cli/CliPeon.java index b64de762c75..ebe67540910 100644 --- a/services/src/main/java/io/druid/cli/CliPeon.java +++ b/services/src/main/java/io/druid/cli/CliPeon.java @@ -233,19 +233,18 @@ public class CliPeon extends GuiceRunnable Injector injector = makeInjector(); try { final Lifecycle lifecycle = initLifecycle(injector); - Runtime.getRuntime().addShutdownHook( - new Thread( - new Runnable() - { - @Override - public void run() - { - log.info("Running shutdown hook"); - lifecycle.stop(); - } - } - ) + final Thread hook = new Thread( + new Runnable() + { + @Override + public void run() + { + log.info("Running shutdown hook"); + lifecycle.stop(); + } + } ); + Runtime.getRuntime().addShutdownHook(hook); injector.getInstance(ExecutorLifecycle.class).join(); // Sanity check to help debug unexpected non-daemon threads @@ -258,6 +257,7 @@ public class CliPeon extends GuiceRunnable // Explicitly call lifecycle stop, dont rely on shutdown hook. lifecycle.stop(); + Runtime.getRuntime().removeShutdownHook(hook); } catch (Throwable t) { log.error(t, "Error when starting up. Failing.");