[QTL] Datasource as lookupTier (#2955)

* Datasource as lookup tier
* Adds an option to let indexing service tasks pull their lookup tier from the datasource they are working for.

* Fix bad docs for lookups lookupTier

* Add Datasource name holder

* Move task and datasource to be pulled from Task file

* Make LookupModule pull from bound dataSource

* Fix test

* Fix code style on imports

* Fix formatting

* Make naming better

* Address code comments about naming
This commit is contained in:
Charles Allen 2016-05-17 15:44:42 -07:00 committed by Fangjin Yang
parent fb01db4db7
commit eaaad01de7
10 changed files with 385 additions and 54 deletions

View File

@ -291,8 +291,8 @@ To configure a Broker / Router / Historical / Peon to announce itself as part of
|Property | Description | Default |
|---------|-------------|---------|
|`druid.lookup.tierName`| The tier for **lookups** for this node. This is independent of other tiers.|`__default`|
|`druid.lookup.lookupTier`| The tier for **lookups** for this node. This is independent of other tiers.|`__default`|
|`druid.lookup.lookupTierIsDatasource`|For some things like indexing service tasks, the datasource is passed in the runtime properties of a task. This option fetches the tierName from the same value as the datasource for the task. It is suggested to only use this as peon options for the indexing service, if at all. If true, `druid.lookup.lookupTier` MUST NOT be specified|`"false"`|
## Saving configuration across restarts

View File

@ -347,7 +347,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
}
}
// Add dataSource and taskId for metrics
// Add dataSource and taskId for metrics or logging
command.add(
String.format(
"-D%s%s=%s",

View File

@ -26,6 +26,8 @@ import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.net.HostAndPort;
@ -50,16 +52,16 @@ import io.druid.server.listener.announcer.ListeningAnnouncerConfig;
import io.druid.server.listener.resource.AbstractListenerHandler;
import io.druid.server.listener.resource.ListenerResource;
import io.druid.server.lookup.cache.LookupCoordinatorManager;
import org.apache.curator.utils.ZKPaths;
import javax.ws.rs.Path;
import io.druid.server.metrics.DataSourceTaskIdHolder;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.ws.rs.Path;
import org.apache.curator.utils.ZKPaths;
public class LookupModule implements DruidModule
{
private static final String PROPERTY_BASE = "druid.lookup";
static final String PROPERTY_BASE = "druid.lookup";
public static final String FAILED_UPDATES_KEY = "failedUpdates";
public static String getTierListenerPath(String tier)
@ -185,30 +187,34 @@ class LookupResourceListenerAnnouncer extends ListenerResourceAnnouncer
class LookupListeningAnnouncerConfig extends ListeningAnnouncerConfig
{
public static final String DEFAULT_TIER = "__default";
private final DataSourceTaskIdHolder dataSourceTaskIdHolder;
@JsonProperty("lookupTier")
private String lookupTier = null;
@JsonProperty("lookupTierIsDatasource")
private boolean lookupTierIsDatasource = false;
@JsonCreator
public static LookupListeningAnnouncerConfig createLookupListeningAnnouncerConfig(
public LookupListeningAnnouncerConfig(
@JacksonInject ZkPathsConfig zkPathsConfig,
@JsonProperty("lookupTier") String lookupTier
@JacksonInject DataSourceTaskIdHolder dataSourceTaskIdHolder
)
{
final LookupListeningAnnouncerConfig lookupListeningAnnouncerConfig = new LookupListeningAnnouncerConfig(
zkPathsConfig);
lookupListeningAnnouncerConfig.lookupTier = lookupTier;
return lookupListeningAnnouncerConfig;
}
@Inject
public LookupListeningAnnouncerConfig(ZkPathsConfig zkPathsConfig)
{
super(zkPathsConfig);
this.dataSourceTaskIdHolder = dataSourceTaskIdHolder;
}
public String getLookupTier()
{
return lookupTier == null ? DEFAULT_TIER : lookupTier;
Preconditions.checkArgument(
!(lookupTierIsDatasource && null != lookupTier),
"Cannot specify both `lookupTier` and `lookupTierIsDatasource`"
);
final String lookupTier = lookupTierIsDatasource ? dataSourceTaskIdHolder.getDataSource() : this.lookupTier;
return Preconditions.checkNotNull(
lookupTier == null ? DEFAULT_TIER : Strings.emptyToNull(lookupTier),
"Cannot have empty lookup tier from %s",
lookupTierIsDatasource ? "bound value" : LookupModule.PROPERTY_BASE
);
}
public String getLookupKey()

View File

@ -23,7 +23,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
import com.fasterxml.jackson.jaxrs.smile.JacksonSmileProvider;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.primitives.Ints;
import com.google.inject.Binder;
import com.google.inject.ConfigurationException;
@ -52,10 +51,10 @@ import io.druid.guice.annotations.JSR311Resource;
import io.druid.guice.annotations.Json;
import io.druid.guice.annotations.Self;
import io.druid.guice.annotations.Smile;
import io.druid.query.DruidMetrics;
import io.druid.server.DruidNode;
import io.druid.server.StatusResource;
import io.druid.server.initialization.ServerConfig;
import io.druid.server.metrics.DataSourceTaskIdHolder;
import io.druid.server.metrics.MetricsModule;
import io.druid.server.metrics.MonitorsConfig;
import org.eclipse.jetty.server.ConnectionFactory;
@ -69,7 +68,6 @@ import javax.servlet.ServletException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
@ -222,24 +220,20 @@ public class JettyServerModule extends JerseyServletModule
@Provides
@Singleton
public JettyMonitor getJettyMonitor(Properties props)
public JettyMonitor getJettyMonitor(
DataSourceTaskIdHolder dataSourceTaskIdHolder
)
{
return new JettyMonitor(props);
return new JettyMonitor(dataSourceTaskIdHolder.getDataSource(), dataSourceTaskIdHolder.getTaskId());
}
public static class JettyMonitor extends AbstractMonitor
{
private final Map<String, String[]> dimensions;
public JettyMonitor(Properties props)
public JettyMonitor(String dataSource, String taskId)
{
this.dimensions = MonitorsConfig.extractDimensions(
props,
Lists.newArrayList(
DruidMetrics.DATASOURCE,
DruidMetrics.TASK_ID
)
);
this.dimensions = MonitorsConfig.mapOfDatasourceAndTaskID(dataSource, taskId);
}
@Override

View File

@ -0,0 +1,45 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.server.metrics;
import com.google.inject.Inject;
import com.google.inject.name.Named;
public class DataSourceTaskIdHolder
{
public static final String DATA_SOURCE_BINDING = "druidDataSource";
public static final String TASK_ID_BINDING = "druidTaskId";
@Named(DATA_SOURCE_BINDING)
@Inject(optional = true)
String dataSource = null;
@Named(TASK_ID_BINDING)
@Inject(optional = true)
String taskId = null;
public String getDataSource()
{
return dataSource;
}
public String getTaskId()
{
return taskId;
}
}

View File

@ -40,11 +40,9 @@ import io.druid.guice.DruidBinders;
import io.druid.guice.JsonConfigProvider;
import io.druid.guice.LazySingleton;
import io.druid.guice.ManageLifecycle;
import io.druid.query.DruidMetrics;
import io.druid.query.ExecutorServiceMonitor;
import java.util.List;
import java.util.Properties;
import java.util.Set;
/**
@ -68,6 +66,8 @@ public class MetricsModule implements Module
DruidBinders.metricMonitorBinder(binder); // get the binder so that it will inject the empty set at a minimum.
binder.bind(DataSourceTaskIdHolder.class).in(LazySingleton.class);
binder.bind(EventReceiverFirehoseRegister.class).in(LazySingleton.class);
binder.bind(ExecutorServiceMonitor.class).in(LazySingleton.class);
@ -107,39 +107,37 @@ public class MetricsModule implements Module
@Provides
@ManageLifecycle
public JvmMonitor getJvmMonitor(Properties props)
public JvmMonitor getJvmMonitor(
DataSourceTaskIdHolder dataSourceTaskIdHolder
)
{
return new JvmMonitor(MonitorsConfig.extractDimensions(props,
Lists.newArrayList(
DruidMetrics.DATASOURCE,
DruidMetrics.TASK_ID
)
return new JvmMonitor(MonitorsConfig.mapOfDatasourceAndTaskID(
dataSourceTaskIdHolder.getDataSource(),
dataSourceTaskIdHolder.getTaskId()
));
}
@Provides
@ManageLifecycle
public JvmCpuMonitor getJvmCpuMonitor(Properties props)
public JvmCpuMonitor getJvmCpuMonitor(
DataSourceTaskIdHolder dataSourceTaskIdHolder
)
{
return new JvmCpuMonitor(MonitorsConfig.extractDimensions(props,
Lists.newArrayList(
DruidMetrics.DATASOURCE,
DruidMetrics.TASK_ID
)
return new JvmCpuMonitor(MonitorsConfig.mapOfDatasourceAndTaskID(
dataSourceTaskIdHolder.getDataSource(),
dataSourceTaskIdHolder.getTaskId()
));
}
@Provides
@ManageLifecycle
public SysMonitor getSysMonitor(Properties props)
public SysMonitor getSysMonitor(
DataSourceTaskIdHolder dataSourceTaskIdHolder
)
{
return new SysMonitor(MonitorsConfig.extractDimensions(props,
Lists.newArrayList(
DruidMetrics.DATASOURCE,
DruidMetrics.TASK_ID
)
return new SysMonitor(MonitorsConfig.mapOfDatasourceAndTaskID(
dataSourceTaskIdHolder.getDataSource(),
dataSourceTaskIdHolder.getTaskId()
));
}
}

View File

@ -20,8 +20,10 @@
package io.druid.server.metrics;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.metamx.metrics.Monitor;
import io.druid.query.DruidMetrics;
import javax.validation.constraints.NotNull;
import java.util.HashMap;
@ -52,6 +54,22 @@ public class MonitorsConfig
'}';
}
public static Map<String, String[]> mapOfDatasourceAndTaskID(
final String datasource,
final String taskId
)
{
final ImmutableMap.Builder<String, String[]> builder = ImmutableMap.builder();
if (datasource != null) {
builder.put(DruidMetrics.DATASOURCE, new String[]{datasource});
}
if (taskId != null) {
builder.put(DruidMetrics.ID, new String[]{taskId});
}
return builder.build();
}
public static Map<String, String[]> extractDimensions(Properties props, List<String> dimensions)
{
Map<String, String[]> dimensionsMap = new HashMap<>();

View File

@ -0,0 +1,144 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.lookup;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.name.Names;
import io.druid.guice.GuiceInjectors;
import io.druid.guice.JsonConfigProvider;
import io.druid.guice.JsonConfigurator;
import io.druid.guice.annotations.Self;
import io.druid.initialization.Initialization;
import io.druid.server.DruidNode;
import io.druid.server.metrics.DataSourceTaskIdHolder;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.Properties;
public class LookupListeningAnnouncerConfigTest
{
private static final String propertyBase = "some.property";
private final Injector injector = Initialization.makeInjectorWithModules(
GuiceInjectors.makeStartupInjector(),
ImmutableList.of(
new Module()
{
@Override
public void configure(Binder binder)
{
JsonConfigProvider.bindInstance(
binder, Key.get(DruidNode.class, Self.class), new DruidNode("test-inject", null, null)
);
binder.bind(Key.get(
String.class,
Names.named(DataSourceTaskIdHolder.DATA_SOURCE_BINDING)
)).toInstance("some_datasource");
}
},
new LookupModule()
)
);
private final Properties properties = injector.getInstance(Properties.class);
@Before
public void setUp()
{
properties.clear();
}
@Test
public void testDefaultInjection()
{
final JsonConfigurator configurator = injector.getBinding(JsonConfigurator.class).getProvider().get();
final JsonConfigProvider<LookupListeningAnnouncerConfig> configProvider = JsonConfigProvider.of(
propertyBase,
LookupListeningAnnouncerConfig.class
);
configProvider.inject(properties, configurator);
final LookupListeningAnnouncerConfig config = configProvider.get().get();
Assert.assertEquals(LookupListeningAnnouncerConfig.DEFAULT_TIER, config.getLookupTier());
}
@Test
public void testSimpleInjection()
{
final String lookupTier = "some_tier";
final JsonConfigurator configurator = injector.getBinding(JsonConfigurator.class).getProvider().get();
properties.put(propertyBase + ".lookupTier", lookupTier);
final JsonConfigProvider<LookupListeningAnnouncerConfig> configProvider = JsonConfigProvider.of(
propertyBase,
LookupListeningAnnouncerConfig.class
);
configProvider.inject(properties, configurator);
final LookupListeningAnnouncerConfig config = configProvider.get().get();
Assert.assertEquals(lookupTier, config.getLookupTier());
}
@Test(expected = NullPointerException.class)
public void testFailsOnEmptyTier()
{
final JsonConfigurator configurator = injector.getBinding(JsonConfigurator.class).getProvider().get();
properties.put(propertyBase + ".lookupTier", "");
final JsonConfigProvider<LookupListeningAnnouncerConfig> configProvider = JsonConfigProvider.of(
propertyBase,
LookupListeningAnnouncerConfig.class
);
configProvider.inject(properties, configurator);
final LookupListeningAnnouncerConfig config = configProvider.get().get();
config.getLookupTier();
}
@Test
public void testDatasourceInjection()
{
final JsonConfigurator configurator = injector.getBinding(JsonConfigurator.class).getProvider().get();
properties.put(propertyBase + ".lookupTierIsDatasource", "true");
final JsonConfigProvider<LookupListeningAnnouncerConfig> configProvider = JsonConfigProvider.of(
propertyBase,
LookupListeningAnnouncerConfig.class
);
configProvider.inject(properties, configurator);
final LookupListeningAnnouncerConfig config = configProvider.get().get();
Assert.assertEquals("some_datasource", config.getLookupTier());
}
@Test(expected = IllegalArgumentException.class)
public void testFailsInjection()
{
final String lookupTier = "some_tier";
final JsonConfigurator configurator = injector.getBinding(JsonConfigurator.class).getProvider().get();
properties.put(propertyBase + ".lookupTier", lookupTier);
properties.put(propertyBase + ".lookupTierIsDatasource", "true");
final JsonConfigProvider<LookupListeningAnnouncerConfig> configProvider = JsonConfigProvider.of(
propertyBase,
LookupListeningAnnouncerConfig.class
);
configProvider.inject(properties, configurator);
final LookupListeningAnnouncerConfig config = configProvider.get().get();
Assert.assertEquals(lookupTier, config.getLookupTier());
}
}

View File

@ -0,0 +1,91 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.server.metrics;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.name.Names;
import io.druid.guice.GuiceInjectors;
import io.druid.guice.JsonConfigProvider;
import io.druid.guice.annotations.Self;
import io.druid.initialization.Initialization;
import io.druid.server.DruidNode;
import org.junit.Assert;
import org.junit.Test;
public class MetricsModuleTest
{
@Test
public void testSimpleInjection()
{
final Injector injector = Initialization.makeInjectorWithModules(
GuiceInjectors.makeStartupInjector(),
ImmutableList.of(new Module()
{
@Override
public void configure(Binder binder)
{
JsonConfigProvider.bindInstance(
binder, Key.get(DruidNode.class, Self.class), new DruidNode("test-inject", null, null)
);
}
})
);
final DataSourceTaskIdHolder dimensionIdHolder = new DataSourceTaskIdHolder();
injector.injectMembers(dimensionIdHolder);
Assert.assertNull(dimensionIdHolder.getDataSource());
Assert.assertNull(dimensionIdHolder.getTaskId());
}
@Test
public void testSimpleInjectionWithValues()
{
final String dataSource = "some datasource";
final String taskId = "some task";
final Injector injector = Initialization.makeInjectorWithModules(
GuiceInjectors.makeStartupInjector(),
ImmutableList.of(new Module()
{
@Override
public void configure(Binder binder)
{
JsonConfigProvider.bindInstance(
binder, Key.get(DruidNode.class, Self.class), new DruidNode("test-inject", null, null)
);
binder.bind(Key.get(
String.class,
Names.named(DataSourceTaskIdHolder.DATA_SOURCE_BINDING)
)).toInstance(dataSource);
binder.bind(Key.get(String.class, Names.named(DataSourceTaskIdHolder.TASK_ID_BINDING)))
.toInstance(taskId);
}
})
);
final DataSourceTaskIdHolder dimensionIdHolder = new DataSourceTaskIdHolder();
injector.injectMembers(dimensionIdHolder);
Assert.assertEquals(dataSource, dimensionIdHolder.getDataSource());
Assert.assertEquals(taskId, dimensionIdHolder.getTaskId());
}
}

View File

@ -19,6 +19,7 @@
package io.druid.cli;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
@ -26,7 +27,9 @@ import com.google.inject.Inject;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.google.inject.multibindings.MapBinder;
import com.google.inject.name.Named;
import com.google.inject.name.Names;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.logger.Logger;
@ -45,6 +48,7 @@ import io.druid.guice.LifecycleModule;
import io.druid.guice.ManageLifecycle;
import io.druid.guice.NodeTypeConfig;
import io.druid.guice.PolyBind;
import io.druid.guice.annotations.Json;
import io.druid.indexing.common.RetryPolicyConfig;
import io.druid.indexing.common.RetryPolicyFactory;
import io.druid.indexing.common.TaskToolboxFactory;
@ -54,6 +58,7 @@ import io.druid.indexing.common.actions.TaskActionClientFactory;
import io.druid.indexing.common.actions.TaskActionToolbox;
import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.common.config.TaskStorageConfig;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.HeapMemoryTaskStorage;
import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import io.druid.indexing.overlord.TaskRunner;
@ -81,9 +86,11 @@ import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
import io.druid.server.QueryResource;
import io.druid.server.initialization.jetty.ChatHandlerServerModule;
import io.druid.server.initialization.jetty.JettyServerInitializer;
import io.druid.server.metrics.DataSourceTaskIdHolder;
import org.eclipse.jetty.server.Server;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
@ -223,6 +230,34 @@ public class CliPeon extends GuiceRunnable
.to(RemoteTaskActionClientFactory.class).in(LazySingleton.class);
}
@Provides
@LazySingleton
public Task readTask(@Json ObjectMapper mapper, ExecutorLifecycleConfig config)
{
try {
return mapper.readValue(config.getTaskFile(), Task.class);
}
catch (IOException e) {
throw Throwables.propagate(e);
}
}
@Provides
@LazySingleton
@Named(DataSourceTaskIdHolder.DATA_SOURCE_BINDING)
public String getDataSourceFromTask(final Task task)
{
return task.getDataSource();
}
@Provides
@LazySingleton
@Named(DataSourceTaskIdHolder.TASK_ID_BINDING)
public String getTaskIDFromTask(final Task task)
{
return task.getId();
}
},
new IndexingServiceFirehoseModule(),
new ChatHandlerServerModule(properties),