Merge pull request #2133 from navis/log4j-shudown-callback

callbacks registered to Log4jShutdown is not executed when stop is called
This commit is contained in:
Navis Ryu 2016-01-15 21:28:46 +09:00
commit e4d15f3729
2 changed files with 84 additions and 24 deletions

View File

@ -19,19 +19,21 @@
package io.druid.common.config; package io.druid.common.config;
import com.google.common.base.Throwables;
import org.apache.logging.log4j.core.util.Cancellable; import org.apache.logging.log4j.core.util.Cancellable;
import org.apache.logging.log4j.core.util.ShutdownCallbackRegistry; import org.apache.logging.log4j.core.util.ShutdownCallbackRegistry;
import javax.annotation.concurrent.GuardedBy;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
public class Log4jShutdown implements ShutdownCallbackRegistry, org.apache.logging.log4j.core.LifeCycle public class Log4jShutdown implements ShutdownCallbackRegistry, org.apache.logging.log4j.core.LifeCycle
{ {
private final AtomicReference<State> state = new AtomicReference<>(State.INITIALIZED); private static final long SHUTDOWN_WAIT_TIMEOUT = 60000;
private final SynchronizedStateHolder state = new SynchronizedStateHolder(State.INITIALIZED);
private final Queue<Runnable> shutdownCallbacks = new ConcurrentLinkedQueue<>(); private final Queue<Runnable> shutdownCallbacks = new ConcurrentLinkedQueue<>();
private final AtomicBoolean callbacksRun = new AtomicBoolean(false);
@Override @Override
public Cancellable addShutdownCallback(final Runnable callback) public Cancellable addShutdownCallback(final Runnable callback)
@ -95,21 +97,23 @@ public class Log4jShutdown implements ShutdownCallbackRegistry, org.apache.loggi
@Override @Override
public void stop() public void stop()
{ {
if (callbacksRun.get()) { if (!state.compareAndSet(State.STARTED, State.STOPPING)) {
State current = state.waitForTransition(State.STOPPING, State.STOPPED, SHUTDOWN_WAIT_TIMEOUT);
if (current != State.STOPPED) {
throw new IllegalStateException(String.format("Expected state [%s] found [%s]", State.STARTED, current));
}
return; return;
} }
if (!state.compareAndSet(State.STARTED, State.STOPPED)) { try {
throw new IllegalStateException(String.format("Expected state [%s] found [%s]", State.STARTED, state.get())); runCallbacks();
}
finally {
state.compareAndSet(State.STOPPING, State.STOPPED);
} }
} }
public void runCallbacks() private void runCallbacks()
{ {
if (!callbacksRun.compareAndSet(false, true)) {
// Already run, skip
return;
}
stop();
RuntimeException e = null; RuntimeException e = null;
for (Runnable callback = shutdownCallbacks.poll(); callback != null; callback = shutdownCallbacks.poll()) { for (Runnable callback = shutdownCallbacks.poll(); callback != null; callback = shutdownCallbacks.poll()) {
try { try {
@ -138,4 +142,60 @@ public class Log4jShutdown implements ShutdownCallbackRegistry, org.apache.loggi
{ {
return State.STOPPED.equals(getState()); return State.STOPPED.equals(getState());
} }
private static class SynchronizedStateHolder
{
@GuardedBy("this")
private State current;
private SynchronizedStateHolder(State initial) {
current = initial;
}
private synchronized boolean compareAndSet(State expected, State transition)
{
if (current == expected) {
return transition(transition);
}
return false;
}
/**
* if current state is `expected`, wait it to be changed into `transition` state for `timeout` msec.
* if it's not, return current state immediately.
*
* @return current state
*/
private synchronized State waitForTransition(State expected, State transition, long timeout)
{
if (current == expected) {
long remaining = timeout;
try {
long prev = System.currentTimeMillis();
while (current != transition && remaining > 0) {
wait(remaining);
long now = System.currentTimeMillis();
remaining -= now - prev;
prev = now;
}
}
catch (InterruptedException e) {
throw Throwables.propagate(e);
}
}
return current;
}
private synchronized boolean transition(State transition)
{
current = transition;
notifyAll();
return true;
}
private synchronized State get()
{
return current;
}
}
} }

View File

@ -233,19 +233,18 @@ public class CliPeon extends GuiceRunnable
Injector injector = makeInjector(); Injector injector = makeInjector();
try { try {
final Lifecycle lifecycle = initLifecycle(injector); final Lifecycle lifecycle = initLifecycle(injector);
Runtime.getRuntime().addShutdownHook( final Thread hook = new Thread(
new Thread( new Runnable()
new Runnable() {
{ @Override
@Override public void run()
public void run() {
{ log.info("Running shutdown hook");
log.info("Running shutdown hook"); lifecycle.stop();
lifecycle.stop(); }
} }
}
)
); );
Runtime.getRuntime().addShutdownHook(hook);
injector.getInstance(ExecutorLifecycle.class).join(); injector.getInstance(ExecutorLifecycle.class).join();
// Sanity check to help debug unexpected non-daemon threads // Sanity check to help debug unexpected non-daemon threads
@ -258,6 +257,7 @@ public class CliPeon extends GuiceRunnable
// Explicitly call lifecycle stop, dont rely on shutdown hook. // Explicitly call lifecycle stop, dont rely on shutdown hook.
lifecycle.stop(); lifecycle.stop();
Runtime.getRuntime().removeShutdownHook(hook);
} }
catch (Throwable t) { catch (Throwable t) {
log.error(t, "Error when starting up. Failing."); log.error(t, "Error when starting up. Failing.");