Merge pull request #250 from metamx/guice-is

Make the indexing service work with new framework
This commit is contained in:
cheddar 2013-09-27 10:30:24 -07:00
commit 1f3aae6edf
23 changed files with 154 additions and 144 deletions

View File

@ -24,6 +24,7 @@ import com.google.inject.Inject;
import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.metrics.MonitorScheduler; import com.metamx.metrics.MonitorScheduler;
import io.druid.client.ServerView; import io.druid.client.ServerView;
import io.druid.guice.annotations.Json;
import io.druid.guice.annotations.Processing; import io.druid.guice.annotations.Processing;
import io.druid.indexing.common.actions.TaskActionClientFactory; import io.druid.indexing.common.actions.TaskActionClientFactory;
import io.druid.indexing.common.config.TaskConfig; import io.druid.indexing.common.config.TaskConfig;

View File

@ -132,14 +132,6 @@ public class RemoteTaskActionClient implements TaskActionClient
throw new ISE("Cannot find instance of indexer to talk to!"); throw new ISE("Cannot find instance of indexer to talk to!");
} }
return new URI( return new URI(String.format("%s://%s%s", instance.getScheme(), instance.getHost(), "/druid/indexer/v1/action"));
instance.getScheme(),
null,
instance.getHost(),
instance.getPort(),
"/druid/indexer/v1/action",
null,
null
);
} }
} }

View File

@ -125,7 +125,6 @@ public abstract class MergeTaskBase extends AbstractTask
final File taskDir = toolbox.getTaskWorkDir(); final File taskDir = toolbox.getTaskWorkDir();
try { try {
final long startTime = System.currentTimeMillis(); final long startTime = System.currentTimeMillis();
log.info( log.info(

View File

@ -55,12 +55,19 @@ public class ForkingTaskRunnerConfig
private String classpath = System.getProperty("java.class.path"); private String classpath = System.getProperty("java.class.path");
@JsonProperty @JsonProperty
@Min(1024) @Max(65535) @Min(1024)
@Max(65535)
private int startPort = 8080; private int startPort = 8080;
@JsonProperty @JsonProperty
@NotNull @NotNull
List<String> allowedPrefixes = Lists.newArrayList("com.metamx", "druid", "io.druid"); List<String> allowedPrefixes = Lists.newArrayList(
"com.metamx",
"druid",
"io.druid",
"user.timezone",
"file.encoding"
);
public int maxForks() public int maxForks()
{ {

View File

@ -19,7 +19,6 @@
package io.druid.indexing.coordinator.http; package io.druid.indexing.coordinator.http;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function; import com.google.common.base.Function;
import com.google.common.base.Optional; import com.google.common.base.Optional;
import com.google.common.collect.Collections2; import com.google.common.collect.Collections2;
@ -89,7 +88,6 @@ public class IndexerCoordinatorResource
private final TaskStorageQueryAdapter taskStorageQueryAdapter; private final TaskStorageQueryAdapter taskStorageQueryAdapter;
private final TaskLogStreamer taskLogStreamer; private final TaskLogStreamer taskLogStreamer;
private final JacksonConfigManager configManager; private final JacksonConfigManager configManager;
private final ObjectMapper jsonMapper;
private AtomicReference<WorkerSetupData> workerSetupDataRef = null; private AtomicReference<WorkerSetupData> workerSetupDataRef = null;
@ -98,21 +96,20 @@ public class IndexerCoordinatorResource
TaskMaster taskMaster, TaskMaster taskMaster,
TaskStorageQueryAdapter taskStorageQueryAdapter, TaskStorageQueryAdapter taskStorageQueryAdapter,
TaskLogStreamer taskLogStreamer, TaskLogStreamer taskLogStreamer,
JacksonConfigManager configManager, JacksonConfigManager configManager
ObjectMapper jsonMapper
) throws Exception ) throws Exception
{ {
this.taskMaster = taskMaster; this.taskMaster = taskMaster;
this.taskStorageQueryAdapter = taskStorageQueryAdapter; this.taskStorageQueryAdapter = taskStorageQueryAdapter;
this.taskLogStreamer = taskLogStreamer; this.taskLogStreamer = taskLogStreamer;
this.configManager = configManager; this.configManager = configManager;
this.jsonMapper = jsonMapper;
} }
@POST @POST
@Path("/merge") @Path("/merge")
@Consumes("application/json") @Consumes("application/json")
@Produces("application/json") @Produces("application/json")
@Deprecated
public Response doMerge(final Task task) public Response doMerge(final Task task)
{ {
// legacy endpoint // legacy endpoint
@ -123,6 +120,7 @@ public class IndexerCoordinatorResource
@Path("/index") @Path("/index")
@Consumes("application/json") @Consumes("application/json")
@Produces("application/json") @Produces("application/json")
@Deprecated
public Response doIndex(final Task task) public Response doIndex(final Task task)
{ {
return taskPost(task); return taskPost(task);

View File

@ -19,7 +19,6 @@
package io.druid.indexing.coordinator.http; package io.druid.indexing.coordinator.http;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject; import com.google.inject.Inject;
import io.druid.common.config.JacksonConfigManager; import io.druid.common.config.JacksonConfigManager;
import io.druid.indexing.common.tasklogs.TaskLogStreamer; import io.druid.indexing.common.tasklogs.TaskLogStreamer;
@ -39,10 +38,9 @@ public class OldIndexerCoordinatorResource extends IndexerCoordinatorResource
TaskMaster taskMaster, TaskMaster taskMaster,
TaskStorageQueryAdapter taskStorageQueryAdapter, TaskStorageQueryAdapter taskStorageQueryAdapter,
TaskLogStreamer taskLogStreamer, TaskLogStreamer taskLogStreamer,
JacksonConfigManager configManager, JacksonConfigManager configManager
ObjectMapper jsonMapper
) throws Exception ) throws Exception
{ {
super(taskMaster, taskStorageQueryAdapter, taskLogStreamer, configManager, jsonMapper); super(taskMaster, taskStorageQueryAdapter, taskLogStreamer, configManager);
} }
} }

View File

@ -34,6 +34,7 @@ import com.google.common.base.Supplier;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.EmittingLogger;
import io.druid.guice.annotations.Json;
import io.druid.indexing.coordinator.setup.EC2NodeData; import io.druid.indexing.coordinator.setup.EC2NodeData;
import io.druid.indexing.coordinator.setup.GalaxyUserData; import io.druid.indexing.coordinator.setup.GalaxyUserData;
import io.druid.indexing.coordinator.setup.WorkerSetupData; import io.druid.indexing.coordinator.setup.WorkerSetupData;
@ -55,7 +56,7 @@ public class EC2AutoScalingStrategy implements AutoScalingStrategy
@Inject @Inject
public EC2AutoScalingStrategy( public EC2AutoScalingStrategy(
ObjectMapper jsonMapper, @Json ObjectMapper jsonMapper,
AmazonEC2 amazonEC2Client, AmazonEC2 amazonEC2Client,
SimpleResourceManagementConfig config, SimpleResourceManagementConfig config,
Supplier<WorkerSetupData> workerSetupDataRef Supplier<WorkerSetupData> workerSetupDataRef

View File

@ -54,11 +54,15 @@ public class ScalingStats
public ScalingStats(int capacity) public ScalingStats(int capacity)
{ {
if (capacity == 0) {
this.recentEvents = MinMaxPriorityQueue.orderedBy(comparator).create();
} else {
this.recentEvents = MinMaxPriorityQueue this.recentEvents = MinMaxPriorityQueue
.orderedBy(comparator) .orderedBy(comparator)
.maximumSize(capacity) .maximumSize(capacity)
.create(); .create();
} }
}
public void addProvisionEvent(AutoScalingData data) public void addProvisionEvent(AutoScalingData data)
{ {

View File

@ -36,10 +36,6 @@ public class WorkerConfig
@NotNull @NotNull
private String version = null; private String version = null;
@JsonProperty
@NotNull
private String overlordService = null;
@JsonProperty @JsonProperty
@Min(1) @Min(1)
private int capacity = Runtime.getRuntime().availableProcessors() - 1; private int capacity = Runtime.getRuntime().availableProcessors() - 1;
@ -54,11 +50,6 @@ public class WorkerConfig
return version; return version;
} }
public String getOverlordService()
{
return overlordService;
}
public int getCapacity() public int getCapacity()
{ {
return capacity; return capacity;

View File

@ -123,6 +123,7 @@ public class ExecutorLifecycle
jsonMapper.writerWithDefaultPrettyPrinter().writeValueAsString(taskStatus) jsonMapper.writerWithDefaultPrettyPrinter().writeValueAsString(taskStatus)
); );
statusFile.getParentFile().mkdirs();
jsonMapper.writeValue(statusFile, taskStatus); jsonMapper.writeValue(statusFile, taskStatus);
return taskStatus; return taskStatus;

View File

@ -3,22 +3,22 @@
var oTable = []; var oTable = [];
$(document).ready(function() { $(document).ready(function() {
$.get('/mmx/merger/v1/runningTasks', function(data) { $.get('/druid/indexer/v1/runningTasks', function(data) {
$('.running_loading').hide(); $('.running_loading').hide();
buildTable(data, $('#runningTable'), ["segments"]); buildTable(data, $('#runningTable'), ["segments"]);
}); });
$.get('/mmx/merger/v1/pendingTasks', function(data) { $.get('/druid/indexer/v1/pendingTasks', function(data) {
$('.pending_loading').hide(); $('.pending_loading').hide();
buildTable(data, $('#pendingTable'), ["segments"]); buildTable(data, $('#pendingTable'), ["segments"]);
}); });
$.get('/mmx/merger/v1/workers', function(data) { $.get('/druid/indexer/v1/workers', function(data) {
$('.workers_loading').hide(); $('.workers_loading').hide();
buildTable(data, $('#workerTable')); buildTable(data, $('#workerTable'));
}); });
$.get('/mmx/merger/v1/scaling', function(data) { $.get('/druid/indexer/v1/scaling', function(data) {
$('.events_loading').hide(); $('.events_loading').hide();
buildTable(data, $('#eventTable')); buildTable(data, $('#eventTable'));
}); });

View File

@ -29,7 +29,7 @@ public class DruidServerConfig
{ {
@JsonProperty @JsonProperty
@Min(0) @Min(0)
private long maxSize = -1; private long maxSize = 0;
@JsonProperty @JsonProperty
private String tier = "_default_tier"; private String tier = "_default_tier";

View File

@ -72,28 +72,28 @@ public class IndexingServiceClient
} }
} }
runQuery("merge", new ClientAppendQuery(dataSource, segments)); runQuery(new ClientAppendQuery(dataSource, segments));
} }
public void killSegments(String dataSource, Interval interval) public void killSegments(String dataSource, Interval interval)
{ {
runQuery("index", new ClientKillQuery(dataSource, interval)); runQuery(new ClientKillQuery(dataSource, interval));
} }
public void upgradeSegment(DataSegment dataSegment) public void upgradeSegment(DataSegment dataSegment)
{ {
runQuery("task", new ClientConversionQuery(dataSegment)); runQuery(new ClientConversionQuery(dataSegment));
} }
public void upgradeSegments(String dataSource, Interval interval) public void upgradeSegments(String dataSource, Interval interval)
{ {
runQuery("task", new ClientConversionQuery(dataSource, interval)); runQuery(new ClientConversionQuery(dataSource, interval));
} }
private InputStream runQuery(String endpoint, Object queryObject) private InputStream runQuery(Object queryObject)
{ {
try { try {
return client.post(new URL(String.format("%s/%s", baseUrl(), endpoint))) return client.post(new URL(String.format("%s/task", baseUrl())))
.setContent("application/json", jsonMapper.writeValueAsBytes(queryObject)) .setContent("application/json", jsonMapper.writeValueAsBytes(queryObject))
.go(RESPONSE_HANDLER) .go(RESPONSE_HANDLER)
.get(); .get();
@ -111,7 +111,7 @@ public class IndexingServiceClient
throw new ISE("Cannot find instance of indexingService"); throw new ISE("Cannot find instance of indexingService");
} }
return String.format("http://%s:%s/druid/indexer/v1", instance.getHost(), instance.getPort()); return String.format("http://%s/druid/indexer/v1", instance.getHost());
} }
catch (Exception e) { catch (Exception e) {
throw Throwables.propagate(e); throw Throwables.propagate(e);

View File

@ -121,7 +121,7 @@ public class DiscoveryModule implements Module
public static void registerKey(Binder binder, Key<DruidNode> key) public static void registerKey(Binder binder, Key<DruidNode> key)
{ {
DruidBinders.discoveryAnnouncementBinder(binder).addBinding().toInstance(new KeyHolder<>(key)); DruidBinders.discoveryAnnouncementBinder(binder).addBinding().toInstance(new KeyHolder<>(key));
LifecycleModule.registerKey(binder, key); LifecycleModule.register(binder, ServiceAnnouncer.class);
} }
@Override @Override
@ -134,7 +134,6 @@ public class DiscoveryModule implements Module
// Build the binder so that it will at a minimum inject an empty set. // Build the binder so that it will at a minimum inject an empty set.
DruidBinders.discoveryAnnouncementBinder(binder); DruidBinders.discoveryAnnouncementBinder(binder);
// We bind this eagerly so that it gets instantiated and registers stuff with Lifecycle as a side-effect
binder.bind(ServiceAnnouncer.class) binder.bind(ServiceAnnouncer.class)
.to(Key.get(CuratorServiceAnnouncer.class, Names.named(NAME))) .to(Key.get(CuratorServiceAnnouncer.class, Names.named(NAME)))
.in(LazySingleton.class); .in(LazySingleton.class);

View File

@ -116,11 +116,9 @@ public class QueryServlet extends HttpServlet
emitter.emit( emitter.emit(
new ServiceMetricEvent.Builder() new ServiceMetricEvent.Builder()
.setUser2(query.getDataSource()) .setUser2(query.getDataSource())
//.setUser3(originatorType)
.setUser4(query.getType()) .setUser4(query.getType())
.setUser5(query.getIntervals().get(0).toString()) .setUser5(query.getIntervals().get(0).toString())
.setUser6(String.valueOf(query.hasFilters())) .setUser6(String.valueOf(query.hasFilters()))
//.setUser8(originatorId)
.setUser9(query.getDuration().toPeriod().toStandardMinutes().toString()) .setUser9(query.getDuration().toPeriod().toStandardMinutes().toString())
.build("request/time", requestTime) .build("request/time", requestTime)
); );

View File

@ -19,6 +19,8 @@
package io.druid.server.initialization; package io.druid.server.initialization;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
import com.google.common.primitives.Ints; import com.google.common.primitives.Ints;
import com.google.inject.Binder; import com.google.inject.Binder;
@ -28,6 +30,7 @@ import com.google.inject.Injector;
import com.google.inject.Provides; import com.google.inject.Provides;
import com.google.inject.ProvisionException; import com.google.inject.ProvisionException;
import com.google.inject.Scopes; import com.google.inject.Scopes;
import com.google.inject.Singleton;
import com.metamx.common.lifecycle.Lifecycle; import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import com.sun.jersey.api.core.DefaultResourceConfig; import com.sun.jersey.api.core.DefaultResourceConfig;
@ -39,7 +42,9 @@ import io.druid.guice.Jerseys;
import io.druid.guice.JsonConfigProvider; import io.druid.guice.JsonConfigProvider;
import io.druid.guice.LazySingleton; import io.druid.guice.LazySingleton;
import io.druid.guice.annotations.JSR311Resource; import io.druid.guice.annotations.JSR311Resource;
import io.druid.guice.annotations.Json;
import io.druid.guice.annotations.Self; import io.druid.guice.annotations.Self;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.server.DruidNode; import io.druid.server.DruidNode;
import io.druid.server.StatusResource; import io.druid.server.StatusResource;
import org.eclipse.jetty.server.Connector; import org.eclipse.jetty.server.Connector;
@ -95,7 +100,8 @@ public class JettyServerModule extends JerseyServletModule
} }
} }
@Provides @LazySingleton @Provides
@LazySingleton
public Server getServer(Injector injector, Lifecycle lifecycle, @Self DruidNode node, ServerConfig config) public Server getServer(Injector injector, Lifecycle lifecycle, @Self DruidNode node, ServerConfig config)
{ {
JettyServerInitializer initializer = injector.getInstance(JettyServerInitializer.class); JettyServerInitializer initializer = injector.getInstance(JettyServerInitializer.class);
@ -133,6 +139,15 @@ public class JettyServerModule extends JerseyServletModule
return server; return server;
} }
@Provides
@Singleton
public JacksonJsonProvider getJacksonJsonProvider(@Json ObjectMapper objectMapper)
{
final JacksonJsonProvider provider = new JacksonJsonProvider();
provider.setMapper(objectMapper);
return provider;
}
private static Server makeJettyServer(@Self DruidNode node, ServerConfig config) private static Server makeJettyServer(@Self DruidNode node, ServerConfig config)
{ {
final QueuedThreadPool threadPool = new QueuedThreadPool(); final QueuedThreadPool threadPool = new QueuedThreadPool();

View File

@ -471,10 +471,13 @@ public class DruidMaster
serverInventoryView.start(); serverInventoryView.start();
final List<Pair<? extends MasterRunnable, Duration>> masterRunnables = Lists.newArrayList(); final List<Pair<? extends MasterRunnable, Duration>> masterRunnables = Lists.newArrayList();
segmentSettingsAtomicReference = configManager.watch(MasterSegmentSettings.CONFIG_KEY, MasterSegmentSettings.class,new MasterSegmentSettings.Builder().build()); segmentSettingsAtomicReference = configManager.watch(
MasterSegmentSettings.CONFIG_KEY,
MasterSegmentSettings.class,
new MasterSegmentSettings.Builder().build()
);
masterRunnables.add(Pair.of(new MasterComputeManagerRunnable(), config.getMasterPeriod())); masterRunnables.add(Pair.of(new MasterComputeManagerRunnable(), config.getMasterPeriod()));
if (indexingServiceClient != null) { if (indexingServiceClient != null) {
masterRunnables.add( masterRunnables.add(
Pair.of( Pair.of(
new MasterIndexingServiceRunnable( new MasterIndexingServiceRunnable(

View File

@ -42,7 +42,7 @@ public abstract class DruidMasterConfig
@Default("PT1800s") @Default("PT1800s")
public abstract Duration getMasterSegmentMergerPeriod(); public abstract Duration getMasterSegmentMergerPeriod();
@Config("druid.master.merger.on") @Config("druid.master.merge.on")
public boolean isMergeSegments() public boolean isMergeSegments()
{ {
return false; return false;

View File

@ -81,7 +81,6 @@ public class MasterSegmentSettings
return maxSegmentsToMove; return maxSegmentsToMove;
} }
public static class Builder public static class Builder
{ {
public static final String CONFIG_KEY = "master.dynamicConfigs"; public static final String CONFIG_KEY = "master.dynamicConfigs";
@ -93,14 +92,16 @@ public class MasterSegmentSettings
public Builder() public Builder()
{ {
this.millisToWaitBeforeDeleting=15 * 60 * 1000L; this(15 * 60 * 1000L, 100000000L, Integer.MAX_VALUE, 5, false);
this.mergeBytesLimit= 100000000L;
this.mergeSegmentsLimit= Integer.MAX_VALUE;
this.maxSegmentsToMove = 5;
this.emitBalancingStats = false;
} }
public Builder(long millisToWaitBeforeDeleting, long mergeBytesLimit, int mergeSegmentsLimit, int maxSegmentsToMove, boolean emitBalancingStats) public Builder(
long millisToWaitBeforeDeleting,
long mergeBytesLimit,
int mergeSegmentsLimit,
int maxSegmentsToMove,
boolean emitBalancingStats
)
{ {
this.millisToWaitBeforeDeleting = millisToWaitBeforeDeleting; this.millisToWaitBeforeDeleting = millisToWaitBeforeDeleting;
this.mergeBytesLimit = mergeBytesLimit; this.mergeBytesLimit = mergeBytesLimit;
@ -135,7 +136,13 @@ public class MasterSegmentSettings
public MasterSegmentSettings build() public MasterSegmentSettings build()
{ {
return new MasterSegmentSettings(millisToWaitBeforeDeleting,mergeBytesLimit,mergeSegmentsLimit,maxSegmentsToMove, emitBalancingStats); return new MasterSegmentSettings(
millisToWaitBeforeDeleting,
mergeBytesLimit,
mergeSegmentsLimit,
maxSegmentsToMove,
emitBalancingStats
);
} }
} }
} }

View File

@ -52,7 +52,7 @@ import java.util.List;
) )
public class CliHadoopIndexer extends GuiceRunnable public class CliHadoopIndexer extends GuiceRunnable
{ {
@Arguments(description = "A JSON object or the path to a file that contains a JSON object") @Arguments(description = "A JSON object or the path to a file that contains a JSON object", required = true)
private String argumentSpec; private String argumentSpec;
private static final Logger log = new Logger(CliHadoopIndexer.class); private static final Logger log = new Logger(CliHadoopIndexer.class);
@ -78,8 +78,6 @@ public class CliHadoopIndexer extends GuiceRunnable
@LazySingleton @LazySingleton
public HadoopDruidIndexerConfig getHadoopDruidIndexerConfig() public HadoopDruidIndexerConfig getHadoopDruidIndexerConfig()
{ {
Preconditions.checkNotNull(argumentSpec, "argumentSpec");
try { try {
if (argumentSpec.startsWith("{")) { if (argumentSpec.startsWith("{")) {
return HadoopDruidIndexerConfig.fromString(argumentSpec); return HadoopDruidIndexerConfig.fromString(argumentSpec);

View File

@ -57,6 +57,7 @@ import io.druid.indexing.coordinator.TaskRunnerFactory;
import io.druid.indexing.coordinator.TaskStorage; import io.druid.indexing.coordinator.TaskStorage;
import io.druid.indexing.coordinator.TaskStorageQueryAdapter; import io.druid.indexing.coordinator.TaskStorageQueryAdapter;
import io.druid.indexing.coordinator.http.IndexerCoordinatorResource; import io.druid.indexing.coordinator.http.IndexerCoordinatorResource;
import io.druid.indexing.coordinator.http.OldIndexerCoordinatorResource;
import io.druid.indexing.coordinator.http.OverlordRedirectInfo; import io.druid.indexing.coordinator.http.OverlordRedirectInfo;
import io.druid.indexing.coordinator.scaling.AutoScalingStrategy; import io.druid.indexing.coordinator.scaling.AutoScalingStrategy;
import io.druid.indexing.coordinator.scaling.EC2AutoScalingStrategy; import io.druid.indexing.coordinator.scaling.EC2AutoScalingStrategy;
@ -139,6 +140,7 @@ public class CliOverlord extends ServerRunnable
binder.bind(JettyServerInitializer.class).toInstance(new OverlordJettyServerInitializer()); binder.bind(JettyServerInitializer.class).toInstance(new OverlordJettyServerInitializer());
Jerseys.addResource(binder, IndexerCoordinatorResource.class); Jerseys.addResource(binder, IndexerCoordinatorResource.class);
Jerseys.addResource(binder, OldIndexerCoordinatorResource.class);
LifecycleModule.register(binder, Server.class); LifecycleModule.register(binder, Server.class);
} }

View File

@ -22,7 +22,6 @@ package io.druid.cli;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.inject.Binder; import com.google.inject.Binder;
import com.google.inject.Inject;
import com.google.inject.Injector; import com.google.inject.Injector;
import com.google.inject.Key; import com.google.inject.Key;
import com.google.inject.Module; import com.google.inject.Module;
@ -35,6 +34,7 @@ import io.airlift.command.Option;
import io.druid.guice.Jerseys; import io.druid.guice.Jerseys;
import io.druid.guice.JsonConfigProvider; import io.druid.guice.JsonConfigProvider;
import io.druid.guice.LazySingleton; import io.druid.guice.LazySingleton;
import io.druid.guice.LifecycleModule;
import io.druid.guice.ManageLifecycle; import io.druid.guice.ManageLifecycle;
import io.druid.guice.NodeTypeConfig; import io.druid.guice.NodeTypeConfig;
import io.druid.guice.PolyBind; import io.druid.guice.PolyBind;
@ -52,13 +52,13 @@ import io.druid.indexing.coordinator.ThreadPoolTaskRunner;
import io.druid.indexing.worker.executor.ChatHandlerResource; import io.druid.indexing.worker.executor.ChatHandlerResource;
import io.druid.indexing.worker.executor.ExecutorLifecycle; import io.druid.indexing.worker.executor.ExecutorLifecycle;
import io.druid.indexing.worker.executor.ExecutorLifecycleConfig; import io.druid.indexing.worker.executor.ExecutorLifecycleConfig;
import io.druid.initialization.LogLevelAdjuster;
import io.druid.query.QuerySegmentWalker; import io.druid.query.QuerySegmentWalker;
import io.druid.segment.loading.DataSegmentKiller; import io.druid.segment.loading.DataSegmentKiller;
import io.druid.segment.loading.S3DataSegmentKiller; import io.druid.segment.loading.S3DataSegmentKiller;
import io.druid.segment.loading.SegmentLoaderConfig; import io.druid.segment.loading.SegmentLoaderConfig;
import io.druid.segment.loading.StorageLocationConfig; import io.druid.segment.loading.StorageLocationConfig;
import io.druid.server.initialization.JettyServerInitializer; import io.druid.server.initialization.JettyServerInitializer;
import org.eclipse.jetty.server.Server;
import java.io.File; import java.io.File;
import java.util.Arrays; import java.util.Arrays;
@ -71,7 +71,7 @@ import java.util.List;
description = "Runs a Peon, this is an individual forked \"task\" used as part of the indexing service. " description = "Runs a Peon, this is an individual forked \"task\" used as part of the indexing service. "
+ "This should rarely, if ever, be used directly." + "This should rarely, if ever, be used directly."
) )
public class CliPeon implements Runnable public class CliPeon extends GuiceRunnable
{ {
@Arguments(description = "task.json status.json", required = true) @Arguments(description = "task.json status.json", required = true)
public List<String> taskAndStatusFile; public List<String> taskAndStatusFile;
@ -79,21 +79,17 @@ public class CliPeon implements Runnable
@Option(name = "--nodeType", title = "nodeType", description = "Set the node type to expose on ZK") @Option(name = "--nodeType", title = "nodeType", description = "Set the node type to expose on ZK")
public String nodeType = "indexer-executor"; public String nodeType = "indexer-executor";
private Injector injector;
@Inject
public void configure(Injector injector)
{
this.injector = injector;
}
private static final Logger log = new Logger(CliPeon.class); private static final Logger log = new Logger(CliPeon.class);
protected Injector getInjector() public CliPeon()
{ {
return Initialization.makeInjectorWithModules( super(log);
injector, }
ImmutableList.<Object>of(
@Override
protected List<Object> getModules()
{
return ImmutableList.<Object>of(
new Module() new Module()
{ {
@Override @Override
@ -144,9 +140,10 @@ public class CliPeon implements Runnable
binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class); binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class);
Jerseys.addResource(binder, ChatHandlerResource.class); Jerseys.addResource(binder, ChatHandlerResource.class);
binder.bind(NodeTypeConfig.class).toInstance(new NodeTypeConfig(nodeType)); binder.bind(NodeTypeConfig.class).toInstance(new NodeTypeConfig(nodeType));
LifecycleModule.register(binder, Server.class);
} }
} }
)
); );
} }
@ -154,13 +151,11 @@ public class CliPeon implements Runnable
public void run() public void run()
{ {
try { try {
LogLevelAdjuster.register(); Injector injector = makeInjector();
final Injector injector = getInjector();
final Lifecycle lifecycle = injector.getInstance(Lifecycle.class);
try { try {
lifecycle.start(); Lifecycle lifecycle = initLifecycle(injector);
injector.getInstance(ExecutorLifecycle.class).join(); injector.getInstance(ExecutorLifecycle.class).join();
lifecycle.stop(); lifecycle.stop();
} }

View File

@ -91,7 +91,7 @@ public class ConvertProperties implements Runnable
new Rename("druid.indexer.maxPendingTaskDuration", "druid.indexer.autoscale.pendingTaskTimeout"), new Rename("druid.indexer.maxPendingTaskDuration", "druid.indexer.autoscale.pendingTaskTimeout"),
new Rename("druid.indexer.worker.version", "druid.indexer.autoscale.workerVersion"), new Rename("druid.indexer.worker.version", "druid.indexer.autoscale.workerVersion"),
new Rename("druid.indexer.worker.port", "druid.indexer.autoscale.workerPort"), new Rename("druid.indexer.worker.port", "druid.indexer.autoscale.workerPort"),
new Rename("druid.worker.masterService", "druid.worker.overlordService"), new Rename("druid.worker.masterService", "druid.selectors.indexing.serviceName"),
new ChatHandlerConverter(), new ChatHandlerConverter(),
new Rename("druid.indexer.baseDir", "druid.indexer.task.baseDir"), new Rename("druid.indexer.baseDir", "druid.indexer.task.baseDir"),
new Rename("druid.indexer.taskDir", "druid.indexer.task.taskDir"), new Rename("druid.indexer.taskDir", "druid.indexer.task.taskDir"),
@ -100,6 +100,7 @@ public class ConvertProperties implements Runnable
new Rename("druid.worker.taskActionClient.retry.minWaitMillis", "druid.worker.taskActionClient.retry.minWait"), new Rename("druid.worker.taskActionClient.retry.minWaitMillis", "druid.worker.taskActionClient.retry.minWait"),
new Rename("druid.worker.taskActionClient.retry.maxWaitMillis", "druid.worker.taskActionClient.retry.maxWait"), new Rename("druid.worker.taskActionClient.retry.maxWaitMillis", "druid.worker.taskActionClient.retry.maxWait"),
new Rename("druid.master.merger.service", "druid.selectors.indexing.serviceName"), new Rename("druid.master.merger.service", "druid.selectors.indexing.serviceName"),
new Rename("druid.master.merger.on", "druid.master.merge.on"),
new DataSegmentPusherDefaultConverter(), new DataSegmentPusherDefaultConverter(),
new Rename("druid.pusher.hdfs.storageDirectory", "druid.pusher.storageDirectory"), new Rename("druid.pusher.hdfs.storageDirectory", "druid.pusher.storageDirectory"),
new Rename("druid.pusher.cassandra.host", "druid.pusher.host"), new Rename("druid.pusher.cassandra.host", "druid.pusher.host"),