Merge branch 'master' into docs

Conflicts:
	docs/content/Loading-Your-Data.md
This commit is contained in:
cheddar 2013-09-27 18:29:56 -05:00
commit 154b58defd
42 changed files with 624 additions and 445 deletions

View File

@ -57,7 +57,7 @@ public class ConfigProvider<T> implements Provider<T>
private final Class<T> clazz;
private final Map<String, String> replacements;
private T object = null;
private ConfigurationObjectFactory factory = null;
public ConfigProvider(
Class<T> clazz,
@ -70,20 +70,21 @@ public class ConfigProvider<T> implements Provider<T>
@Inject
public void inject(ConfigurationObjectFactory factory)
{
this.factory = factory;
}
@Override
public T get()
{
try {
// ConfigMagic handles a null replacements
object = factory.buildWithReplacements(clazz, replacements);
Preconditions.checkNotNull(factory, "WTF!? Code misconfigured, inject() didn't get called.");
return factory.buildWithReplacements(clazz, replacements);
}
catch (IllegalArgumentException e) {
log.info("Unable to build instance of class[%s]", clazz);
throw e;
}
}
@Override
public T get()
{
return Preconditions.checkNotNull(object, "WTF!? Code misconfigured, inject() didn't get called.");
}
}

View File

@ -1,130 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexer;
import com.google.common.collect.ImmutableList;
import com.metamx.common.Pair;
import com.metamx.common.lifecycle.Lifecycle;
import java.util.List;
/**
*/
@Deprecated
public class HadoopDruidIndexer
{
public static void main(String[] args) throws Exception
{
if (args.length < 1 || args.length > 2) {
printHelp();
System.exit(2);
}
HadoopDruidIndexerNode node = HadoopDruidIndexerNode.builder().build();
if (args.length == 2) {
node.setIntervalSpec(args[0]);
}
node.setArgumentSpec(args[args.length == 1 ? 0 : 1]);
Lifecycle lifecycle = new Lifecycle();
lifecycle.addManagedInstance(node);
try {
lifecycle.start();
}
catch (Exception e) {
e.printStackTrace();
Thread.sleep(500);
printHelp();
System.exit(1);
}
}
private static final List<Pair<String, String>> expectedFields =
ImmutableList.<Pair<String, String>>builder()
.add(Pair.of("dataSource", "Name of dataSource"))
.add(Pair.of("timestampColumn", "Column name of the timestamp column"))
.add(Pair.of("timestampFormat", "Format name of the timestamp column (posix or iso)"))
.add(
Pair.of(
"dataSpec",
"A JSON object with fields "
+
"format=(json, csv, tsv), "
+
"columns=JSON array of column names for the delimited text input file (only for csv and tsv formats),"
+
"dimensions=JSON array of dimensionn names (must match names in columns),"
+
"delimiter=delimiter of the data (only for tsv format)"
)
)
.add(
Pair.of(
"granularitySpec",
"A JSON object indicating the Granularity that segments should be created at."
)
)
.add(
Pair.of(
"pathSpec",
"A JSON object with fields type=granularity, inputPath, filePattern, dataGranularity"
)
)
.add(
Pair.of(
"rollupSpec",
"JSON object with fields rollupGranularity, aggs=JSON Array of Aggregator specs"
)
)
.add(Pair.of("workingPath", "Path to store intermediate output data. Deleted when finished."))
.add(Pair.of("segmentOutputPath", "Path to store output segments."))
.add(
Pair.of(
"updaterJobSpec",
"JSON object with fields type=db, connectURI of the database, username, password, and segment table name"
)
)
.add(Pair.of("cleanupOnFailure", "Clean up intermediate files on failure? (default: true)"))
.add(Pair.of("leaveIntermediate", "Leave intermediate files. (default: false)"))
.add(Pair.of("partitionDimension", "Dimension to partition by (optional)"))
.add(
Pair.of(
"targetPartitionSize",
"Integer representing the target number of rows in a partition (required if partitionDimension != null)"
)
)
.build();
private static void printHelp()
{
System.out.println("Usage: <java invocation> <config_spec>");
System.out.println("<config_spec> is either a JSON object or the path to a file that contains a JSON object.");
System.out.println();
System.out.println("JSON object description:");
System.out.println("{");
for (Pair<String, String> expectedField : expectedFields) {
System.out.printf(" \"%s\": %s%n", expectedField.lhs, expectedField.rhs);
}
System.out.println("}");
}
}

View File

@ -97,27 +97,6 @@ public class HadoopDruidIndexerConfig
public static HadoopDruidIndexerConfig fromMap(Map<String, Object> argSpec)
{
List<Registererer> registererers = Lists.transform(
MapUtils.getList(argSpec, "registererers", ImmutableList.of()),
new Function<Object, Registererer>()
{
@Override
public Registererer apply(@Nullable Object input)
{
try {
return (Registererer) Class.forName((String) input).newInstance();
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
}
);
if (!registererers.isEmpty()) {
Registererers.registerHandlers(registererers, Arrays.asList(jsonMapper));
}
return jsonMapper.convertValue(argSpec, HadoopDruidIndexerConfig.class);
}
@ -179,7 +158,6 @@ public class HadoopDruidIndexerConfig
private volatile DataRollupSpec rollupSpec;
private volatile DbUpdaterJobSpec updaterJobSpec;
private volatile boolean ignoreInvalidRows = false;
private volatile List<String> registererers = Lists.newArrayList();
@JsonCreator
public HadoopDruidIndexerConfig(
@ -203,8 +181,7 @@ public class HadoopDruidIndexerConfig
final @JsonProperty("overwriteFiles") boolean overwriteFiles,
final @JsonProperty("rollupSpec") DataRollupSpec rollupSpec,
final @JsonProperty("updaterJobSpec") DbUpdaterJobSpec updaterJobSpec,
final @JsonProperty("ignoreInvalidRows") boolean ignoreInvalidRows,
final @JsonProperty("registererers") List<String> registererers
final @JsonProperty("ignoreInvalidRows") boolean ignoreInvalidRows
)
{
this.dataSource = dataSource;
@ -224,7 +201,6 @@ public class HadoopDruidIndexerConfig
this.rollupSpec = rollupSpec;
this.updaterJobSpec = updaterJobSpec;
this.ignoreInvalidRows = ignoreInvalidRows;
this.registererers = registererers;
if(partitionsSpec != null) {
Preconditions.checkArgument(
@ -517,17 +493,6 @@ public class HadoopDruidIndexerConfig
this.ignoreInvalidRows = ignoreInvalidRows;
}
@JsonProperty
public List<String> getRegistererers()
{
return registererers;
}
public void setRegistererers(List<String> registererers)
{
this.registererers = registererers;
}
/********************************************
Granularity/Bucket Helper Methods
********************************************/

View File

@ -22,6 +22,7 @@ package io.druid.indexer;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.inject.Inject;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import io.druid.timeline.DataSegment;
@ -48,6 +49,7 @@ public class HadoopDruidIndexerJob implements Jobby
private IndexGeneratorJob indexJob;
private volatile List<DataSegment> publishedSegments = null;
@Inject
public HadoopDruidIndexerJob(
HadoopDruidIndexerConfig config
)

View File

@ -24,6 +24,7 @@ import com.google.inject.Inject;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.metrics.MonitorScheduler;
import io.druid.client.ServerView;
import io.druid.guice.annotations.Json;
import io.druid.guice.annotations.Processing;
import io.druid.indexing.common.actions.TaskActionClientFactory;
import io.druid.indexing.common.config.TaskConfig;

View File

@ -27,8 +27,8 @@ import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import com.metamx.http.client.HttpClient;
import com.metamx.http.client.response.ToStringResponseHandler;
import io.druid.client.indexing.IndexingServiceSelector;
import io.druid.client.selector.Server;
import io.druid.curator.discovery.ServerDiscoverySelector;
import io.druid.indexing.common.RetryPolicy;
import io.druid.indexing.common.RetryPolicyFactory;
import io.druid.indexing.common.task.Task;
@ -42,7 +42,7 @@ public class RemoteTaskActionClient implements TaskActionClient
{
private final Task task;
private final HttpClient httpClient;
private final IndexingServiceSelector serviceProvider;
private final ServerDiscoverySelector selector;
private final RetryPolicyFactory retryPolicyFactory;
private final ObjectMapper jsonMapper;
@ -51,14 +51,14 @@ public class RemoteTaskActionClient implements TaskActionClient
public RemoteTaskActionClient(
Task task,
HttpClient httpClient,
IndexingServiceSelector serviceProvider,
ServerDiscoverySelector selector,
RetryPolicyFactory retryPolicyFactory,
ObjectMapper jsonMapper
)
{
this.task = task;
this.httpClient = httpClient;
this.serviceProvider = serviceProvider;
this.selector = selector;
this.retryPolicyFactory = retryPolicyFactory;
this.jsonMapper = jsonMapper;
}
@ -127,19 +127,11 @@ public class RemoteTaskActionClient implements TaskActionClient
private URI getServiceUri() throws Exception
{
final Server instance = serviceProvider.pick();
final Server instance = selector.pick();
if (instance == null) {
throw new ISE("Cannot find instance of indexer to talk to!");
}
return new URI(
instance.getScheme(),
null,
instance.getHost(),
instance.getPort(),
"/druid/indexer/v1/action",
null,
null
);
return new URI(String.format("%s://%s%s", instance.getScheme(), instance.getHost(), "/druid/indexer/v1/action"));
}
}

View File

@ -22,7 +22,8 @@ package io.druid.indexing.common.actions;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import com.metamx.http.client.HttpClient;
import io.druid.client.indexing.IndexingServiceSelector;
import io.druid.client.indexing.IndexingService;
import io.druid.curator.discovery.ServerDiscoverySelector;
import io.druid.guice.annotations.Global;
import io.druid.indexing.common.RetryPolicyFactory;
import io.druid.indexing.common.task.Task;
@ -32,20 +33,20 @@ import io.druid.indexing.common.task.Task;
public class RemoteTaskActionClientFactory implements TaskActionClientFactory
{
private final HttpClient httpClient;
private final IndexingServiceSelector serviceProvider;
private final ServerDiscoverySelector selector;
private final RetryPolicyFactory retryPolicyFactory;
private final ObjectMapper jsonMapper;
@Inject
public RemoteTaskActionClientFactory(
@Global HttpClient httpClient,
IndexingServiceSelector serviceProvider,
@IndexingService ServerDiscoverySelector selector,
RetryPolicyFactory retryPolicyFactory,
ObjectMapper jsonMapper
)
{
this.httpClient = httpClient;
this.serviceProvider = serviceProvider;
this.selector = selector;
this.retryPolicyFactory = retryPolicyFactory;
this.jsonMapper = jsonMapper;
}
@ -53,6 +54,6 @@ public class RemoteTaskActionClientFactory implements TaskActionClientFactory
@Override
public TaskActionClient create(Task task)
{
return new RemoteTaskActionClient(task, httpClient, serviceProvider, retryPolicyFactory, jsonMapper);
return new RemoteTaskActionClient(task, httpClient, selector, retryPolicyFactory, jsonMapper);
}
}

View File

@ -125,7 +125,6 @@ public abstract class MergeTaskBase extends AbstractTask
final File taskDir = toolbox.getTaskWorkDir();
try {
final long startTime = System.currentTimeMillis();
log.info(

View File

@ -55,12 +55,19 @@ public class ForkingTaskRunnerConfig
private String classpath = System.getProperty("java.class.path");
@JsonProperty
@Min(1024) @Max(65535)
@Min(1024)
@Max(65535)
private int startPort = 8080;
@JsonProperty
@NotNull
List<String> allowedPrefixes = Lists.newArrayList("com.metamx", "druid", "io.druid");
List<String> allowedPrefixes = Lists.newArrayList(
"com.metamx",
"druid",
"io.druid",
"user.timezone",
"file.encoding"
);
public int maxForks()
{

View File

@ -19,7 +19,6 @@
package io.druid.indexing.coordinator.http;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.collect.Collections2;
@ -89,7 +88,6 @@ public class IndexerCoordinatorResource
private final TaskStorageQueryAdapter taskStorageQueryAdapter;
private final TaskLogStreamer taskLogStreamer;
private final JacksonConfigManager configManager;
private final ObjectMapper jsonMapper;
private AtomicReference<WorkerSetupData> workerSetupDataRef = null;
@ -98,21 +96,20 @@ public class IndexerCoordinatorResource
TaskMaster taskMaster,
TaskStorageQueryAdapter taskStorageQueryAdapter,
TaskLogStreamer taskLogStreamer,
JacksonConfigManager configManager,
ObjectMapper jsonMapper
JacksonConfigManager configManager
) throws Exception
{
this.taskMaster = taskMaster;
this.taskStorageQueryAdapter = taskStorageQueryAdapter;
this.taskLogStreamer = taskLogStreamer;
this.configManager = configManager;
this.jsonMapper = jsonMapper;
}
@POST
@Path("/merge")
@Consumes("application/json")
@Produces("application/json")
@Deprecated
public Response doMerge(final Task task)
{
// legacy endpoint
@ -123,6 +120,7 @@ public class IndexerCoordinatorResource
@Path("/index")
@Consumes("application/json")
@Produces("application/json")
@Deprecated
public Response doIndex(final Task task)
{
return taskPost(task);

View File

@ -19,7 +19,6 @@
package io.druid.indexing.coordinator.http;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import io.druid.common.config.JacksonConfigManager;
import io.druid.indexing.common.tasklogs.TaskLogStreamer;
@ -39,10 +38,9 @@ public class OldIndexerCoordinatorResource extends IndexerCoordinatorResource
TaskMaster taskMaster,
TaskStorageQueryAdapter taskStorageQueryAdapter,
TaskLogStreamer taskLogStreamer,
JacksonConfigManager configManager,
ObjectMapper jsonMapper
JacksonConfigManager configManager
) throws Exception
{
super(taskMaster, taskStorageQueryAdapter, taskLogStreamer, configManager, jsonMapper);
super(taskMaster, taskStorageQueryAdapter, taskLogStreamer, configManager);
}
}

View File

@ -34,6 +34,7 @@ import com.google.common.base.Supplier;
import com.google.common.collect.Lists;
import com.google.inject.Inject;
import com.metamx.emitter.EmittingLogger;
import io.druid.guice.annotations.Json;
import io.druid.indexing.coordinator.setup.EC2NodeData;
import io.druid.indexing.coordinator.setup.GalaxyUserData;
import io.druid.indexing.coordinator.setup.WorkerSetupData;
@ -55,7 +56,7 @@ public class EC2AutoScalingStrategy implements AutoScalingStrategy
@Inject
public EC2AutoScalingStrategy(
ObjectMapper jsonMapper,
@Json ObjectMapper jsonMapper,
AmazonEC2 amazonEC2Client,
SimpleResourceManagementConfig config,
Supplier<WorkerSetupData> workerSetupDataRef

View File

@ -54,10 +54,14 @@ public class ScalingStats
public ScalingStats(int capacity)
{
this.recentEvents = MinMaxPriorityQueue
.orderedBy(comparator)
.maximumSize(capacity)
.create();
if (capacity == 0) {
this.recentEvents = MinMaxPriorityQueue.orderedBy(comparator).create();
} else {
this.recentEvents = MinMaxPriorityQueue
.orderedBy(comparator)
.maximumSize(capacity)
.create();
}
}
public void addProvisionEvent(AutoScalingData data)

View File

@ -36,10 +36,6 @@ public class WorkerConfig
@NotNull
private String version = null;
@JsonProperty
@NotNull
private String overlordService = null;
@JsonProperty
@Min(1)
private int capacity = Runtime.getRuntime().availableProcessors() - 1;
@ -54,11 +50,6 @@ public class WorkerConfig
return version;
}
public String getOverlordService()
{
return overlordService;
}
public int getCapacity()
{
return capacity;

View File

@ -123,6 +123,7 @@ public class ExecutorLifecycle
jsonMapper.writerWithDefaultPrettyPrinter().writeValueAsString(taskStatus)
);
statusFile.getParentFile().mkdirs();
jsonMapper.writeValue(statusFile, taskStatus);
return taskStatus;

View File

@ -3,22 +3,22 @@
var oTable = [];
$(document).ready(function() {
$.get('/mmx/merger/v1/runningTasks', function(data) {
$.get('/druid/indexer/v1/runningTasks', function(data) {
$('.running_loading').hide();
buildTable(data, $('#runningTable'), ["segments"]);
});
$.get('/mmx/merger/v1/pendingTasks', function(data) {
$.get('/druid/indexer/v1/pendingTasks', function(data) {
$('.pending_loading').hide();
buildTable(data, $('#pendingTable'), ["segments"]);
});
$.get('/mmx/merger/v1/workers', function(data) {
$.get('/druid/indexer/v1/workers', function(data) {
$('.workers_loading').hide();
buildTable(data, $('#workerTable'));
});
$.get('/mmx/merger/v1/scaling', function(data) {
$.get('/druid/indexer/v1/scaling', function(data) {
$('.events_loading').hide();
buildTable(data, $('#eventTable'));
});

View File

@ -359,8 +359,7 @@ public class TaskSerdeTest
false,
new DataRollupSpec(ImmutableList.<AggregatorFactory>of(), QueryGranularity.NONE),
null,
false,
ImmutableList.<String>of()
false
)
);

View File

@ -20,6 +20,7 @@
package io.druid.segment.realtime;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import com.metamx.common.logger.Logger;
import io.druid.db.DbTablesConfig;
import io.druid.timeline.DataSegment;
@ -40,6 +41,7 @@ public class DbSegmentPublisher implements SegmentPublisher
private final DbTablesConfig config;
private final IDBI dbi;
@Inject
public DbSegmentPublisher(
ObjectMapper jsonMapper,
DbTablesConfig config,

View File

@ -29,7 +29,7 @@ public class DruidServerConfig
{
@JsonProperty
@Min(0)
private long maxSize = -1;
private long maxSize = 0;
@JsonProperty
private String tier = "_default_tier";

View File

@ -26,8 +26,8 @@ import com.metamx.common.IAE;
import com.metamx.common.ISE;
import com.metamx.http.client.HttpClient;
import com.metamx.http.client.response.InputStreamResponseHandler;
import io.druid.client.selector.DiscoverySelector;
import io.druid.client.selector.Server;
import io.druid.curator.discovery.ServerDiscoverySelector;
import io.druid.guice.annotations.Global;
import io.druid.timeline.DataSegment;
import org.joda.time.Interval;
@ -43,18 +43,18 @@ public class IndexingServiceClient
private final HttpClient client;
private final ObjectMapper jsonMapper;
private final DiscoverySelector<Server> serviceProvider;
private final ServerDiscoverySelector selector;
@Inject
public IndexingServiceClient(
@Global HttpClient client,
ObjectMapper jsonMapper,
@IndexingService DiscoverySelector<Server> serviceProvider
@IndexingService ServerDiscoverySelector selector
)
{
this.client = client;
this.jsonMapper = jsonMapper;
this.serviceProvider = serviceProvider;
this.selector = selector;
}
public void mergeSegments(List<DataSegment> segments)
@ -72,28 +72,28 @@ public class IndexingServiceClient
}
}
runQuery("merge", new ClientAppendQuery(dataSource, segments));
runQuery(new ClientAppendQuery(dataSource, segments));
}
public void killSegments(String dataSource, Interval interval)
{
runQuery("index", new ClientKillQuery(dataSource, interval));
runQuery(new ClientKillQuery(dataSource, interval));
}
public void upgradeSegment(DataSegment dataSegment)
{
runQuery("task", new ClientConversionQuery(dataSegment));
runQuery(new ClientConversionQuery(dataSegment));
}
public void upgradeSegments(String dataSource, Interval interval)
{
runQuery("task", new ClientConversionQuery(dataSource, interval));
runQuery(new ClientConversionQuery(dataSource, interval));
}
private InputStream runQuery(String endpoint, Object queryObject)
private InputStream runQuery(Object queryObject)
{
try {
return client.post(new URL(String.format("%s/%s", baseUrl(), endpoint)))
return client.post(new URL(String.format("%s/task", baseUrl())))
.setContent("application/json", jsonMapper.writeValueAsBytes(queryObject))
.go(RESPONSE_HANDLER)
.get();
@ -106,12 +106,12 @@ public class IndexingServiceClient
private String baseUrl()
{
try {
final Server instance = serviceProvider.pick();
final Server instance = selector.pick();
if (instance == null) {
throw new ISE("Cannot find instance of indexingService");
}
return String.format("http://%s:%s/druid/indexer/v1", instance.getHost(), instance.getPort());
return String.format("http://%s/druid/indexer/v1", instance.getHost());
}
catch (Exception e) {
throw Throwables.propagate(e);

View File

@ -35,6 +35,8 @@ import io.druid.guice.DruidBinders;
import io.druid.guice.JsonConfigProvider;
import io.druid.guice.KeyHolder;
import io.druid.guice.LazySingleton;
import io.druid.guice.LifecycleModule;
import io.druid.guice.annotations.Self;
import io.druid.server.DruidNode;
import io.druid.server.initialization.CuratorDiscoveryConfig;
import org.apache.curator.framework.CuratorFramework;
@ -59,7 +61,7 @@ import java.util.concurrent.ThreadFactory;
/**
* The DiscoveryModule allows for the registration of Keys of DruidNode objects, which it intends to be
* automatically announced at the end of the lifecycle start.
*
* <p/>
* In order for this to work a ServiceAnnouncer instance *must* be injected and instantiated first.
* This can often be achieved by registering ServiceAnnouncer.class with the LifecycleModule.
*/
@ -69,7 +71,7 @@ public class DiscoveryModule implements Module
/**
* Requests that the un-annotated DruidNode instance be injected and published as part of the lifecycle.
*
* <p/>
* That is, this module will announce the DruidNode instance returned by
* injector.getInstance(Key.get(DruidNode.class)) automatically.
* Announcement will happen in the LAST stage of the Lifecycle
@ -81,7 +83,7 @@ public class DiscoveryModule implements Module
/**
* Requests that the annotated DruidNode instance be injected and published as part of the lifecycle.
*
* <p/>
* That is, this module will announce the DruidNode instance returned by
* injector.getInstance(Key.get(DruidNode.class, annotation)) automatically.
* Announcement will happen in the LAST stage of the Lifecycle
@ -95,7 +97,7 @@ public class DiscoveryModule implements Module
/**
* Requests that the annotated DruidNode instance be injected and published as part of the lifecycle.
*
* <p/>
* That is, this module will announce the DruidNode instance returned by
* injector.getInstance(Key.get(DruidNode.class, annotation)) automatically.
* Announcement will happen in the LAST stage of the Lifecycle
@ -109,7 +111,7 @@ public class DiscoveryModule implements Module
/**
* Requests that the keyed DruidNode instance be injected and published as part of the lifecycle.
*
* <p/>
* That is, this module will announce the DruidNode instance returned by
* injector.getInstance(Key.get(DruidNode.class, annotation)) automatically.
* Announcement will happen in the LAST stage of the Lifecycle
@ -119,6 +121,7 @@ public class DiscoveryModule implements Module
public static void registerKey(Binder binder, Key<DruidNode> key)
{
DruidBinders.discoveryAnnouncementBinder(binder).addBinding().toInstance(new KeyHolder<>(key));
LifecycleModule.register(binder, ServiceAnnouncer.class);
}
@Override
@ -131,13 +134,14 @@ public class DiscoveryModule implements Module
// Build the binder so that it will at a minimum inject an empty set.
DruidBinders.discoveryAnnouncementBinder(binder);
// We bind this eagerly so that it gets instantiated and registers stuff with Lifecycle as a side-effect
binder.bind(ServiceAnnouncer.class)
.to(Key.get(CuratorServiceAnnouncer.class, Names.named(NAME)))
.asEagerSingleton();
.in(LazySingleton.class);
}
@Provides @LazySingleton @Named(NAME)
@Provides
@LazySingleton
@Named(NAME)
public CuratorServiceAnnouncer getServiceAnnouncer(
final CuratorServiceAnnouncer announcer,
final Injector injector,
@ -181,7 +185,8 @@ public class DiscoveryModule implements Module
return announcer;
}
@Provides @LazySingleton
@Provides
@LazySingleton
public ServiceDiscovery<Void> getServiceDiscovery(
CuratorFramework curator,
CuratorDiscoveryConfig config,
@ -217,13 +222,21 @@ public class DiscoveryModule implements Module
throw Throwables.propagate(e);
}
}
},
Lifecycle.Stage.LAST
}
);
return serviceDiscovery;
}
@Provides
@LazySingleton
public ServerDiscoveryFactory getServerDiscoveryFactory(
ServiceDiscovery<Void> serviceDiscovery
)
{
return new ServerDiscoveryFactory(serviceDiscovery);
}
private static class NoopServiceDiscovery<T> implements ServiceDiscovery<T>
{
@Override

View File

@ -0,0 +1,72 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.curator.discovery;
import com.google.inject.Inject;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.curator.x.discovery.ServiceProvider;
import java.io.IOException;
/**
*/
public class ServerDiscoveryFactory
{
private final ServiceDiscovery<Void> serviceDiscovery;
@Inject
public ServerDiscoveryFactory(ServiceDiscovery<Void> serviceDiscovery)
{
this.serviceDiscovery = serviceDiscovery;
}
public ServerDiscoverySelector createSelector(String serviceName)
{
if (serviceName == null) {
return new ServerDiscoverySelector(new NoopServiceProvider());
}
final ServiceProvider serviceProvider = serviceDiscovery.serviceProviderBuilder().serviceName(serviceName).build();
return new ServerDiscoverySelector(serviceProvider);
}
private static class NoopServiceProvider<T> implements ServiceProvider<T>
{
@Override
public void start() throws Exception
{
// do nothing
}
@Override
public ServiceInstance<T> getInstance() throws Exception
{
return null;
}
@Override
public void close() throws IOException
{
// do nothing
}
}
}

View File

@ -17,9 +17,8 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.client.indexing;
package io.druid.curator.discovery;
import com.google.inject.Inject;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger;
@ -28,21 +27,18 @@ import io.druid.client.selector.Server;
import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.curator.x.discovery.ServiceProvider;
import javax.annotation.Nullable;
import java.io.IOException;
/**
*/
public class IndexingServiceSelector implements DiscoverySelector<Server>
*/
public class ServerDiscoverySelector implements DiscoverySelector<Server>
{
private static final Logger log = new Logger(IndexingServiceSelector.class);
private static final Logger log = new Logger(ServerDiscoverySelector.class);
private final ServiceProvider serviceProvider;
@Inject
public IndexingServiceSelector(
@Nullable @IndexingService ServiceProvider serviceProvider
) {
public ServerDiscoverySelector(ServiceProvider serviceProvider)
{
this.serviceProvider = serviceProvider;
}
@ -54,7 +50,12 @@ public class IndexingServiceSelector implements DiscoverySelector<Server>
instance = serviceProvider.getInstance();
}
catch (Exception e) {
log.info(e, "");
log.info(e, "Exception getting instance");
return null;
}
if (instance == null) {
log.error("No server instance found");
return null;
}

View File

@ -22,17 +22,10 @@ package io.druid.guice;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.google.inject.TypeLiteral;
import io.druid.client.indexing.IndexingService;
import io.druid.client.indexing.IndexingServiceSelector;
import io.druid.client.indexing.IndexingServiceSelectorConfig;
import io.druid.client.selector.DiscoverySelector;
import io.druid.client.selector.Server;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.curator.x.discovery.ServiceProvider;
import java.io.IOException;
import io.druid.curator.discovery.ServerDiscoveryFactory;
import io.druid.curator.discovery.ServerDiscoverySelector;
/**
*/
@ -42,42 +35,16 @@ public class IndexingServiceDiscoveryModule implements Module
public void configure(Binder binder)
{
JsonConfigProvider.bind(binder, "druid.selectors.indexing", IndexingServiceSelectorConfig.class);
binder.bind(new TypeLiteral<DiscoverySelector<Server>>(){})
.annotatedWith(IndexingService.class)
.to(IndexingServiceSelector.class);
binder.bind(IndexingServiceSelector.class).in(ManageLifecycle.class);
}
@Provides
@LazySingleton @IndexingService
public ServiceProvider getServiceProvider(
@IndexingService
@ManageLifecycle
public ServerDiscoverySelector getServiceProvider(
IndexingServiceSelectorConfig config,
ServiceDiscovery<Void> serviceDiscovery
ServerDiscoveryFactory serverDiscoveryFactory
)
{
if (config.getServiceName() == null) {
return new ServiceProvider()
{
@Override
public void start() throws Exception
{
}
@Override
public ServiceInstance getInstance() throws Exception
{
return null;
}
@Override
public void close() throws IOException
{
}
};
}
return serviceDiscovery.serviceProviderBuilder().serviceName(config.getServiceName()).build();
return serverDiscoveryFactory.createSelector(config.getServiceName());
}
}

View File

@ -110,7 +110,6 @@ public class S3DataSegmentPuller implements DataSegmentPuller
return null;
}
catch (IOException e) {
FileUtils.deleteDirectory(outDir);
throw new IOException(String.format("Problem decompressing object[%s]", s3Obj), e);
}
finally {
@ -125,6 +124,16 @@ public class S3DataSegmentPuller implements DataSegmentPuller
);
}
catch (Exception e) {
try {
FileUtils.deleteDirectory(outDir);
} catch (IOException ioe) {
log.warn(
ioe,
"Failed to remove output directory for segment[%s] after exception: %s",
segment.getIdentifier(),
outDir
);
}
throw new SegmentLoadingException(e, e.getMessage());
}
}

View File

@ -116,11 +116,9 @@ public class QueryServlet extends HttpServlet
emitter.emit(
new ServiceMetricEvent.Builder()
.setUser2(query.getDataSource())
//.setUser3(originatorType)
.setUser4(query.getType())
.setUser5(query.getIntervals().get(0).toString())
.setUser6(String.valueOf(query.hasFilters()))
//.setUser8(originatorId)
.setUser9(query.getDuration().toPeriod().toStandardMinutes().toString())
.build("request/time", requestTime)
);

View File

@ -29,6 +29,10 @@ import java.util.List;
*/
public class ExtensionsConfig
{
@JsonProperty
@NotNull
private boolean searchCurrentClassloader = true;
@JsonProperty
@NotNull
private List<String> coordinates = ImmutableList.of();
@ -44,6 +48,11 @@ public class ExtensionsConfig
"https://metamx.artifactoryonline.com/metamx/pub-libs-releases-local"
);
public boolean searchCurrentClassloader()
{
return searchCurrentClassloader;
}
public List<String> getCoordinates()
{
return coordinates;

View File

@ -19,6 +19,8 @@
package io.druid.server.initialization;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
import com.google.common.collect.Iterables;
import com.google.common.primitives.Ints;
import com.google.inject.Binder;
@ -28,6 +30,7 @@ import com.google.inject.Injector;
import com.google.inject.Provides;
import com.google.inject.ProvisionException;
import com.google.inject.Scopes;
import com.google.inject.Singleton;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.logger.Logger;
import com.sun.jersey.api.core.DefaultResourceConfig;
@ -39,7 +42,9 @@ import io.druid.guice.Jerseys;
import io.druid.guice.JsonConfigProvider;
import io.druid.guice.LazySingleton;
import io.druid.guice.annotations.JSR311Resource;
import io.druid.guice.annotations.Json;
import io.druid.guice.annotations.Self;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.server.DruidNode;
import io.druid.server.StatusResource;
import org.eclipse.jetty.server.Connector;
@ -95,7 +100,8 @@ public class JettyServerModule extends JerseyServletModule
}
}
@Provides @LazySingleton
@Provides
@LazySingleton
public Server getServer(Injector injector, Lifecycle lifecycle, @Self DruidNode node, ServerConfig config)
{
JettyServerInitializer initializer = injector.getInstance(JettyServerInitializer.class);
@ -133,6 +139,15 @@ public class JettyServerModule extends JerseyServletModule
return server;
}
@Provides
@Singleton
public JacksonJsonProvider getJacksonJsonProvider(@Json ObjectMapper objectMapper)
{
final JacksonJsonProvider provider = new JacksonJsonProvider();
provider.setMapper(objectMapper);
return provider;
}
private static Server makeJettyServer(@Self DruidNode node, ServerConfig config)
{
final QueuedThreadPool threadPool = new QueuedThreadPool();

View File

@ -161,7 +161,7 @@ public class DruidMaster
this.exec = scheduledExecutorFactory.create(1, "Master-Exec--%d");
this.leaderLatch = new AtomicReference<>(null);
this.segmentSettingsAtomicReference= new AtomicReference<>(null);
this.segmentSettingsAtomicReference = new AtomicReference<>(null);
this.loadManagementPeons = loadQueuePeonMap;
}
@ -471,10 +471,13 @@ public class DruidMaster
serverInventoryView.start();
final List<Pair<? extends MasterRunnable, Duration>> masterRunnables = Lists.newArrayList();
segmentSettingsAtomicReference = configManager.watch(MasterSegmentSettings.CONFIG_KEY, MasterSegmentSettings.class,new MasterSegmentSettings.Builder().build());
segmentSettingsAtomicReference = configManager.watch(
MasterSegmentSettings.CONFIG_KEY,
MasterSegmentSettings.class,
new MasterSegmentSettings.Builder().build()
);
masterRunnables.add(Pair.of(new MasterComputeManagerRunnable(), config.getMasterPeriod()));
if (indexingServiceClient != null) {
masterRunnables.add(
Pair.of(
new MasterIndexingServiceRunnable(

View File

@ -42,7 +42,7 @@ public abstract class DruidMasterConfig
@Default("PT1800s")
public abstract Duration getMasterSegmentMergerPeriod();
@Config("druid.master.merger.on")
@Config("druid.master.merge.on")
public boolean isMergeSegments()
{
return false;

View File

@ -24,8 +24,8 @@ import com.fasterxml.jackson.annotation.JsonProperty;
public class MasterSegmentSettings
{
public static final String CONFIG_KEY = "master.dynamicConfigs";
private long millisToWaitBeforeDeleting=15 * 60 * 1000L;
private long mergeBytesLimit= 100000000L;
private long millisToWaitBeforeDeleting = 15 * 60 * 1000L;
private long mergeBytesLimit = 100000000L;
private int mergeSegmentsLimit = Integer.MAX_VALUE;
private int maxSegmentsToMove = 5;
private boolean emitBalancingStats = false;
@ -39,11 +39,11 @@ public class MasterSegmentSettings
@JsonProperty("emitBalancingStats") Boolean emitBalancingStats
)
{
this.maxSegmentsToMove=maxSegmentsToMove;
this.millisToWaitBeforeDeleting=millisToWaitBeforeDeleting;
this.mergeSegmentsLimit=mergeSegmentsLimit;
this.mergeBytesLimit=mergeBytesLimit;
this.emitBalancingStats = emitBalancingStats;
this.maxSegmentsToMove = maxSegmentsToMove;
this.millisToWaitBeforeDeleting = millisToWaitBeforeDeleting;
this.mergeSegmentsLimit = mergeSegmentsLimit;
this.mergeBytesLimit = mergeBytesLimit;
this.emitBalancingStats = emitBalancingStats;
}
public static String getConfigKey()
@ -81,7 +81,6 @@ public class MasterSegmentSettings
return maxSegmentsToMove;
}
public static class Builder
{
public static final String CONFIG_KEY = "master.dynamicConfigs";
@ -93,14 +92,16 @@ public class MasterSegmentSettings
public Builder()
{
this.millisToWaitBeforeDeleting=15 * 60 * 1000L;
this.mergeBytesLimit= 100000000L;
this.mergeSegmentsLimit= Integer.MAX_VALUE;
this.maxSegmentsToMove = 5;
this.emitBalancingStats = false;
this(15 * 60 * 1000L, 100000000L, Integer.MAX_VALUE, 5, false);
}
public Builder(long millisToWaitBeforeDeleting, long mergeBytesLimit, int mergeSegmentsLimit, int maxSegmentsToMove, boolean emitBalancingStats)
public Builder(
long millisToWaitBeforeDeleting,
long mergeBytesLimit,
int mergeSegmentsLimit,
int maxSegmentsToMove,
boolean emitBalancingStats
)
{
this.millisToWaitBeforeDeleting = millisToWaitBeforeDeleting;
this.mergeBytesLimit = mergeBytesLimit;
@ -111,31 +112,37 @@ public class MasterSegmentSettings
public Builder withMillisToWaitBeforeDeleting(long millisToWaitBeforeDeleting)
{
this.millisToWaitBeforeDeleting=millisToWaitBeforeDeleting;
this.millisToWaitBeforeDeleting = millisToWaitBeforeDeleting;
return this;
}
public Builder withMergeBytesLimit(long mergeBytesLimit)
{
this.mergeBytesLimit=mergeBytesLimit;
this.mergeBytesLimit = mergeBytesLimit;
return this;
}
public Builder withMergeSegmentsLimit(int mergeSegmentsLimit)
{
this.mergeSegmentsLimit=mergeSegmentsLimit;
this.mergeSegmentsLimit = mergeSegmentsLimit;
return this;
}
public Builder withMaxSegmentsToMove(int maxSegmentsToMove)
{
this.maxSegmentsToMove=maxSegmentsToMove;
this.maxSegmentsToMove = maxSegmentsToMove;
return this;
}
public MasterSegmentSettings build()
{
return new MasterSegmentSettings(millisToWaitBeforeDeleting,mergeBytesLimit,mergeSegmentsLimit,maxSegmentsToMove, emitBalancingStats);
return new MasterSegmentSettings(
millisToWaitBeforeDeleting,
mergeBytesLimit,
mergeSegmentsLimit,
maxSegmentsToMove,
emitBalancingStats
);
}
}
}

View File

@ -0,0 +1,59 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.cli;
import com.google.inject.Injector;
import com.google.inject.servlet.GuiceFilter;
import io.druid.server.QueryServlet;
import io.druid.server.initialization.JettyServerInitializer;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.DefaultHandler;
import org.eclipse.jetty.server.handler.HandlerList;
import org.eclipse.jetty.servlet.DefaultServlet;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.servlets.GzipFilter;
/**
*/
public class BrokerJettyServerInitializer implements JettyServerInitializer
{
@Override
public void initialize(Server server, Injector injector)
{
final ServletContextHandler resources = new ServletContextHandler(ServletContextHandler.SESSIONS);
resources.addServlet(new ServletHolder(new DefaultServlet()), "/druid/v2/datasources/*");
resources.addFilter(GuiceFilter.class, "/druid/v2/datasources/*", null);
final ServletContextHandler queries = new ServletContextHandler(ServletContextHandler.SESSIONS);
queries.setResourceBase("/");
queries.addServlet(new ServletHolder(injector.getInstance(QueryServlet.class)), "/druid/v2/*");
final ServletContextHandler root = new ServletContextHandler(ServletContextHandler.SESSIONS);
root.addServlet(new ServletHolder(new DefaultServlet()), "/*");
root.addFilter(GzipFilter.class, "/*", null);
root.addFilter(GuiceFilter.class, "/*", null);
final HandlerList handlerList = new HandlerList();
handlerList.setHandlers(new Handler[]{resources, queries, root, new DefaultHandler()});
server.setHandler(handlerList);
}
}

View File

@ -81,7 +81,7 @@ public class CliBroker extends ServerRunnable
JsonConfigProvider.bind(binder, "druid.broker.cache", CacheProvider.class);
binder.bind(QuerySegmentWalker.class).to(ClientQuerySegmentWalker.class).in(LazySingleton.class);
binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class).in(LazySingleton.class);
binder.bind(JettyServerInitializer.class).to(BrokerJettyServerInitializer.class).in(LazySingleton.class);
Jerseys.addResource(binder, ClientInfoResource.class);
DiscoveryModule.register(binder, Self.class);

View File

@ -0,0 +1,137 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.cli;
import com.google.api.client.repackaged.com.google.common.base.Throwables;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.io.CharStreams;
import com.google.common.io.InputSupplier;
import com.google.inject.Binder;
import com.google.inject.Injector;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.logger.Logger;
import io.airlift.command.Arguments;
import io.airlift.command.Command;
import io.druid.guice.LazySingleton;
import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.indexer.HadoopDruidIndexerJob;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.List;
/**
*/
@Command(
name = "hadoop",
description = "Runs the batch Hadoop Druid Indexer, see https://github.com/metamx/druid/wiki/Batch-ingestion for a description."
)
public class CliHadoopIndexer extends GuiceRunnable
{
@Arguments(description = "A JSON object or the path to a file that contains a JSON object", required = true)
private String argumentSpec;
private static final Logger log = new Logger(CliHadoopIndexer.class);
public CliHadoopIndexer()
{
super(log);
}
@Override
protected List<Object> getModules()
{
return ImmutableList.<Object>of(
new Module()
{
@Override
public void configure(Binder binder)
{
binder.bind(HadoopDruidIndexerJob.class).in(LazySingleton.class);
}
@Provides
@LazySingleton
public HadoopDruidIndexerConfig getHadoopDruidIndexerConfig()
{
try {
if (argumentSpec.startsWith("{")) {
return HadoopDruidIndexerConfig.fromString(argumentSpec);
} else if (argumentSpec.startsWith("s3://")) {
final Path s3nPath = new Path(String.format("s3n://%s", argumentSpec.substring("s3://".length())));
final FileSystem fs = s3nPath.getFileSystem(new Configuration());
String configString = CharStreams.toString(
new InputSupplier<InputStreamReader>()
{
@Override
public InputStreamReader getInput() throws IOException
{
return new InputStreamReader(fs.open(s3nPath));
}
}
);
return HadoopDruidIndexerConfig.fromString(configString);
} else {
return HadoopDruidIndexerConfig.fromFile(new File(argumentSpec));
}
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
}
);
}
@Override
public void run()
{
try {
Injector injector = makeInjector();
final HadoopDruidIndexerJob job = injector.getInstance(HadoopDruidIndexerJob.class);
Lifecycle lifecycle = initLifecycle(injector);
job.run();
try {
lifecycle.stop();
}
catch (Throwable t) {
log.error(t, "Error when stopping. Failing.");
System.exit(1);
}
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
}

View File

@ -57,6 +57,7 @@ import io.druid.indexing.coordinator.TaskRunnerFactory;
import io.druid.indexing.coordinator.TaskStorage;
import io.druid.indexing.coordinator.TaskStorageQueryAdapter;
import io.druid.indexing.coordinator.http.IndexerCoordinatorResource;
import io.druid.indexing.coordinator.http.OldIndexerCoordinatorResource;
import io.druid.indexing.coordinator.http.OverlordRedirectInfo;
import io.druid.indexing.coordinator.scaling.AutoScalingStrategy;
import io.druid.indexing.coordinator.scaling.EC2AutoScalingStrategy;
@ -139,6 +140,7 @@ public class CliOverlord extends ServerRunnable
binder.bind(JettyServerInitializer.class).toInstance(new OverlordJettyServerInitializer());
Jerseys.addResource(binder, IndexerCoordinatorResource.class);
Jerseys.addResource(binder, OldIndexerCoordinatorResource.class);
LifecycleModule.register(binder, Server.class);
}

View File

@ -22,7 +22,6 @@ package io.druid.cli;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.google.inject.Inject;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Module;
@ -35,6 +34,7 @@ import io.airlift.command.Option;
import io.druid.guice.Jerseys;
import io.druid.guice.JsonConfigProvider;
import io.druid.guice.LazySingleton;
import io.druid.guice.LifecycleModule;
import io.druid.guice.ManageLifecycle;
import io.druid.guice.NodeTypeConfig;
import io.druid.guice.PolyBind;
@ -52,13 +52,13 @@ import io.druid.indexing.coordinator.ThreadPoolTaskRunner;
import io.druid.indexing.worker.executor.ChatHandlerResource;
import io.druid.indexing.worker.executor.ExecutorLifecycle;
import io.druid.indexing.worker.executor.ExecutorLifecycleConfig;
import io.druid.initialization.LogLevelAdjuster;
import io.druid.query.QuerySegmentWalker;
import io.druid.segment.loading.DataSegmentKiller;
import io.druid.segment.loading.S3DataSegmentKiller;
import io.druid.segment.loading.SegmentLoaderConfig;
import io.druid.segment.loading.StorageLocationConfig;
import io.druid.server.initialization.JettyServerInitializer;
import org.eclipse.jetty.server.Server;
import java.io.File;
import java.util.Arrays;
@ -71,7 +71,7 @@ import java.util.List;
description = "Runs a Peon, this is an individual forked \"task\" used as part of the indexing service. "
+ "This should rarely, if ever, be used directly."
)
public class CliPeon implements Runnable
public class CliPeon extends GuiceRunnable
{
@Arguments(description = "task.json status.json", required = true)
public List<String> taskAndStatusFile;
@ -79,74 +79,71 @@ public class CliPeon implements Runnable
@Option(name = "--nodeType", title = "nodeType", description = "Set the node type to expose on ZK")
public String nodeType = "indexer-executor";
private Injector injector;
@Inject
public void configure(Injector injector)
{
this.injector = injector;
}
private static final Logger log = new Logger(CliPeon.class);
protected Injector getInjector()
public CliPeon()
{
return Initialization.makeInjectorWithModules(
injector,
ImmutableList.<Object>of(
new Module()
{
@Override
public void configure(Binder binder)
{
PolyBind.createChoice(
binder,
"druid.indexer.task.chathandler.type",
Key.get(ChatHandlerProvider.class),
Key.get(NoopChatHandlerProvider.class)
);
final MapBinder<String, ChatHandlerProvider> handlerProviderBinder = PolyBind.optionBinder(
binder, Key.get(ChatHandlerProvider.class)
);
handlerProviderBinder.addBinding("curator").to(EventReceivingChatHandlerProvider.class);
handlerProviderBinder.addBinding("noop").to(NoopChatHandlerProvider.class);
super(log);
}
binder.bind(TaskToolboxFactory.class).in(LazySingleton.class);
@Override
protected List<Object> getModules()
{
return ImmutableList.<Object>of(
new Module()
{
@Override
public void configure(Binder binder)
{
PolyBind.createChoice(
binder,
"druid.indexer.task.chathandler.type",
Key.get(ChatHandlerProvider.class),
Key.get(NoopChatHandlerProvider.class)
);
final MapBinder<String, ChatHandlerProvider> handlerProviderBinder = PolyBind.optionBinder(
binder, Key.get(ChatHandlerProvider.class)
);
handlerProviderBinder.addBinding("curator").to(EventReceivingChatHandlerProvider.class);
handlerProviderBinder.addBinding("noop").to(NoopChatHandlerProvider.class);
JsonConfigProvider.bind(binder, "druid.indexer.task", TaskConfig.class);
JsonConfigProvider.bind(binder, "druid.worker.taskActionClient.retry", RetryPolicyConfig.class);
binder.bind(TaskToolboxFactory.class).in(LazySingleton.class);
binder.bind(TaskActionClientFactory.class)
.to(RemoteTaskActionClientFactory.class)
.in(LazySingleton.class);
binder.bind(RetryPolicyFactory.class).in(LazySingleton.class);
JsonConfigProvider.bind(binder, "druid.indexer.task", TaskConfig.class);
JsonConfigProvider.bind(binder, "druid.worker.taskActionClient.retry", RetryPolicyConfig.class);
binder.bind(DataSegmentKiller.class).to(S3DataSegmentKiller.class).in(LazySingleton.class);
binder.bind(TaskActionClientFactory.class)
.to(RemoteTaskActionClientFactory.class)
.in(LazySingleton.class);
binder.bind(RetryPolicyFactory.class).in(LazySingleton.class);
binder.bind(ExecutorLifecycle.class).in(ManageLifecycle.class);
binder.bind(ExecutorLifecycleConfig.class).toInstance(
new ExecutorLifecycleConfig()
.setTaskFile(new File(taskAndStatusFile.get(0)))
.setStatusFile(new File(taskAndStatusFile.get(1)))
);
binder.bind(DataSegmentKiller.class).to(S3DataSegmentKiller.class).in(LazySingleton.class);
binder.bind(TaskRunner.class).to(ThreadPoolTaskRunner.class);
binder.bind(QuerySegmentWalker.class).to(ThreadPoolTaskRunner.class);
binder.bind(ThreadPoolTaskRunner.class).in(ManageLifecycle.class);
binder.bind(ExecutorLifecycle.class).in(ManageLifecycle.class);
binder.bind(ExecutorLifecycleConfig.class).toInstance(
new ExecutorLifecycleConfig()
.setTaskFile(new File(taskAndStatusFile.get(0)))
.setStatusFile(new File(taskAndStatusFile.get(1)))
);
// Override the default SegmentLoaderConfig because we don't actually care about the
// configuration based locations. This will override them anyway. This is also stopping
// configuration of other parameters, but I don't think that's actually a problem.
// Note, if that is actually not a problem, then that probably means we have the wrong abstraction.
binder.bind(SegmentLoaderConfig.class)
.toInstance(new SegmentLoaderConfig().withLocations(Arrays.<StorageLocationConfig>asList()));
binder.bind(TaskRunner.class).to(ThreadPoolTaskRunner.class);
binder.bind(QuerySegmentWalker.class).to(ThreadPoolTaskRunner.class);
binder.bind(ThreadPoolTaskRunner.class).in(ManageLifecycle.class);
binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class);
Jerseys.addResource(binder, ChatHandlerResource.class);
binder.bind(NodeTypeConfig.class).toInstance(new NodeTypeConfig(nodeType));
}
}
)
// Override the default SegmentLoaderConfig because we don't actually care about the
// configuration based locations. This will override them anyway. This is also stopping
// configuration of other parameters, but I don't think that's actually a problem.
// Note, if that is actually not a problem, then that probably means we have the wrong abstraction.
binder.bind(SegmentLoaderConfig.class)
.toInstance(new SegmentLoaderConfig().withLocations(Arrays.<StorageLocationConfig>asList()));
binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class);
Jerseys.addResource(binder, ChatHandlerResource.class);
binder.bind(NodeTypeConfig.class).toInstance(new NodeTypeConfig(nodeType));
LifecycleModule.register(binder, Server.class);
}
}
);
}
@ -154,13 +151,11 @@ public class CliPeon implements Runnable
public void run()
{
try {
LogLevelAdjuster.register();
final Injector injector = getInjector();
final Lifecycle lifecycle = injector.getInstance(Lifecycle.class);
Injector injector = makeInjector();
try {
lifecycle.start();
Lifecycle lifecycle = initLifecycle(injector);
injector.getInstance(ExecutorLifecycle.class).join();
lifecycle.stop();
}

View File

@ -0,0 +1,65 @@
package io.druid.cli;
import com.google.common.base.Throwables;
import com.google.inject.Inject;
import com.google.inject.Injector;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.logger.Logger;
import io.druid.initialization.LogLevelAdjuster;
import java.util.List;
/**
*/
public abstract class GuiceRunnable implements Runnable
{
private final Logger log;
private Injector baseInjector;
public GuiceRunnable(Logger log)
{
this.log = log;
}
@Inject
public void configure(Injector injector)
{
this.baseInjector = injector;
}
protected abstract List<Object> getModules();
public Injector makeInjector()
{
try {
return Initialization.makeInjectorWithModules(
baseInjector, getModules()
);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
public Lifecycle initLifecycle(Injector injector)
{
try {
LogLevelAdjuster.register();
final Lifecycle lifecycle = injector.getInstance(Lifecycle.class);
try {
lifecycle.start();
}
catch (Throwable t) {
log.error(t, "Error when starting up. Failing.");
System.exit(1);
}
return lifecycle;
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
}

View File

@ -97,6 +97,13 @@ public class
final TeslaAether aether = getAetherClient(config);
List<T> retVal = Lists.newArrayList();
if (config.searchCurrentClassloader()) {
for (T module : ServiceLoader.load(clazz, Initialization.class.getClassLoader())) {
log.info("Adding local module[%s]", module.getClass());
retVal.add(module);
}
}
for (String coordinate : config.getCoordinates()) {
log.info("Loading extension[%s]", coordinate);
try {
@ -139,14 +146,13 @@ public class
for (Artifact artifact : artifacts) {
if (!exclusions.contains(artifact.getGroupId())) {
urls.add(artifact.getFile().toURI().toURL());
}
else {
} else {
log.error("Skipped Artifact[%s]", artifact);
}
}
for (URL url : urls) {
log.error("Added URL[%s]", url);
log.info("Added URL[%s]", url);
}
loader = new URLClassLoader(urls.toArray(new URL[urls.size()]), Initialization.class.getClassLoader());
@ -243,7 +249,8 @@ public class
private final ObjectMapper smileMapper;
private final List<Module> modules;
public ModuleList(Injector baseInjector) {
public ModuleList(Injector baseInjector)
{
this.baseInjector = baseInjector;
this.jsonMapper = baseInjector.getInstance(Key.get(ObjectMapper.class, Json.class));
this.smileMapper = baseInjector.getInstance(Key.get(ObjectMapper.class, Smile.class));
@ -260,24 +267,19 @@ public class
if (input instanceof DruidModule) {
baseInjector.injectMembers(input);
modules.add(registerJacksonModules(((DruidModule) input)));
}
else if (input instanceof Module) {
} else if (input instanceof Module) {
baseInjector.injectMembers(input);
modules.add((Module) input);
}
else if (input instanceof Class) {
} else if (input instanceof Class) {
if (DruidModule.class.isAssignableFrom((Class) input)) {
modules.add(registerJacksonModules(baseInjector.getInstance((Class<? extends DruidModule>) input)));
}
else if (Module.class.isAssignableFrom((Class) input)) {
} else if (Module.class.isAssignableFrom((Class) input)) {
modules.add(baseInjector.getInstance((Class<? extends Module>) input));
return;
}
else {
} else {
throw new ISE("Class[%s] does not implement %s", input.getClass(), Module.class);
}
}
else {
} else {
throw new ISE("Unknown module type[%s]", input.getClass());
}
}

View File

@ -68,6 +68,11 @@ public class Main
.withDefaultCommand(Help.class)
.withCommands(ConvertProperties.class);
builder.withGroup("index")
.withDescription("Run indexing for druid")
.withDefaultCommand(Help.class)
.withCommands(CliHadoopIndexer.class);
builder.withGroup("internal")
.withDescription("Processes that Druid runs \"internally\", you should rarely use these directly")
.withDefaultCommand(Help.class)

View File

@ -20,54 +20,26 @@
package io.druid.cli;
import com.google.common.base.Throwables;
import com.google.inject.Inject;
import com.google.inject.Injector;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.logger.Logger;
import io.druid.initialization.LogLevelAdjuster;
import java.util.List;
/**
*/
public abstract class ServerRunnable implements Runnable
public abstract class ServerRunnable extends GuiceRunnable
{
private final Logger log;
private Injector baseInjector;
public ServerRunnable(Logger log)
{
this.log = log;
super(log);
}
@Inject
public void configure(Injector injector)
{
this.baseInjector = injector;
}
protected abstract List<Object> getModules();
@Override
public void run()
{
final Injector injector = makeInjector();
final Lifecycle lifecycle = initLifecycle(injector);
try {
LogLevelAdjuster.register();
final Injector injector = Initialization.makeInjectorWithModules(
baseInjector, getModules()
);
final Lifecycle lifecycle = injector.getInstance(Lifecycle.class);
try {
lifecycle.start();
}
catch (Throwable t) {
log.error(t, "Error when starting up. Failing.");
System.exit(1);
}
lifecycle.join();
}
catch (Exception e) {

View File

@ -91,7 +91,7 @@ public class ConvertProperties implements Runnable
new Rename("druid.indexer.maxPendingTaskDuration", "druid.indexer.autoscale.pendingTaskTimeout"),
new Rename("druid.indexer.worker.version", "druid.indexer.autoscale.workerVersion"),
new Rename("druid.indexer.worker.port", "druid.indexer.autoscale.workerPort"),
new Rename("druid.worker.masterService", "druid.worker.overlordService"),
new Rename("druid.worker.masterService", "druid.selectors.indexing.serviceName"),
new ChatHandlerConverter(),
new Rename("druid.indexer.baseDir", "druid.indexer.task.baseDir"),
new Rename("druid.indexer.taskDir", "druid.indexer.task.taskDir"),
@ -100,6 +100,7 @@ public class ConvertProperties implements Runnable
new Rename("druid.worker.taskActionClient.retry.minWaitMillis", "druid.worker.taskActionClient.retry.minWait"),
new Rename("druid.worker.taskActionClient.retry.maxWaitMillis", "druid.worker.taskActionClient.retry.maxWait"),
new Rename("druid.master.merger.service", "druid.selectors.indexing.serviceName"),
new Rename("druid.master.merger.on", "druid.master.merge.on"),
new DataSegmentPusherDefaultConverter(),
new Rename("druid.pusher.hdfs.storageDirectory", "druid.pusher.storageDirectory"),
new Rename("druid.pusher.cassandra.host", "druid.pusher.host"),

View File

@ -23,11 +23,15 @@ import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.inject.Binder;
import com.google.inject.Key;
import com.google.inject.TypeLiteral;
import com.google.inject.multibindings.MapBinder;
import io.druid.cli.QueryJettyServerInitializer;
import io.druid.initialization.DruidModule;
import io.druid.query.QuerySegmentWalker;
import io.druid.segment.realtime.DbSegmentPublisher;
import io.druid.segment.realtime.FireDepartment;
import io.druid.segment.realtime.NoopSegmentPublisher;
import io.druid.segment.realtime.RealtimeManager;
import io.druid.segment.realtime.SegmentPublisher;
import io.druid.segment.realtime.firehose.KafkaFirehoseFactory;
@ -38,17 +42,28 @@ import java.util.Arrays;
import java.util.List;
/**
*/
*/
public class RealtimeModule implements DruidModule
{
@Override
public void configure(Binder binder)
{
JsonConfigProvider.bind(binder, "druid.publish", SegmentPublisherProvider.class);
binder.bind(SegmentPublisher.class).toProvider(SegmentPublisherProvider.class);
PolyBind.createChoice(
binder,
"druid.publish.type",
Key.get(SegmentPublisher.class),
Key.get(NoopSegmentPublisher.class)
);
final MapBinder<String, SegmentPublisher> publisherBinder = PolyBind.optionBinder(binder, Key.get(SegmentPublisher.class));
publisherBinder.addBinding("db").to(DbSegmentPublisher.class);
binder.bind(DbSegmentPublisher.class).in(LazySingleton.class);
JsonConfigProvider.bind(binder, "druid.realtime", RealtimeManagerConfig.class);
binder.bind(new TypeLiteral<List<FireDepartment>>(){})
binder.bind(
new TypeLiteral<List<FireDepartment>>()
{
}
)
.toProvider(FireDepartmentsProvider.class)
.in(LazySingleton.class);