Merge pull request #1524 from metamx/peonShutdown

Make CliPeon run shutdown hooks properly
This commit is contained in:
Fangjin Yang 2015-07-15 10:09:33 -07:00
commit a4cb0332fb
2 changed files with 22 additions and 8 deletions

View File

@ -157,6 +157,7 @@ public class JettyServerModule extends JerseyServletModule
final QueuedThreadPool threadPool = new QueuedThreadPool(); final QueuedThreadPool threadPool = new QueuedThreadPool();
threadPool.setMinThreads(config.getNumThreads()); threadPool.setMinThreads(config.getNumThreads());
threadPool.setMaxThreads(config.getNumThreads()); threadPool.setMaxThreads(config.getNumThreads());
threadPool.setDaemon(true);
final Server server = new Server(threadPool); final Server server = new Server(threadPool);

View File

@ -30,7 +30,6 @@ import com.metamx.common.logger.Logger;
import io.airlift.command.Arguments; import io.airlift.command.Arguments;
import io.airlift.command.Command; import io.airlift.command.Command;
import io.airlift.command.Option; import io.airlift.command.Option;
import io.druid.metadata.IndexerSQLMetadataStorageCoordinator;
import io.druid.guice.Binders; import io.druid.guice.Binders;
import io.druid.guice.IndexingServiceFirehoseModule; import io.druid.guice.IndexingServiceFirehoseModule;
import io.druid.guice.Jerseys; import io.druid.guice.Jerseys;
@ -56,6 +55,7 @@ import io.druid.indexing.overlord.TaskStorage;
import io.druid.indexing.overlord.ThreadPoolTaskRunner; import io.druid.indexing.overlord.ThreadPoolTaskRunner;
import io.druid.indexing.worker.executor.ExecutorLifecycle; import io.druid.indexing.worker.executor.ExecutorLifecycle;
import io.druid.indexing.worker.executor.ExecutorLifecycleConfig; import io.druid.indexing.worker.executor.ExecutorLifecycleConfig;
import io.druid.metadata.IndexerSQLMetadataStorageCoordinator;
import io.druid.query.QuerySegmentWalker; import io.druid.query.QuerySegmentWalker;
import io.druid.segment.loading.DataSegmentArchiver; import io.druid.segment.loading.DataSegmentArchiver;
import io.druid.segment.loading.DataSegmentKiller; import io.druid.segment.loading.DataSegmentKiller;
@ -71,7 +71,6 @@ import io.druid.segment.realtime.firehose.NoopChatHandlerProvider;
import io.druid.segment.realtime.firehose.ServiceAnnouncingChatHandlerProvider; import io.druid.segment.realtime.firehose.ServiceAnnouncingChatHandlerProvider;
import io.druid.server.QueryResource; import io.druid.server.QueryResource;
import io.druid.server.initialization.jetty.JettyServerInitializer; import io.druid.server.initialization.jetty.JettyServerInitializer;
import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.Server;
import java.io.File; import java.io.File;
@ -125,7 +124,8 @@ public class CliPeon extends GuiceRunnable
.to(ServiceAnnouncingChatHandlerProvider.class).in(LazySingleton.class); .to(ServiceAnnouncingChatHandlerProvider.class).in(LazySingleton.class);
handlerProviderBinder.addBinding("noop") handlerProviderBinder.addBinding("noop")
.to(NoopChatHandlerProvider.class).in(LazySingleton.class); .to(NoopChatHandlerProvider.class).in(LazySingleton.class);
binder.bind(ServiceAnnouncingChatHandlerProvider.class).in(LazySingleton.class);; binder.bind(ServiceAnnouncingChatHandlerProvider.class).in(LazySingleton.class);
binder.bind(NoopChatHandlerProvider.class).in(LazySingleton.class); binder.bind(NoopChatHandlerProvider.class).in(LazySingleton.class);
binder.bind(TaskToolboxFactory.class).in(LazySingleton.class); binder.bind(TaskToolboxFactory.class).in(LazySingleton.class);
@ -190,7 +190,9 @@ public class CliPeon extends GuiceRunnable
JsonConfigProvider.bind(binder, "druid.indexer.storage", TaskStorageConfig.class); JsonConfigProvider.bind(binder, "druid.indexer.storage", TaskStorageConfig.class);
binder.bind(TaskStorage.class).to(HeapMemoryTaskStorage.class).in(LazySingleton.class); binder.bind(TaskStorage.class).to(HeapMemoryTaskStorage.class).in(LazySingleton.class);
binder.bind(TaskActionToolbox.class).in(LazySingleton.class); binder.bind(TaskActionToolbox.class).in(LazySingleton.class);
binder.bind(IndexerMetadataStorageCoordinator.class).to(IndexerSQLMetadataStorageCoordinator.class).in(LazySingleton.class); binder.bind(IndexerMetadataStorageCoordinator.class).to(IndexerSQLMetadataStorageCoordinator.class).in(
LazySingleton.class
);
taskActionBinder.addBinding("remote") taskActionBinder.addBinding("remote")
.to(RemoteTaskActionClientFactory.class).in(LazySingleton.class); .to(RemoteTaskActionClientFactory.class).in(LazySingleton.class);
@ -205,17 +207,28 @@ public class CliPeon extends GuiceRunnable
{ {
try { try {
Injector injector = makeInjector(); Injector injector = makeInjector();
try { try {
Lifecycle lifecycle = initLifecycle(injector); final Lifecycle lifecycle = initLifecycle(injector);
Runtime.getRuntime().addShutdownHook(
new Thread(
new Runnable()
{
@Override
public void run()
{
log.info("Running shutdown hook");
lifecycle.stop();
}
}
)
);
injector.getInstance(ExecutorLifecycle.class).join(); injector.getInstance(ExecutorLifecycle.class).join();
lifecycle.stop();
} }
catch (Throwable t) { catch (Throwable t) {
log.error(t, "Error when starting up. Failing."); log.error(t, "Error when starting up. Failing.");
System.exit(1); System.exit(1);
} }
log.info("Finished peon task");
} }
catch (Exception e) { catch (Exception e) {
throw Throwables.propagate(e); throw Throwables.propagate(e);