From eaaad01de7d89cdf5396c962e903f9f68885c50d Mon Sep 17 00:00:00 2001 From: Charles Allen Date: Tue, 17 May 2016 15:44:42 -0700 Subject: [PATCH] [QTL] Datasource as lookupTier (#2955) * Datasource as lookup tier * Adds an option to let indexing service tasks pull their lookup tier from the datasource they are working for. * Fix bad docs for lookups lookupTier * Add Datasource name holder * Move task and datasource to be pulled from Task file * Make LookupModule pull from bound dataSource * Fix test * Fix code style on imports * Fix formatting * Make naming better * Address code comments about naming --- docs/content/querying/lookups.md | 4 +- .../indexing/overlord/ForkingTaskRunner.java | 2 +- .../io/druid/query/lookup/LookupModule.java | 38 +++-- .../jetty/JettyServerModule.java | 20 +-- .../metrics/DataSourceTaskIdHolder.java | 45 ++++++ .../druid/server/metrics/MetricsModule.java | 42 +++-- .../druid/server/metrics/MonitorsConfig.java | 18 +++ .../LookupListeningAnnouncerConfigTest.java | 144 ++++++++++++++++++ .../server/metrics/MetricsModuleTest.java | 91 +++++++++++ .../src/main/java/io/druid/cli/CliPeon.java | 35 +++++ 10 files changed, 385 insertions(+), 54 deletions(-) create mode 100644 server/src/main/java/io/druid/server/metrics/DataSourceTaskIdHolder.java create mode 100644 server/src/test/java/io/druid/query/lookup/LookupListeningAnnouncerConfigTest.java create mode 100644 server/src/test/java/io/druid/server/metrics/MetricsModuleTest.java diff --git a/docs/content/querying/lookups.md b/docs/content/querying/lookups.md index 7e315103fb7..95a578799f3 100644 --- a/docs/content/querying/lookups.md +++ b/docs/content/querying/lookups.md @@ -291,8 +291,8 @@ To configure a Broker / Router / Historical / Peon to announce itself as part of |Property | Description | Default | |---------|-------------|---------| -|`druid.lookup.tierName`| The tier for **lookups** for this node. This is independent of other tiers.|`__default`| - +|`druid.lookup.lookupTier`| The tier for **lookups** for this node. This is independent of other tiers.|`__default`| +|`druid.lookup.lookupTierIsDatasource`|For some things like indexing service tasks, the datasource is passed in the runtime properties of a task. This option fetches the tierName from the same value as the datasource for the task. It is suggested to only use this as peon options for the indexing service, if at all. If true, `druid.lookup.lookupTier` MUST NOT be specified|`"false"`| ## Saving configuration across restarts diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java index c5b49a92721..f87972f97da 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java @@ -347,7 +347,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer } } - // Add dataSource and taskId for metrics + // Add dataSource and taskId for metrics or logging command.add( String.format( "-D%s%s=%s", diff --git a/server/src/main/java/io/druid/query/lookup/LookupModule.java b/server/src/main/java/io/druid/query/lookup/LookupModule.java index b0bd2dbcb6e..ddb4d2906c0 100644 --- a/server/src/main/java/io/druid/query/lookup/LookupModule.java +++ b/server/src/main/java/io/druid/query/lookup/LookupModule.java @@ -26,6 +26,8 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.Module; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.module.SimpleModule; +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.net.HostAndPort; @@ -50,16 +52,16 @@ import io.druid.server.listener.announcer.ListeningAnnouncerConfig; import io.druid.server.listener.resource.AbstractListenerHandler; import io.druid.server.listener.resource.ListenerResource; import io.druid.server.lookup.cache.LookupCoordinatorManager; -import org.apache.curator.utils.ZKPaths; - -import javax.ws.rs.Path; +import io.druid.server.metrics.DataSourceTaskIdHolder; import java.util.HashMap; import java.util.List; import java.util.Map; +import javax.ws.rs.Path; +import org.apache.curator.utils.ZKPaths; public class LookupModule implements DruidModule { - private static final String PROPERTY_BASE = "druid.lookup"; + static final String PROPERTY_BASE = "druid.lookup"; public static final String FAILED_UPDATES_KEY = "failedUpdates"; public static String getTierListenerPath(String tier) @@ -185,30 +187,34 @@ class LookupResourceListenerAnnouncer extends ListenerResourceAnnouncer class LookupListeningAnnouncerConfig extends ListeningAnnouncerConfig { public static final String DEFAULT_TIER = "__default"; + private final DataSourceTaskIdHolder dataSourceTaskIdHolder; @JsonProperty("lookupTier") private String lookupTier = null; + @JsonProperty("lookupTierIsDatasource") + private boolean lookupTierIsDatasource = false; @JsonCreator - public static LookupListeningAnnouncerConfig createLookupListeningAnnouncerConfig( + public LookupListeningAnnouncerConfig( @JacksonInject ZkPathsConfig zkPathsConfig, - @JsonProperty("lookupTier") String lookupTier + @JacksonInject DataSourceTaskIdHolder dataSourceTaskIdHolder ) - { - final LookupListeningAnnouncerConfig lookupListeningAnnouncerConfig = new LookupListeningAnnouncerConfig( - zkPathsConfig); - lookupListeningAnnouncerConfig.lookupTier = lookupTier; - return lookupListeningAnnouncerConfig; - } - - @Inject - public LookupListeningAnnouncerConfig(ZkPathsConfig zkPathsConfig) { super(zkPathsConfig); + this.dataSourceTaskIdHolder = dataSourceTaskIdHolder; } public String getLookupTier() { - return lookupTier == null ? DEFAULT_TIER : lookupTier; + Preconditions.checkArgument( + !(lookupTierIsDatasource && null != lookupTier), + "Cannot specify both `lookupTier` and `lookupTierIsDatasource`" + ); + final String lookupTier = lookupTierIsDatasource ? dataSourceTaskIdHolder.getDataSource() : this.lookupTier; + return Preconditions.checkNotNull( + lookupTier == null ? DEFAULT_TIER : Strings.emptyToNull(lookupTier), + "Cannot have empty lookup tier from %s", + lookupTierIsDatasource ? "bound value" : LookupModule.PROPERTY_BASE + ); } public String getLookupKey() diff --git a/server/src/main/java/io/druid/server/initialization/jetty/JettyServerModule.java b/server/src/main/java/io/druid/server/initialization/jetty/JettyServerModule.java index 6d8ee01037a..db722005de4 100644 --- a/server/src/main/java/io/druid/server/initialization/jetty/JettyServerModule.java +++ b/server/src/main/java/io/druid/server/initialization/jetty/JettyServerModule.java @@ -23,7 +23,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider; import com.fasterxml.jackson.jaxrs.smile.JacksonSmileProvider; import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; import com.google.common.primitives.Ints; import com.google.inject.Binder; import com.google.inject.ConfigurationException; @@ -52,10 +51,10 @@ import io.druid.guice.annotations.JSR311Resource; import io.druid.guice.annotations.Json; import io.druid.guice.annotations.Self; import io.druid.guice.annotations.Smile; -import io.druid.query.DruidMetrics; import io.druid.server.DruidNode; import io.druid.server.StatusResource; import io.druid.server.initialization.ServerConfig; +import io.druid.server.metrics.DataSourceTaskIdHolder; import io.druid.server.metrics.MetricsModule; import io.druid.server.metrics.MonitorsConfig; import org.eclipse.jetty.server.ConnectionFactory; @@ -69,7 +68,6 @@ import javax.servlet.ServletException; import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.Properties; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; @@ -222,24 +220,20 @@ public class JettyServerModule extends JerseyServletModule @Provides @Singleton - public JettyMonitor getJettyMonitor(Properties props) + public JettyMonitor getJettyMonitor( + DataSourceTaskIdHolder dataSourceTaskIdHolder + ) { - return new JettyMonitor(props); + return new JettyMonitor(dataSourceTaskIdHolder.getDataSource(), dataSourceTaskIdHolder.getTaskId()); } public static class JettyMonitor extends AbstractMonitor { private final Map dimensions; - public JettyMonitor(Properties props) + public JettyMonitor(String dataSource, String taskId) { - this.dimensions = MonitorsConfig.extractDimensions( - props, - Lists.newArrayList( - DruidMetrics.DATASOURCE, - DruidMetrics.TASK_ID - ) - ); + this.dimensions = MonitorsConfig.mapOfDatasourceAndTaskID(dataSource, taskId); } @Override diff --git a/server/src/main/java/io/druid/server/metrics/DataSourceTaskIdHolder.java b/server/src/main/java/io/druid/server/metrics/DataSourceTaskIdHolder.java new file mode 100644 index 00000000000..47bd9a7145e --- /dev/null +++ b/server/src/main/java/io/druid/server/metrics/DataSourceTaskIdHolder.java @@ -0,0 +1,45 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.metrics; + +import com.google.inject.Inject; +import com.google.inject.name.Named; + +public class DataSourceTaskIdHolder +{ + public static final String DATA_SOURCE_BINDING = "druidDataSource"; + public static final String TASK_ID_BINDING = "druidTaskId"; + @Named(DATA_SOURCE_BINDING) + @Inject(optional = true) + String dataSource = null; + @Named(TASK_ID_BINDING) + @Inject(optional = true) + String taskId = null; + + public String getDataSource() + { + return dataSource; + } + + public String getTaskId() + { + return taskId; + } +} diff --git a/server/src/main/java/io/druid/server/metrics/MetricsModule.java b/server/src/main/java/io/druid/server/metrics/MetricsModule.java index 220321ef458..18f9215f2b2 100644 --- a/server/src/main/java/io/druid/server/metrics/MetricsModule.java +++ b/server/src/main/java/io/druid/server/metrics/MetricsModule.java @@ -40,11 +40,9 @@ import io.druid.guice.DruidBinders; import io.druid.guice.JsonConfigProvider; import io.druid.guice.LazySingleton; import io.druid.guice.ManageLifecycle; -import io.druid.query.DruidMetrics; import io.druid.query.ExecutorServiceMonitor; import java.util.List; -import java.util.Properties; import java.util.Set; /** @@ -68,6 +66,8 @@ public class MetricsModule implements Module DruidBinders.metricMonitorBinder(binder); // get the binder so that it will inject the empty set at a minimum. + binder.bind(DataSourceTaskIdHolder.class).in(LazySingleton.class); + binder.bind(EventReceiverFirehoseRegister.class).in(LazySingleton.class); binder.bind(ExecutorServiceMonitor.class).in(LazySingleton.class); @@ -107,39 +107,37 @@ public class MetricsModule implements Module @Provides @ManageLifecycle - public JvmMonitor getJvmMonitor(Properties props) + public JvmMonitor getJvmMonitor( + DataSourceTaskIdHolder dataSourceTaskIdHolder + ) { - return new JvmMonitor(MonitorsConfig.extractDimensions(props, - Lists.newArrayList( - DruidMetrics.DATASOURCE, - DruidMetrics.TASK_ID - ) + return new JvmMonitor(MonitorsConfig.mapOfDatasourceAndTaskID( + dataSourceTaskIdHolder.getDataSource(), + dataSourceTaskIdHolder.getTaskId() )); } @Provides @ManageLifecycle - public JvmCpuMonitor getJvmCpuMonitor(Properties props) + public JvmCpuMonitor getJvmCpuMonitor( + DataSourceTaskIdHolder dataSourceTaskIdHolder + ) { - return new JvmCpuMonitor(MonitorsConfig.extractDimensions(props, - Lists.newArrayList( - DruidMetrics.DATASOURCE, - DruidMetrics.TASK_ID - ) + return new JvmCpuMonitor(MonitorsConfig.mapOfDatasourceAndTaskID( + dataSourceTaskIdHolder.getDataSource(), + dataSourceTaskIdHolder.getTaskId() )); } @Provides @ManageLifecycle - public SysMonitor getSysMonitor(Properties props) + public SysMonitor getSysMonitor( + DataSourceTaskIdHolder dataSourceTaskIdHolder + ) { - return new SysMonitor(MonitorsConfig.extractDimensions(props, - Lists.newArrayList( - DruidMetrics.DATASOURCE, - DruidMetrics.TASK_ID - ) + return new SysMonitor(MonitorsConfig.mapOfDatasourceAndTaskID( + dataSourceTaskIdHolder.getDataSource(), + dataSourceTaskIdHolder.getTaskId() )); } - - } diff --git a/server/src/main/java/io/druid/server/metrics/MonitorsConfig.java b/server/src/main/java/io/druid/server/metrics/MonitorsConfig.java index fc761dd7bab..b7ba7912462 100644 --- a/server/src/main/java/io/druid/server/metrics/MonitorsConfig.java +++ b/server/src/main/java/io/druid/server/metrics/MonitorsConfig.java @@ -20,8 +20,10 @@ package io.druid.server.metrics; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.metamx.metrics.Monitor; +import io.druid.query.DruidMetrics; import javax.validation.constraints.NotNull; import java.util.HashMap; @@ -52,6 +54,22 @@ public class MonitorsConfig '}'; } + + public static Map mapOfDatasourceAndTaskID( + final String datasource, + final String taskId + ) + { + final ImmutableMap.Builder builder = ImmutableMap.builder(); + if (datasource != null) { + builder.put(DruidMetrics.DATASOURCE, new String[]{datasource}); + } + if (taskId != null) { + builder.put(DruidMetrics.ID, new String[]{taskId}); + } + return builder.build(); + } + public static Map extractDimensions(Properties props, List dimensions) { Map dimensionsMap = new HashMap<>(); diff --git a/server/src/test/java/io/druid/query/lookup/LookupListeningAnnouncerConfigTest.java b/server/src/test/java/io/druid/query/lookup/LookupListeningAnnouncerConfigTest.java new file mode 100644 index 00000000000..9cf55103183 --- /dev/null +++ b/server/src/test/java/io/druid/query/lookup/LookupListeningAnnouncerConfigTest.java @@ -0,0 +1,144 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.lookup; + +import com.google.common.collect.ImmutableList; +import com.google.inject.Binder; +import com.google.inject.Injector; +import com.google.inject.Key; +import com.google.inject.Module; +import com.google.inject.name.Names; +import io.druid.guice.GuiceInjectors; +import io.druid.guice.JsonConfigProvider; +import io.druid.guice.JsonConfigurator; +import io.druid.guice.annotations.Self; +import io.druid.initialization.Initialization; +import io.druid.server.DruidNode; +import io.druid.server.metrics.DataSourceTaskIdHolder; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.Properties; + +public class LookupListeningAnnouncerConfigTest +{ + private static final String propertyBase = "some.property"; + private final Injector injector = Initialization.makeInjectorWithModules( + GuiceInjectors.makeStartupInjector(), + ImmutableList.of( + new Module() + { + @Override + public void configure(Binder binder) + { + JsonConfigProvider.bindInstance( + binder, Key.get(DruidNode.class, Self.class), new DruidNode("test-inject", null, null) + ); + binder.bind(Key.get( + String.class, + Names.named(DataSourceTaskIdHolder.DATA_SOURCE_BINDING) + )).toInstance("some_datasource"); + } + }, + new LookupModule() + ) + ); + + private final Properties properties = injector.getInstance(Properties.class); + + @Before + public void setUp() + { + properties.clear(); + } + + @Test + public void testDefaultInjection() + { + final JsonConfigurator configurator = injector.getBinding(JsonConfigurator.class).getProvider().get(); + final JsonConfigProvider configProvider = JsonConfigProvider.of( + propertyBase, + LookupListeningAnnouncerConfig.class + ); + configProvider.inject(properties, configurator); + final LookupListeningAnnouncerConfig config = configProvider.get().get(); + Assert.assertEquals(LookupListeningAnnouncerConfig.DEFAULT_TIER, config.getLookupTier()); + } + + @Test + public void testSimpleInjection() + { + final String lookupTier = "some_tier"; + final JsonConfigurator configurator = injector.getBinding(JsonConfigurator.class).getProvider().get(); + properties.put(propertyBase + ".lookupTier", lookupTier); + final JsonConfigProvider configProvider = JsonConfigProvider.of( + propertyBase, + LookupListeningAnnouncerConfig.class + ); + configProvider.inject(properties, configurator); + final LookupListeningAnnouncerConfig config = configProvider.get().get(); + Assert.assertEquals(lookupTier, config.getLookupTier()); + } + + @Test(expected = NullPointerException.class) + public void testFailsOnEmptyTier() + { + final JsonConfigurator configurator = injector.getBinding(JsonConfigurator.class).getProvider().get(); + properties.put(propertyBase + ".lookupTier", ""); + final JsonConfigProvider configProvider = JsonConfigProvider.of( + propertyBase, + LookupListeningAnnouncerConfig.class + ); + configProvider.inject(properties, configurator); + final LookupListeningAnnouncerConfig config = configProvider.get().get(); + config.getLookupTier(); + } + + @Test + public void testDatasourceInjection() + { + final JsonConfigurator configurator = injector.getBinding(JsonConfigurator.class).getProvider().get(); + properties.put(propertyBase + ".lookupTierIsDatasource", "true"); + final JsonConfigProvider configProvider = JsonConfigProvider.of( + propertyBase, + LookupListeningAnnouncerConfig.class + ); + configProvider.inject(properties, configurator); + final LookupListeningAnnouncerConfig config = configProvider.get().get(); + Assert.assertEquals("some_datasource", config.getLookupTier()); + } + + @Test(expected = IllegalArgumentException.class) + public void testFailsInjection() + { + final String lookupTier = "some_tier"; + final JsonConfigurator configurator = injector.getBinding(JsonConfigurator.class).getProvider().get(); + properties.put(propertyBase + ".lookupTier", lookupTier); + properties.put(propertyBase + ".lookupTierIsDatasource", "true"); + final JsonConfigProvider configProvider = JsonConfigProvider.of( + propertyBase, + LookupListeningAnnouncerConfig.class + ); + configProvider.inject(properties, configurator); + final LookupListeningAnnouncerConfig config = configProvider.get().get(); + Assert.assertEquals(lookupTier, config.getLookupTier()); + } +} diff --git a/server/src/test/java/io/druid/server/metrics/MetricsModuleTest.java b/server/src/test/java/io/druid/server/metrics/MetricsModuleTest.java new file mode 100644 index 00000000000..9ecde7bb892 --- /dev/null +++ b/server/src/test/java/io/druid/server/metrics/MetricsModuleTest.java @@ -0,0 +1,91 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.metrics; + +import com.google.common.collect.ImmutableList; +import com.google.inject.Binder; +import com.google.inject.Injector; +import com.google.inject.Key; +import com.google.inject.Module; +import com.google.inject.name.Names; +import io.druid.guice.GuiceInjectors; +import io.druid.guice.JsonConfigProvider; +import io.druid.guice.annotations.Self; +import io.druid.initialization.Initialization; +import io.druid.server.DruidNode; +import org.junit.Assert; +import org.junit.Test; + +public class MetricsModuleTest +{ + @Test + public void testSimpleInjection() + { + final Injector injector = Initialization.makeInjectorWithModules( + GuiceInjectors.makeStartupInjector(), + ImmutableList.of(new Module() + { + + @Override + public void configure(Binder binder) + { + JsonConfigProvider.bindInstance( + binder, Key.get(DruidNode.class, Self.class), new DruidNode("test-inject", null, null) + ); + } + }) + ); + final DataSourceTaskIdHolder dimensionIdHolder = new DataSourceTaskIdHolder(); + injector.injectMembers(dimensionIdHolder); + Assert.assertNull(dimensionIdHolder.getDataSource()); + Assert.assertNull(dimensionIdHolder.getTaskId()); + } + + @Test + public void testSimpleInjectionWithValues() + { + final String dataSource = "some datasource"; + final String taskId = "some task"; + final Injector injector = Initialization.makeInjectorWithModules( + GuiceInjectors.makeStartupInjector(), + ImmutableList.of(new Module() + { + + @Override + public void configure(Binder binder) + { + JsonConfigProvider.bindInstance( + binder, Key.get(DruidNode.class, Self.class), new DruidNode("test-inject", null, null) + ); + binder.bind(Key.get( + String.class, + Names.named(DataSourceTaskIdHolder.DATA_SOURCE_BINDING) + )).toInstance(dataSource); + binder.bind(Key.get(String.class, Names.named(DataSourceTaskIdHolder.TASK_ID_BINDING))) + .toInstance(taskId); + } + }) + ); + final DataSourceTaskIdHolder dimensionIdHolder = new DataSourceTaskIdHolder(); + injector.injectMembers(dimensionIdHolder); + Assert.assertEquals(dataSource, dimensionIdHolder.getDataSource()); + Assert.assertEquals(taskId, dimensionIdHolder.getTaskId()); + } +} diff --git a/services/src/main/java/io/druid/cli/CliPeon.java b/services/src/main/java/io/druid/cli/CliPeon.java index 6f82b3915a2..7d421bdd583 100644 --- a/services/src/main/java/io/druid/cli/CliPeon.java +++ b/services/src/main/java/io/druid/cli/CliPeon.java @@ -19,6 +19,7 @@ package io.druid.cli; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.inject.Binder; @@ -26,7 +27,9 @@ import com.google.inject.Inject; import com.google.inject.Injector; import com.google.inject.Key; import com.google.inject.Module; +import com.google.inject.Provides; import com.google.inject.multibindings.MapBinder; +import com.google.inject.name.Named; import com.google.inject.name.Names; import com.metamx.common.lifecycle.Lifecycle; import com.metamx.common.logger.Logger; @@ -45,6 +48,7 @@ import io.druid.guice.LifecycleModule; import io.druid.guice.ManageLifecycle; import io.druid.guice.NodeTypeConfig; import io.druid.guice.PolyBind; +import io.druid.guice.annotations.Json; import io.druid.indexing.common.RetryPolicyConfig; import io.druid.indexing.common.RetryPolicyFactory; import io.druid.indexing.common.TaskToolboxFactory; @@ -54,6 +58,7 @@ import io.druid.indexing.common.actions.TaskActionClientFactory; import io.druid.indexing.common.actions.TaskActionToolbox; import io.druid.indexing.common.config.TaskConfig; import io.druid.indexing.common.config.TaskStorageConfig; +import io.druid.indexing.common.task.Task; import io.druid.indexing.overlord.HeapMemoryTaskStorage; import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator; import io.druid.indexing.overlord.TaskRunner; @@ -81,9 +86,11 @@ import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory; import io.druid.server.QueryResource; import io.druid.server.initialization.jetty.ChatHandlerServerModule; import io.druid.server.initialization.jetty.JettyServerInitializer; +import io.druid.server.metrics.DataSourceTaskIdHolder; import org.eclipse.jetty.server.Server; import java.io.File; +import java.io.IOException; import java.util.Arrays; import java.util.List; import java.util.Properties; @@ -223,6 +230,34 @@ public class CliPeon extends GuiceRunnable .to(RemoteTaskActionClientFactory.class).in(LazySingleton.class); } + + @Provides + @LazySingleton + public Task readTask(@Json ObjectMapper mapper, ExecutorLifecycleConfig config) + { + try { + return mapper.readValue(config.getTaskFile(), Task.class); + } + catch (IOException e) { + throw Throwables.propagate(e); + } + } + + @Provides + @LazySingleton + @Named(DataSourceTaskIdHolder.DATA_SOURCE_BINDING) + public String getDataSourceFromTask(final Task task) + { + return task.getDataSource(); + } + + @Provides + @LazySingleton + @Named(DataSourceTaskIdHolder.TASK_ID_BINDING) + public String getTaskIDFromTask(final Task task) + { + return task.getId(); + } }, new IndexingServiceFirehoseModule(), new ChatHandlerServerModule(properties),