From 269997dc94afaf328b281d33e78a0847702e14fd Mon Sep 17 00:00:00 2001 From: cheddar Date: Mon, 26 Aug 2013 18:08:41 -0500 Subject: [PATCH] 1) ExecutorNode is working, except for the running of the task. Need to adjust it to be able to run a task and then everything will be wonderful --- .../metamx/druid/client/selector/Server.java | 1 + .../initialization/JettyServerModule.java | 2 +- .../com/metamx/druid/concurrent/Execs.java | 8 +- .../druid/guice/MiddleManagerModule.java | 1 - .../com/metamx/druid/guice/PeonModule.java | 67 ++++++++++++++ .../druid/indexing/common/RetryPolicy.java | 33 +++---- .../indexing/common/RetryPolicyConfig.java | 70 ++++++++++++++ .../indexing/common/RetryPolicyFactory.java | 3 +- .../indexing/common/TaskToolboxFactory.java | 2 + .../actions/RemoteTaskActionClient.java | 52 +++++------ .../RemoteTaskActionClientFactory.java | 13 ++- .../indexing/common/config/TaskConfig.java | 63 +++++++++---- .../EventReceivingChatHandlerProvider.java | 12 ++- .../coordinator/ThreadPoolTaskRunner.java | 9 +- .../worker/executor/ExecutorNode.java | 20 +--- .../RetryPolicyTest.java | 29 ++---- .../coordinator/RemoteTaskRunnerTest.java | 29 +----- .../coordinator/TaskLifecycleTest.java | 27 +----- .../indexing/IndexingServiceSelector.java | 6 ++ .../IndexingServiceSelectorConfig.java | 26 ++---- .../metamx/druid/guice/CoordinatorModule.java | 43 --------- .../guice/IndexingServiceDiscoveryModule.java | 83 +++++++++++++++++ .../metamx/druid/guice/StorageNodeModule.java | 5 +- .../druid/master/DruidMasterConfig.java | 6 -- .../metamx/druid/master/DruidMasterTest.java | 6 -- .../java/io/druid/cli/CliCoordinator.java | 2 + .../main/java/io/druid/cli/CliHistorical.java | 2 + .../src/main/java/io/druid/cli/CliPeon.java | 92 +++++++++++++++++++ .../main/java/io/druid/cli/CliRealtime.java | 2 + .../java/io/druid/cli/CliRealtimeExample.java | 2 + services/src/main/java/io/druid/cli/Main.java | 3 +- 31 files changed, 469 insertions(+), 250 deletions(-) create mode 100644 indexing-service/src/main/java/com/metamx/druid/guice/PeonModule.java create mode 100644 indexing-service/src/main/java/com/metamx/druid/indexing/common/RetryPolicyConfig.java rename indexing-service/src/test/java/com/metamx/druid/indexing/{coordinator => common}/RetryPolicyTest.java (59%) rename indexing-service/src/main/java/com/metamx/druid/indexing/common/config/RetryPolicyConfig.java => server/src/main/java/com/metamx/druid/client/indexing/IndexingServiceSelectorConfig.java (56%) create mode 100644 server/src/main/java/com/metamx/druid/guice/IndexingServiceDiscoveryModule.java create mode 100644 services/src/main/java/io/druid/cli/CliPeon.java diff --git a/client/src/main/java/com/metamx/druid/client/selector/Server.java b/client/src/main/java/com/metamx/druid/client/selector/Server.java index 9859c888239..a5f6b910e9a 100644 --- a/client/src/main/java/com/metamx/druid/client/selector/Server.java +++ b/client/src/main/java/com/metamx/druid/client/selector/Server.java @@ -4,6 +4,7 @@ package com.metamx.druid.client.selector; */ public interface Server { + public String getScheme(); public String getHost(); public int getPort(); } diff --git a/client/src/main/java/com/metamx/druid/initialization/JettyServerModule.java b/client/src/main/java/com/metamx/druid/initialization/JettyServerModule.java index 73e45fcfe0f..7c494754712 100644 --- a/client/src/main/java/com/metamx/druid/initialization/JettyServerModule.java +++ b/client/src/main/java/com/metamx/druid/initialization/JettyServerModule.java @@ -81,7 +81,7 @@ public class JettyServerModule extends JerseyServletModule .annotatedWith(Names.named("resourceClasses")) .toInstance(theResources); for (Class resource : theResources) { - binder.bind(resource); + binder.bind(resource).in(LazySingleton.class); } binder.bind(Key.get(Server.class, Names.named("ForTheEagerness"))).to(Server.class).asEagerSingleton(); diff --git a/common/src/main/java/com/metamx/druid/concurrent/Execs.java b/common/src/main/java/com/metamx/druid/concurrent/Execs.java index 06be9bdf4dd..38ec4db6416 100644 --- a/common/src/main/java/com/metamx/druid/concurrent/Execs.java +++ b/common/src/main/java/com/metamx/druid/concurrent/Execs.java @@ -32,9 +32,7 @@ public class Execs { public static ExecutorService singleThreaded(String nameFormat) { - return Executors.newSingleThreadExecutor( - makeThreadFactory(nameFormat) - ); + return Executors.newSingleThreadExecutor(makeThreadFactory(nameFormat)); } public static ExecutorService multiThreaded(int threads, String nameFormat) @@ -44,9 +42,7 @@ public class Execs public static ScheduledExecutorService scheduledSingleThreaded(String nameFormat) { - return Executors.newSingleThreadScheduledExecutor( - makeThreadFactory(nameFormat) - ); + return Executors.newSingleThreadScheduledExecutor(makeThreadFactory(nameFormat)); } public static ThreadFactory makeThreadFactory(String nameFormat) diff --git a/indexing-service/src/main/java/com/metamx/druid/guice/MiddleManagerModule.java b/indexing-service/src/main/java/com/metamx/druid/guice/MiddleManagerModule.java index e8e5bf6a84c..2d38f9aa4c2 100644 --- a/indexing-service/src/main/java/com/metamx/druid/guice/MiddleManagerModule.java +++ b/indexing-service/src/main/java/com/metamx/druid/guice/MiddleManagerModule.java @@ -47,7 +47,6 @@ public class MiddleManagerModule implements Module binder.bind(WorkerTaskMonitor.class).in(ManageLifecycle.class); binder.bind(WorkerCuratorCoordinator.class).in(ManageLifecycle.class); - } @Provides @LazySingleton diff --git a/indexing-service/src/main/java/com/metamx/druid/guice/PeonModule.java b/indexing-service/src/main/java/com/metamx/druid/guice/PeonModule.java new file mode 100644 index 00000000000..1114e425997 --- /dev/null +++ b/indexing-service/src/main/java/com/metamx/druid/guice/PeonModule.java @@ -0,0 +1,67 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.guice; + +import com.google.inject.Binder; +import com.google.inject.Key; +import com.google.inject.Module; +import com.google.inject.multibindings.MapBinder; +import com.metamx.druid.indexing.common.RetryPolicyConfig; +import com.metamx.druid.indexing.common.RetryPolicyFactory; +import com.metamx.druid.indexing.common.TaskToolboxFactory; +import com.metamx.druid.indexing.common.actions.RemoteTaskActionClientFactory; +import com.metamx.druid.indexing.common.actions.TaskActionClientFactory; +import com.metamx.druid.indexing.common.config.TaskConfig; +import com.metamx.druid.indexing.common.index.ChatHandlerProvider; +import com.metamx.druid.indexing.common.index.EventReceivingChatHandlerProvider; +import com.metamx.druid.indexing.common.index.NoopChatHandlerProvider; +import com.metamx.druid.loading.DataSegmentKiller; +import com.metamx.druid.loading.S3DataSegmentKiller; + +/** + */ +public class PeonModule implements Module +{ + @Override + public void configure(Binder binder) + { + PolyBind.createChoice( + binder, + "druid.indexer.task.chathandler.type", + Key.get(ChatHandlerProvider.class), + Key.get(NoopChatHandlerProvider.class) + ); + final MapBinder handlerProviderBinder = PolyBind.optionBinder( + binder, Key.get(ChatHandlerProvider.class) + ); + handlerProviderBinder.addBinding("curator").to(EventReceivingChatHandlerProvider.class); + handlerProviderBinder.addBinding("noop").to(NoopChatHandlerProvider.class); + + binder.bind(TaskToolboxFactory.class).in(LazySingleton.class); + + JsonConfigProvider.bind(binder, "druid.indexer.task", TaskConfig.class); + JsonConfigProvider.bind(binder, "druid.worker.taskActionClient.retry", RetryPolicyConfig.class); + + binder.bind(TaskActionClientFactory.class).to(RemoteTaskActionClientFactory.class).in(LazySingleton.class); + binder.bind(RetryPolicyFactory.class).in(LazySingleton.class); + + binder.bind(DataSegmentKiller.class).to(S3DataSegmentKiller.class).in(LazySingleton.class); + } +} diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/common/RetryPolicy.java b/indexing-service/src/main/java/com/metamx/druid/indexing/common/RetryPolicy.java index e1089990cd0..278af1c108a 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/common/RetryPolicy.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/common/RetryPolicy.java @@ -19,7 +19,6 @@ package com.metamx.druid.indexing.common; -import com.metamx.druid.indexing.common.config.RetryPolicyConfig; import com.metamx.emitter.EmittingLogger; import org.joda.time.Duration; @@ -29,41 +28,35 @@ public class RetryPolicy { private static final EmittingLogger log = new EmittingLogger(RetryPolicy.class); - private final long MAX_NUM_RETRIES; - private final Duration MAX_RETRY_DURATION; + private final long maxNumRetries; + private final Duration maxRetryDelay; private volatile Duration currRetryDelay; private volatile int retryCount; public RetryPolicy(RetryPolicyConfig config) { - this.MAX_NUM_RETRIES = config.getMaxRetryCount(); - this.MAX_RETRY_DURATION = config.getRetryMaxDuration(); + this.maxNumRetries = config.getMaxRetryCount(); + this.maxRetryDelay = config.getMaxWait().toStandardDuration(); - this.currRetryDelay = config.getRetryMinDuration(); + this.currRetryDelay = config.getMinWait().toStandardDuration(); this.retryCount = 0; } - public Duration getRetryDelay() - { - return currRetryDelay; - } - public Duration getAndIncrementRetryDelay() { - Duration retVal = new Duration(currRetryDelay); - currRetryDelay = new Duration(Math.min(currRetryDelay.getMillis() * 2, MAX_RETRY_DURATION.getMillis())); - retryCount++; - return retVal; - } + if (hasExceededRetryThreshold()) { + return null; + } - public int getNumRetries() - { - return retryCount; + Duration retVal = currRetryDelay; + currRetryDelay = new Duration(Math.min(currRetryDelay.getMillis() * 2, maxRetryDelay.getMillis())); + ++retryCount; + return retVal; } public boolean hasExceededRetryThreshold() { - return retryCount >= MAX_NUM_RETRIES; + return retryCount >= maxNumRetries; } } diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/common/RetryPolicyConfig.java b/indexing-service/src/main/java/com/metamx/druid/indexing/common/RetryPolicyConfig.java new file mode 100644 index 00000000000..de643db2353 --- /dev/null +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/common/RetryPolicyConfig.java @@ -0,0 +1,70 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.indexing.common; + +import com.fasterxml.jackson.annotation.JsonProperty; +import org.joda.time.Period; + +/** + */ +public class RetryPolicyConfig +{ + @JsonProperty + private Period minWait = new Period("PT1M"); + + @JsonProperty + private Period maxWait = new Period("PT10M"); + + @JsonProperty + private long maxRetryCount = 10; + + public Period getMinWait() + { + return minWait; + } + + RetryPolicyConfig setMinWait(Period minWait) + { + this.minWait = minWait; + return this; + } + + public Period getMaxWait() + { + return maxWait; + } + + RetryPolicyConfig setMaxWait(Period maxWait) + { + this.maxWait = maxWait; + return this; + } + + public long getMaxRetryCount() + { + return maxRetryCount; + } + + RetryPolicyConfig setMaxRetryCount(long maxRetryCount) + { + this.maxRetryCount = maxRetryCount; + return this; + } +} diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/common/RetryPolicyFactory.java b/indexing-service/src/main/java/com/metamx/druid/indexing/common/RetryPolicyFactory.java index f9dabd54b52..74731e21ff6 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/common/RetryPolicyFactory.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/common/RetryPolicyFactory.java @@ -19,7 +19,7 @@ package com.metamx.druid.indexing.common; -import com.metamx.druid.indexing.common.config.RetryPolicyConfig; +import com.google.inject.Inject; /** */ @@ -27,6 +27,7 @@ public class RetryPolicyFactory { private final RetryPolicyConfig config; + @Inject public RetryPolicyFactory(RetryPolicyConfig config) { this.config = config; diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/common/TaskToolboxFactory.java b/indexing-service/src/main/java/com/metamx/druid/indexing/common/TaskToolboxFactory.java index 5acd075bee7..09c02ddf0b1 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/common/TaskToolboxFactory.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/common/TaskToolboxFactory.java @@ -20,6 +20,7 @@ package com.metamx.druid.indexing.common; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.inject.Inject; import com.metamx.druid.client.ServerView; import com.metamx.druid.coordination.DataSegmentAnnouncer; import com.metamx.druid.indexing.common.actions.TaskActionClientFactory; @@ -49,6 +50,7 @@ public class TaskToolboxFactory private final MonitorScheduler monitorScheduler; private final ObjectMapper objectMapper; + @Inject public TaskToolboxFactory( TaskConfig config, TaskActionClientFactory taskActionClientFactory, diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/common/actions/RemoteTaskActionClient.java b/indexing-service/src/main/java/com/metamx/druid/indexing/common/actions/RemoteTaskActionClient.java index 289fcf005d9..531f8653053 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/common/actions/RemoteTaskActionClient.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/common/actions/RemoteTaskActionClient.java @@ -6,13 +6,13 @@ import com.google.common.base.Charsets; import com.google.common.base.Throwables; import com.metamx.common.ISE; import com.metamx.common.logger.Logger; +import com.metamx.druid.client.indexing.IndexingServiceSelector; +import com.metamx.druid.client.selector.Server; import com.metamx.druid.indexing.common.RetryPolicy; import com.metamx.druid.indexing.common.RetryPolicyFactory; import com.metamx.druid.indexing.common.task.Task; import com.metamx.http.client.HttpClient; import com.metamx.http.client.response.ToStringResponseHandler; -import org.apache.curator.x.discovery.ServiceInstance; -import org.apache.curator.x.discovery.ServiceProvider; import org.joda.time.Duration; import java.io.IOException; @@ -23,7 +23,7 @@ public class RemoteTaskActionClient implements TaskActionClient { private final Task task; private final HttpClient httpClient; - private final ServiceProvider serviceProvider; + private final IndexingServiceSelector serviceProvider; private final RetryPolicyFactory retryPolicyFactory; private final ObjectMapper jsonMapper; @@ -32,7 +32,7 @@ public class RemoteTaskActionClient implements TaskActionClient public RemoteTaskActionClient( Task task, HttpClient httpClient, - ServiceProvider serviceProvider, + IndexingServiceSelector serviceProvider, RetryPolicyFactory retryPolicyFactory, ObjectMapper jsonMapper ) @@ -79,20 +79,23 @@ public class RemoteTaskActionClient implements TaskActionClient } final Map responseDict = jsonMapper.readValue( - response, - new TypeReference>() {} + response, new TypeReference>() + { + } ); return jsonMapper.convertValue(responseDict.get("result"), taskAction.getReturnTypeReference()); - } catch(IOException e) { - log.warn(e, "Exception submitting action for task: %s", task.getId()); + } + catch (IOException e) { + log.warn(e, "Exception submitting action for task[%s]", task.getId()); - if (retryPolicy.hasExceededRetryThreshold()) { + final Duration delay = retryPolicy.getAndIncrementRetryDelay(); + if (delay == null) { throw e; } else { try { - final long sleepTime = retryPolicy.getAndIncrementRetryDelay().getMillis(); - log.info("Will try again in %s.", new Duration(sleepTime).toString()); + final long sleepTime = delay.getMillis(); + log.info("Will try again in [%s].", new Duration(sleepTime).toString()); Thread.sleep(sleepTime); } catch (InterruptedException e2) { @@ -105,26 +108,19 @@ public class RemoteTaskActionClient implements TaskActionClient private URI getServiceUri() throws Exception { - final ServiceInstance instance = serviceProvider.getInstance(); - final String scheme; - final String host; - final int port; - final String path = "/druid/indexer/v1/action"; - + final Server instance = serviceProvider.pick(); if (instance == null) { throw new ISE("Cannot find instance of indexer to talk to!"); } - host = instance.getAddress(); - - if (instance.getSslPort() != null && instance.getSslPort() > 0) { - scheme = "https"; - port = instance.getSslPort(); - } else { - scheme = "http"; - port = instance.getPort(); - } - - return new URI(scheme, null, host, port, path, null, null); + return new URI( + instance.getScheme(), + null, + instance.getHost(), + instance.getPort(), + "/druid/indexer/v1/action", + null, + null + ); } } diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/common/actions/RemoteTaskActionClientFactory.java b/indexing-service/src/main/java/com/metamx/druid/indexing/common/actions/RemoteTaskActionClientFactory.java index c872a2200a4..450ecdaee19 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/common/actions/RemoteTaskActionClientFactory.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/common/actions/RemoteTaskActionClientFactory.java @@ -20,23 +20,26 @@ package com.metamx.druid.indexing.common.actions; import com.fasterxml.jackson.databind.ObjectMapper; -import com.metamx.druid.indexing.common.task.Task; +import com.google.inject.Inject; +import com.metamx.druid.client.indexing.IndexingServiceSelector; +import com.metamx.druid.guice.annotations.Global; import com.metamx.druid.indexing.common.RetryPolicyFactory; +import com.metamx.druid.indexing.common.task.Task; import com.metamx.http.client.HttpClient; -import org.apache.curator.x.discovery.ServiceProvider; /** */ public class RemoteTaskActionClientFactory implements TaskActionClientFactory { private final HttpClient httpClient; - private final ServiceProvider serviceProvider; + private final IndexingServiceSelector serviceProvider; private final RetryPolicyFactory retryPolicyFactory; private final ObjectMapper jsonMapper; + @Inject public RemoteTaskActionClientFactory( - HttpClient httpClient, - ServiceProvider serviceProvider, + @Global HttpClient httpClient, + IndexingServiceSelector serviceProvider, RetryPolicyFactory retryPolicyFactory, ObjectMapper jsonMapper ) diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/common/config/TaskConfig.java b/indexing-service/src/main/java/com/metamx/druid/indexing/common/config/TaskConfig.java index 5e9789e9660..79f43450677 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/common/config/TaskConfig.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/common/config/TaskConfig.java @@ -19,38 +19,65 @@ package com.metamx.druid.indexing.common.config; -import com.google.common.base.Joiner; -import org.skife.config.Config; -import org.skife.config.Default; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; import java.io.File; -public abstract class TaskConfig +public class TaskConfig { - private static Joiner joiner = Joiner.on("/"); + @JsonProperty + private final String baseDir; - @Config("druid.indexer.baseDir") - @Default("/tmp/") - public abstract String getBaseDir(); + @JsonProperty + private final File baseTaskDir; + + @JsonProperty + private final String hadoopWorkingPath; + + @JsonProperty + private final int defaultRowFlushBoundary; + + @JsonCreator + public TaskConfig( + @JsonProperty("baseDir") String baseDir, + @JsonProperty("baseTaskDir") String baseTaskDir, + @JsonProperty("hadoopWorkingPath") String hadoopWorkingPath, + @JsonProperty("defaultRowFlushBoundary") Integer defaultRowFlushBoundary + ) + { + this.baseDir = baseDir == null ? "/tmp" : baseDir; + this.baseTaskDir = new File(defaultDir(baseTaskDir, "persistent/task")); + this.hadoopWorkingPath = defaultDir(hadoopWorkingPath, "druid-indexing"); + this.defaultRowFlushBoundary = defaultRowFlushBoundary == null ? 500000 : defaultRowFlushBoundary; + } + + public String getBaseDir() + { + return baseDir; + } - @Config("druid.indexer.taskDir") public File getBaseTaskDir() { - return new File(defaultPath("persistent/task")); + return baseTaskDir; } - @Config("druid.indexer.hadoopWorkingPath") public String getHadoopWorkingPath() { - return defaultPath("druid-indexing"); + return hadoopWorkingPath; } - @Config("druid.indexer.rowFlushBoundary") - @Default("500000") - public abstract int getDefaultRowFlushBoundary(); - - private String defaultPath(String subPath) + public int getDefaultRowFlushBoundary() { - return joiner.join(getBaseDir(), subPath); + return defaultRowFlushBoundary; + } + + private String defaultDir(String configParameter, final String defaultVal) + { + if (configParameter == null) { + return String.format("%s/%s", getBaseDir(), defaultVal); + } + + return configParameter; } } \ No newline at end of file diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/common/index/EventReceivingChatHandlerProvider.java b/indexing-service/src/main/java/com/metamx/druid/indexing/common/index/EventReceivingChatHandlerProvider.java index c4ae8cec851..6d0ea613170 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/common/index/EventReceivingChatHandlerProvider.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/common/index/EventReceivingChatHandlerProvider.java @@ -21,10 +21,11 @@ package com.metamx.druid.indexing.common.index; import com.google.common.base.Optional; import com.google.common.collect.Maps; +import com.google.inject.Inject; import com.metamx.common.ISE; import com.metamx.common.logger.Logger; import com.metamx.druid.curator.discovery.ServiceAnnouncer; -import com.metamx.druid.indexing.worker.config.ChatHandlerProviderConfig; +import com.metamx.druid.guice.annotations.Self; import com.metamx.druid.initialization.DruidNode; import java.util.concurrent.ConcurrentMap; @@ -38,16 +39,17 @@ public class EventReceivingChatHandlerProvider implements ChatHandlerProvider { private static final Logger log = new Logger(EventReceivingChatHandlerProvider.class); - private final ChatHandlerProviderConfig config; + private final DruidNode node; private final ServiceAnnouncer serviceAnnouncer; private final ConcurrentMap handlers; + @Inject public EventReceivingChatHandlerProvider( - ChatHandlerProviderConfig config, + @Self DruidNode node, ServiceAnnouncer serviceAnnouncer ) { - this.config = config; + this.node = node; this.serviceAnnouncer = serviceAnnouncer; this.handlers = Maps.newConcurrentMap(); } @@ -100,6 +102,6 @@ public class EventReceivingChatHandlerProvider implements ChatHandlerProvider private DruidNode makeDruidNode(String key) { - return new DruidNode(key, config.getHost(), config.getPort()); + return new DruidNode(key, node.getHost(), node.getPort()); } } diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/ThreadPoolTaskRunner.java b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/ThreadPoolTaskRunner.java index 202ca0f55f5..dc6601ba1a7 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/ThreadPoolTaskRunner.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/ThreadPoolTaskRunner.java @@ -28,8 +28,10 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; +import com.google.inject.Inject; import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.druid.Query; +import com.metamx.druid.concurrent.Execs; import com.metamx.druid.indexing.common.TaskStatus; import com.metamx.druid.indexing.common.TaskToolbox; import com.metamx.druid.indexing.common.TaskToolboxFactory; @@ -48,7 +50,6 @@ import java.util.List; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentSkipListSet; -import java.util.concurrent.ExecutorService; /** * Runs tasks in a JVM thread using an ExecutorService. @@ -61,13 +62,13 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker private static final EmittingLogger log = new EmittingLogger(ThreadPoolTaskRunner.class); + @Inject public ThreadPoolTaskRunner( - TaskToolboxFactory toolboxFactory, - ExecutorService exec + TaskToolboxFactory toolboxFactory ) { this.toolboxFactory = toolboxFactory; - this.exec = MoreExecutors.listeningDecorator(exec); + this.exec = MoreExecutors.listeningDecorator(Execs.singleThreaded("task-runner-%d")); } @LifecycleStop diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/worker/executor/ExecutorNode.java b/indexing-service/src/main/java/com/metamx/druid/indexing/worker/executor/ExecutorNode.java index 53baaec3574..ad8a6a26168 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/worker/executor/ExecutorNode.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/worker/executor/ExecutorNode.java @@ -24,7 +24,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.dataformat.smile.SmileFactory; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.inject.Guice; import com.google.inject.Injector; import com.google.inject.servlet.GuiceFilter; @@ -36,16 +35,17 @@ import com.metamx.common.lifecycle.Lifecycle; import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.druid.BaseServerNode; +import com.metamx.druid.client.indexing.IndexingServiceSelector; import com.metamx.druid.curator.CuratorConfig; import com.metamx.druid.curator.discovery.CuratorServiceAnnouncer; import com.metamx.druid.curator.discovery.ServiceAnnouncer; import com.metamx.druid.http.GuiceServletConfig; import com.metamx.druid.http.QueryServlet; import com.metamx.druid.http.StatusServlet; +import com.metamx.druid.indexing.common.RetryPolicyConfig; import com.metamx.druid.indexing.common.RetryPolicyFactory; import com.metamx.druid.indexing.common.TaskToolboxFactory; import com.metamx.druid.indexing.common.actions.RemoteTaskActionClientFactory; -import com.metamx.druid.indexing.common.config.RetryPolicyConfig; import com.metamx.druid.indexing.common.config.TaskConfig; import com.metamx.druid.indexing.common.index.ChatHandlerProvider; import com.metamx.druid.indexing.common.index.EventReceiverFirehoseFactory; @@ -86,7 +86,6 @@ import org.jets3t.service.security.AWSCredentials; import org.skife.config.ConfigurationObjectFactory; import java.util.Properties; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; /** @@ -322,7 +321,7 @@ public class ExecutorNode extends BaseServerNode taskConfig, new RemoteTaskActionClientFactory( httpClient, - coordinatorServiceProvider, + new IndexingServiceSelector(coordinatorServiceProvider), new RetryPolicyFactory( configFactory.buildWithReplacements( RetryPolicyConfig.class, @@ -371,16 +370,7 @@ public class ExecutorNode extends BaseServerNode public void initializeTaskRunner() { if (taskRunner == null) { - this.taskRunner = lifecycle.addManagedInstance( - new ThreadPoolTaskRunner( - taskToolboxFactory, - Executors.newSingleThreadExecutor( - new ThreadFactoryBuilder() - .setNameFormat("task-runner-%d") - .build() - ) - ) - ); + this.taskRunner = lifecycle.addManagedInstance(new ThreadPoolTaskRunner(taskToolboxFactory)); } } @@ -389,7 +379,7 @@ public class ExecutorNode extends BaseServerNode if (chatHandlerProvider == null) { final ChatHandlerProviderConfig config = configFactory.build(ChatHandlerProviderConfig.class); if (config.isPublishDiscovery()) { - this.chatHandlerProvider = new EventReceivingChatHandlerProvider(config, serviceAnnouncer); + this.chatHandlerProvider = new EventReceivingChatHandlerProvider(null, serviceAnnouncer); // TODO: eliminate } else { log.info("ChatHandlerProvider: Using NoopServiceAnnouncer. Good luck finding your firehoses!"); this.chatHandlerProvider = new NoopChatHandlerProvider(); diff --git a/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/RetryPolicyTest.java b/indexing-service/src/test/java/com/metamx/druid/indexing/common/RetryPolicyTest.java similarity index 59% rename from indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/RetryPolicyTest.java rename to indexing-service/src/test/java/com/metamx/druid/indexing/common/RetryPolicyTest.java index cfa08d44401..07aaa66e0bd 100644 --- a/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/RetryPolicyTest.java +++ b/indexing-service/src/test/java/com/metamx/druid/indexing/common/RetryPolicyTest.java @@ -1,9 +1,8 @@ -package com.metamx.druid.indexing.coordinator; +package com.metamx.druid.indexing.common; -import com.metamx.druid.indexing.common.RetryPolicy; -import com.metamx.druid.indexing.common.config.RetryPolicyConfig; import junit.framework.Assert; import org.joda.time.Duration; +import org.joda.time.Period; import org.junit.Test; /** @@ -15,25 +14,9 @@ public class RetryPolicyTest { RetryPolicy retryPolicy = new RetryPolicy( new RetryPolicyConfig() - { - @Override - public Duration getRetryMinDuration() - { - return new Duration("PT1S"); - } - - @Override - public Duration getRetryMaxDuration() - { - return new Duration("PT10S"); - } - - @Override - public long getMaxRetryCount() - { - return 10; - } - } + .setMinWait(new Period("PT1S")) + .setMaxWait(new Period("PT10S")) + .setMaxRetryCount(6) ); Assert.assertEquals(new Duration("PT1S"), retryPolicy.getAndIncrementRetryDelay()); @@ -42,5 +25,7 @@ public class RetryPolicyTest Assert.assertEquals(new Duration("PT8S"), retryPolicy.getAndIncrementRetryDelay()); Assert.assertEquals(new Duration("PT10S"), retryPolicy.getAndIncrementRetryDelay()); Assert.assertEquals(new Duration("PT10S"), retryPolicy.getAndIncrementRetryDelay()); + Assert.assertEquals(null, retryPolicy.getAndIncrementRetryDelay()); + Assert.assertTrue(retryPolicy.hasExceededRetryThreshold()); } } diff --git a/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunnerTest.java b/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunnerTest.java index c985f6e4aba..130cfaac383 100644 --- a/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunnerTest.java +++ b/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunnerTest.java @@ -49,7 +49,6 @@ import org.junit.Test; import java.io.File; import java.util.Arrays; import java.util.Set; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -306,6 +305,8 @@ public class RemoteTaskRunnerTest ); workerCuratorCoordinator.start(); + final File tmp = Files.createTempDir(); + // Start a task monitor workerTaskMonitor = new WorkerTaskMonitor( jsonMapper, @@ -313,29 +314,9 @@ public class RemoteTaskRunnerTest workerCuratorCoordinator, new ThreadPoolTaskRunner( new TaskToolboxFactory( - new TaskConfig() - { - @Override - public String getBaseDir() - { - File tmp = Files.createTempDir(); - tmp.deleteOnExit(); - return tmp.toString(); - } - - @Override - public int getDefaultRowFlushBoundary() - { - return 0; - } - - @Override - public String getHadoopWorkingPath() - { - return null; - } - }, null, null, null, null, null, null, null, null, null, jsonMapper - ), Executors.newSingleThreadExecutor() + new TaskConfig(tmp.toString(), null, null, 0), + null, null, null, null, null, null, null, null, null, jsonMapper + ) ), new WorkerConfig().setCapacity(1) ); diff --git a/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/TaskLifecycleTest.java b/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/TaskLifecycleTest.java index a67ec0326f6..2ec5bd543f3 100644 --- a/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/TaskLifecycleTest.java @@ -78,7 +78,6 @@ import java.io.IOException; import java.util.Iterator; import java.util.List; import java.util.Set; -import java.util.concurrent.Executors; public class TaskLifecycleTest { @@ -116,26 +115,7 @@ public class TaskLifecycleTest tac = new LocalTaskActionClientFactory(ts, new TaskActionToolbox(tq, tl, mdc, newMockEmitter())); tb = new TaskToolboxFactory( - new TaskConfig() - { - @Override - public String getBaseDir() - { - return tmp.toString(); - } - - @Override - public int getDefaultRowFlushBoundary() - { - return 50000; - } - - @Override - public String getHadoopWorkingPath() - { - return null; - } - }, + new TaskConfig(tmp.toString(), null, null, 50000), tac, newMockEmitter(), null, // s3 client @@ -162,10 +142,7 @@ public class TaskLifecycleTest new DefaultObjectMapper() ); - tr = new ThreadPoolTaskRunner( - tb, - Executors.newSingleThreadExecutor() - ); + tr = new ThreadPoolTaskRunner(tb); tc = new TaskConsumer(tq, tr, tac, newMockEmitter()); tsqa = new TaskStorageQueryAdapter(ts); diff --git a/server/src/main/java/com/metamx/druid/client/indexing/IndexingServiceSelector.java b/server/src/main/java/com/metamx/druid/client/indexing/IndexingServiceSelector.java index 41259c1b28d..6f686832cb6 100644 --- a/server/src/main/java/com/metamx/druid/client/indexing/IndexingServiceSelector.java +++ b/server/src/main/java/com/metamx/druid/client/indexing/IndexingServiceSelector.java @@ -52,6 +52,12 @@ public class IndexingServiceSelector implements DiscoverySelector { return instance.getPort(); } + + @Override + public String getScheme() + { + return "http"; + } }; } diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/common/config/RetryPolicyConfig.java b/server/src/main/java/com/metamx/druid/client/indexing/IndexingServiceSelectorConfig.java similarity index 56% rename from indexing-service/src/main/java/com/metamx/druid/indexing/common/config/RetryPolicyConfig.java rename to server/src/main/java/com/metamx/druid/client/indexing/IndexingServiceSelectorConfig.java index b9ad26ad2d4..b34404eebc1 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/common/config/RetryPolicyConfig.java +++ b/server/src/main/java/com/metamx/druid/client/indexing/IndexingServiceSelectorConfig.java @@ -1,6 +1,6 @@ /* * Druid - a distributed column store. - * Copyright (C) 2012 Metamarkets Group Inc. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public License @@ -17,25 +17,19 @@ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ -package com.metamx.druid.indexing.common.config; +package com.metamx.druid.client.indexing; -import org.joda.time.Duration; -import org.skife.config.Config; -import org.skife.config.Default; +import com.fasterxml.jackson.annotation.JsonProperty; /** */ -public abstract class RetryPolicyConfig +public class IndexingServiceSelectorConfig { - @Config("${base_path}.retry.minWaitMillis") - @Default("PT1M") // 1 minute - public abstract Duration getRetryMinDuration(); + @JsonProperty + private String serviceName = null; - @Config("${base_path}.retry.maxWaitMillis") - @Default("PT10M") // 10 minutes - public abstract Duration getRetryMaxDuration(); - - @Config("${base_path}.retry.maxRetryCount") - @Default("10") - public abstract long getMaxRetryCount(); + public String getServiceName() + { + return serviceName; + } } diff --git a/server/src/main/java/com/metamx/druid/guice/CoordinatorModule.java b/server/src/main/java/com/metamx/druid/guice/CoordinatorModule.java index 86d857bcb37..8ae217714ca 100644 --- a/server/src/main/java/com/metamx/druid/guice/CoordinatorModule.java +++ b/server/src/main/java/com/metamx/druid/guice/CoordinatorModule.java @@ -4,14 +4,9 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.inject.Binder; import com.google.inject.Module; import com.google.inject.Provides; -import com.google.inject.TypeLiteral; import com.metamx.common.concurrent.ScheduledExecutorFactory; import com.metamx.druid.client.ServerInventoryViewConfig; -import com.metamx.druid.client.indexing.IndexingService; import com.metamx.druid.client.indexing.IndexingServiceClient; -import com.metamx.druid.client.indexing.IndexingServiceSelector; -import com.metamx.druid.client.selector.DiscoverySelector; -import com.metamx.druid.client.selector.Server; import com.metamx.druid.db.DatabaseRuleManager; import com.metamx.druid.db.DatabaseRuleManagerConfig; import com.metamx.druid.db.DatabaseRuleManagerProvider; @@ -26,11 +21,6 @@ import com.metamx.druid.master.DruidMaster; import com.metamx.druid.master.DruidMasterConfig; import com.metamx.druid.master.LoadQueueTaskMaster; import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.x.discovery.ServiceDiscovery; -import org.apache.curator.x.discovery.ServiceInstance; -import org.apache.curator.x.discovery.ServiceProvider; - -import java.io.IOException; /** */ @@ -56,10 +46,6 @@ public class CoordinatorModule implements Module .toProvider(DatabaseRuleManagerProvider.class) .in(ManageLifecycle.class); - binder.bind(new TypeLiteral>(){}) - .annotatedWith(IndexingService.class) - .to(IndexingServiceSelector.class) - .in(ManageLifecycle.class); binder.bind(IndexingServiceClient.class).in(LazySingleton.class); binder.bind(RedirectInfo.class).to(MasterRedirectInfo.class).in(LazySingleton.class); @@ -67,35 +53,6 @@ public class CoordinatorModule implements Module binder.bind(DruidMaster.class); } - @Provides @LazySingleton @IndexingService - public ServiceProvider getServiceProvider(DruidMasterConfig config, ServiceDiscovery serviceDiscovery) - { - // TODO: This service discovery stuff is really really janky. It needs to be reworked. - if (config.getMergerServiceName() == null) { - return new ServiceProvider() - { - @Override - public void start() throws Exception - { - - } - - @Override - public ServiceInstance getInstance() throws Exception - { - return null; - } - - @Override - public void close() throws IOException - { - - } - }; - } - return serviceDiscovery.serviceProviderBuilder().serviceName(config.getMergerServiceName()).build(); - } - @Provides @LazySingleton public LoadQueueTaskMaster getLoadQueueTaskMaster( CuratorFramework curator, ObjectMapper jsonMapper, ScheduledExecutorFactory factory, DruidMasterConfig config diff --git a/server/src/main/java/com/metamx/druid/guice/IndexingServiceDiscoveryModule.java b/server/src/main/java/com/metamx/druid/guice/IndexingServiceDiscoveryModule.java new file mode 100644 index 00000000000..e39c43dd878 --- /dev/null +++ b/server/src/main/java/com/metamx/druid/guice/IndexingServiceDiscoveryModule.java @@ -0,0 +1,83 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.guice; + +import com.google.inject.Binder; +import com.google.inject.Module; +import com.google.inject.Provides; +import com.google.inject.TypeLiteral; +import com.metamx.druid.client.indexing.IndexingService; +import com.metamx.druid.client.indexing.IndexingServiceSelector; +import com.metamx.druid.client.indexing.IndexingServiceSelectorConfig; +import com.metamx.druid.client.selector.DiscoverySelector; +import com.metamx.druid.client.selector.Server; +import org.apache.curator.x.discovery.ServiceDiscovery; +import org.apache.curator.x.discovery.ServiceInstance; +import org.apache.curator.x.discovery.ServiceProvider; + +import java.io.IOException; + +/** + */ +public class IndexingServiceDiscoveryModule implements Module +{ + @Override + public void configure(Binder binder) + { + JsonConfigProvider.bind(binder, "druid.selectors.indexing", IndexingServiceSelectorConfig.class); + binder.bind(new TypeLiteral>(){}) + .annotatedWith(IndexingService.class) + .to(IndexingServiceSelector.class); + + binder.bind(IndexingServiceSelector.class).in(ManageLifecycle.class); + } + + @Provides + @LazySingleton @IndexingService + public ServiceProvider getServiceProvider( + IndexingServiceSelectorConfig config, + ServiceDiscovery serviceDiscovery + ) + { + if (config.getServiceName() == null) { + return new ServiceProvider() + { + @Override + public void start() throws Exception + { + + } + + @Override + public ServiceInstance getInstance() throws Exception + { + return null; + } + + @Override + public void close() throws IOException + { + + } + }; + } + return serviceDiscovery.serviceProviderBuilder().serviceName(config.getServiceName()).build(); + } +} diff --git a/server/src/main/java/com/metamx/druid/guice/StorageNodeModule.java b/server/src/main/java/com/metamx/druid/guice/StorageNodeModule.java index 72954cec734..33b7688afe7 100644 --- a/server/src/main/java/com/metamx/druid/guice/StorageNodeModule.java +++ b/server/src/main/java/com/metamx/druid/guice/StorageNodeModule.java @@ -20,6 +20,7 @@ package com.metamx.druid.guice; import com.google.inject.Binder; +import com.google.inject.Module; import com.google.inject.Provides; import com.metamx.druid.client.DruidServerConfig; import com.metamx.druid.coordination.DruidServerMetadata; @@ -33,7 +34,7 @@ import com.metamx.druid.query.QueryRunnerFactoryConglomerate; /** */ -public class StorageNodeModule extends ServerModule +public class StorageNodeModule implements Module { private final String nodeType; @@ -45,8 +46,6 @@ public class StorageNodeModule extends ServerModule @Override public void configure(Binder binder) { - super.configure(binder); - JsonConfigProvider.bind(binder, "druid.server", DruidServerConfig.class); JsonConfigProvider.bind(binder, "druid.segmentCache", SegmentLoaderConfig.class); diff --git a/server/src/main/java/com/metamx/druid/master/DruidMasterConfig.java b/server/src/main/java/com/metamx/druid/master/DruidMasterConfig.java index d514b4d5c4f..13f63d72f70 100644 --- a/server/src/main/java/com/metamx/druid/master/DruidMasterConfig.java +++ b/server/src/main/java/com/metamx/druid/master/DruidMasterConfig.java @@ -60,12 +60,6 @@ public abstract class DruidMasterConfig return false; } - @Config("druid.master.merger.service") - public String getMergerServiceName() - { - return null; - } - @Config("druid.master.merge.threshold") public long getMergeBytesLimit() { diff --git a/server/src/test/java/com/metamx/druid/master/DruidMasterTest.java b/server/src/test/java/com/metamx/druid/master/DruidMasterTest.java index a49dc85a582..5db44c86363 100644 --- a/server/src/test/java/com/metamx/druid/master/DruidMasterTest.java +++ b/server/src/test/java/com/metamx/druid/master/DruidMasterTest.java @@ -100,12 +100,6 @@ public class DruidMasterTest return super.getMillisToWaitBeforeDeleting(); } - @Override - public String getMergerServiceName() - { - return ""; - } - @Override public int getMaxSegmentsToMove() { diff --git a/services/src/main/java/io/druid/cli/CliCoordinator.java b/services/src/main/java/io/druid/cli/CliCoordinator.java index 7c60be03346..f8466abf057 100644 --- a/services/src/main/java/io/druid/cli/CliCoordinator.java +++ b/services/src/main/java/io/druid/cli/CliCoordinator.java @@ -8,6 +8,7 @@ import com.metamx.druid.curator.discovery.DiscoveryModule; import com.metamx.druid.guice.CoordinatorModule; import com.metamx.druid.guice.DbConnectorModule; import com.metamx.druid.guice.HttpClientModule; +import com.metamx.druid.guice.IndexingServiceDiscoveryModule; import com.metamx.druid.guice.JacksonConfigManagerModule; import com.metamx.druid.guice.LifecycleModule; import com.metamx.druid.guice.ServerModule; @@ -68,6 +69,7 @@ public class CliCoordinator extends ServerRunnable .addResource(MasterResource.class) .addResource(StatusResource.class), new ServerViewModule(), + new IndexingServiceDiscoveryModule(), CoordinatorModule.class ); } diff --git a/services/src/main/java/io/druid/cli/CliHistorical.java b/services/src/main/java/io/druid/cli/CliHistorical.java index 3f30002c577..2f8d521d5be 100644 --- a/services/src/main/java/io/druid/cli/CliHistorical.java +++ b/services/src/main/java/io/druid/cli/CliHistorical.java @@ -33,6 +33,7 @@ import com.metamx.druid.guice.HttpClientModule; import com.metamx.druid.guice.LifecycleModule; import com.metamx.druid.guice.QueryRunnerFactoryModule; import com.metamx.druid.guice.QueryableModule; +import com.metamx.druid.guice.ServerModule; import com.metamx.druid.guice.StorageNodeModule; import com.metamx.druid.http.StatusResource; import com.metamx.druid.initialization.EmitterModule; @@ -70,6 +71,7 @@ public class CliHistorical extends ServerRunnable AWSModule.class, DataSegmentPullerModule.class, new MetricsModule().register(ServerMonitor.class), + new ServerModule(), new StorageNodeModule("historical"), new JettyServerModule(new QueryJettyServerInitializer()) .addResource(StatusResource.class), diff --git a/services/src/main/java/io/druid/cli/CliPeon.java b/services/src/main/java/io/druid/cli/CliPeon.java new file mode 100644 index 00000000000..fb3a43aa73d --- /dev/null +++ b/services/src/main/java/io/druid/cli/CliPeon.java @@ -0,0 +1,92 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.cli; + +import com.google.inject.Injector; +import com.metamx.common.logger.Logger; +import com.metamx.druid.curator.CuratorModule; +import com.metamx.druid.curator.discovery.DiscoveryModule; +import com.metamx.druid.guice.AWSModule; +import com.metamx.druid.guice.AnnouncerModule; +import com.metamx.druid.guice.DataSegmentPusherModule; +import com.metamx.druid.guice.DruidProcessingModule; +import com.metamx.druid.guice.HttpClientModule; +import com.metamx.druid.guice.IndexingServiceDiscoveryModule; +import com.metamx.druid.guice.LifecycleModule; +import com.metamx.druid.guice.PeonModule; +import com.metamx.druid.guice.QueryRunnerFactoryModule; +import com.metamx.druid.guice.QueryableModule; +import com.metamx.druid.guice.ServerModule; +import com.metamx.druid.guice.ServerViewModule; +import com.metamx.druid.guice.StorageNodeModule; +import com.metamx.druid.http.StatusResource; +import com.metamx.druid.indexing.coordinator.ThreadPoolTaskRunner; +import com.metamx.druid.indexing.worker.executor.ChatHandlerResource; +import com.metamx.druid.initialization.EmitterModule; +import com.metamx.druid.initialization.Initialization; +import com.metamx.druid.initialization.JettyServerModule; +import com.metamx.druid.metrics.MetricsModule; +import io.airlift.command.Command; + +/** + */ +@Command( + name = "peon", + description = "Runs a Peon, this is an individual forked \"task\" used as part of the indexing service. " + + "This should rarely, if ever, be used directly." +) +public class CliPeon extends ServerRunnable +{ + private static final Logger log = new Logger(CliPeon.class); + + public CliPeon() + { + super(log); + } + + @Override + protected Injector getInjector() + { + // TODO: make it take and run a task + + return Initialization.makeInjector( + new LifecycleModule(), + EmitterModule.class, + HttpClientModule.global(), + CuratorModule.class, + new MetricsModule(), + new ServerModule(), + new JettyServerModule(new QueryJettyServerInitializer()) + .addResource(StatusResource.class) + .addResource(ChatHandlerResource.class), + new DiscoveryModule(), + new ServerViewModule(), + new StorageNodeModule("real-time"), + new DataSegmentPusherModule(), + new AnnouncerModule(), + new DruidProcessingModule(), + new QueryableModule(ThreadPoolTaskRunner.class), + new QueryRunnerFactoryModule(), + new IndexingServiceDiscoveryModule(), + new AWSModule(), + new PeonModule() + ); + } +} diff --git a/services/src/main/java/io/druid/cli/CliRealtime.java b/services/src/main/java/io/druid/cli/CliRealtime.java index 165996a4497..080397a1a76 100644 --- a/services/src/main/java/io/druid/cli/CliRealtime.java +++ b/services/src/main/java/io/druid/cli/CliRealtime.java @@ -32,6 +32,7 @@ import com.metamx.druid.guice.LifecycleModule; import com.metamx.druid.guice.QueryRunnerFactoryModule; import com.metamx.druid.guice.QueryableModule; import com.metamx.druid.guice.RealtimeModule; +import com.metamx.druid.guice.ServerModule; import com.metamx.druid.guice.ServerViewModule; import com.metamx.druid.guice.StorageNodeModule; import com.metamx.druid.http.StatusResource; @@ -71,6 +72,7 @@ public class CliRealtime extends ServerRunnable AWSModule.class, DataSegmentPusherModule.class, new MetricsModule(), + new ServerModule(), new StorageNodeModule("realtime"), new JettyServerModule(new QueryJettyServerInitializer()) .addResource(StatusResource.class), diff --git a/services/src/main/java/io/druid/cli/CliRealtimeExample.java b/services/src/main/java/io/druid/cli/CliRealtimeExample.java index 1b11ea07e14..1a6f9f97381 100644 --- a/services/src/main/java/io/druid/cli/CliRealtimeExample.java +++ b/services/src/main/java/io/druid/cli/CliRealtimeExample.java @@ -25,6 +25,7 @@ import com.metamx.druid.guice.DruidProcessingModule; import com.metamx.druid.guice.LifecycleModule; import com.metamx.druid.guice.QueryRunnerFactoryModule; import com.metamx.druid.guice.QueryableModule; +import com.metamx.druid.guice.ServerModule; import com.metamx.druid.guice.StorageNodeModule; import com.metamx.druid.http.StatusResource; import com.metamx.druid.initialization.EmitterModule; @@ -56,6 +57,7 @@ public class CliRealtimeExample extends ServerRunnable new LifecycleModule(), EmitterModule.class, DruidProcessingModule.class, + new ServerModule(), new StorageNodeModule("realtime"), new JettyServerModule(new QueryJettyServerInitializer()) .addResource(StatusResource.class), diff --git a/services/src/main/java/io/druid/cli/Main.java b/services/src/main/java/io/druid/cli/Main.java index b2274ffe096..934348c7f4d 100644 --- a/services/src/main/java/io/druid/cli/Main.java +++ b/services/src/main/java/io/druid/cli/Main.java @@ -40,7 +40,8 @@ public class Main .withDescription("Run one of the Druid server types.") .withDefaultCommand(Help.class) .withCommands( - CliCoordinator.class, CliHistorical.class, CliBroker.class, CliRealtime.class, CliOverlord.class, CliMiddleManager.class + CliCoordinator.class, CliHistorical.class, CliBroker.class, CliRealtime.class, + CliOverlord.class, CliMiddleManager.class, CliPeon.class ); builder.withGroup("example")