From e404295c1f921dac31c2c9c0855204e8b402c85b Mon Sep 17 00:00:00 2001 From: fjy Date: Thu, 26 Sep 2013 17:44:21 -0700 Subject: [PATCH 1/2] make indexing service work --- .../indexing/common/TaskToolboxFactory.java | 3 +- .../actions/RemoteTaskActionClient.java | 10 +- .../indexing/common/task/MergeTaskBase.java | 1 - .../config/ForkingTaskRunnerConfig.java | 11 +- .../http/IndexerCoordinatorResource.java | 8 +- .../http/OldIndexerCoordinatorResource.java | 6 +- .../scaling/EC2AutoScalingStrategy.java | 3 +- .../coordinator/scaling/ScalingStats.java | 12 +- .../indexing/worker/config/WorkerConfig.java | 9 -- .../worker/executor/ExecutorLifecycle.java | 1 + .../indexer_static/js/console-0.0.1.js | 8 +- .../io/druid/client/DruidServerConfig.java | 2 +- .../indexing/IndexingServiceClient.java | 14 +- .../curator/discovery/DiscoveryModule.java | 3 +- .../java/io/druid/server/QueryServlet.java | 2 - .../initialization/JettyServerModule.java | 17 ++- .../io/druid/server/master/DruidMaster.java | 9 +- .../server/master/DruidMasterConfig.java | 2 +- .../server/master/MasterSegmentSettings.java | 45 ++++--- .../java/io/druid/cli/CliHadoopIndexer.java | 4 +- .../main/java/io/druid/cli/CliOverlord.java | 2 + .../src/main/java/io/druid/cli/CliPeon.java | 124 +++++++++--------- .../druid/cli/convert/ConvertProperties.java | 3 +- 23 files changed, 154 insertions(+), 145 deletions(-) diff --git a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java index d20480b9de0..33e2a367c41 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java @@ -24,6 +24,7 @@ import com.google.inject.Inject; import com.metamx.emitter.service.ServiceEmitter; import com.metamx.metrics.MonitorScheduler; import io.druid.client.ServerView; +import io.druid.guice.annotations.Json; import io.druid.guice.annotations.Processing; import io.druid.indexing.common.actions.TaskActionClientFactory; import io.druid.indexing.common.config.TaskConfig; @@ -67,7 +68,7 @@ public class TaskToolboxFactory @Processing ExecutorService queryExecutorService, MonitorScheduler monitorScheduler, SegmentLoaderFactory segmentLoaderFactory, - ObjectMapper objectMapper + @Json ObjectMapper objectMapper ) { this.config = config; diff --git a/indexing-service/src/main/java/io/druid/indexing/common/actions/RemoteTaskActionClient.java b/indexing-service/src/main/java/io/druid/indexing/common/actions/RemoteTaskActionClient.java index 41639aa9989..a72ece1ae80 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/actions/RemoteTaskActionClient.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/actions/RemoteTaskActionClient.java @@ -132,14 +132,6 @@ public class RemoteTaskActionClient implements TaskActionClient throw new ISE("Cannot find instance of indexer to talk to!"); } - return new URI( - instance.getScheme(), - null, - instance.getHost(), - instance.getPort(), - "/druid/indexer/v1/action", - null, - null - ); + return new URI(String.format("%s://%s%s", instance.getScheme(), instance.getHost(), "/druid/indexer/v1/action")); } } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java b/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java index e597815a3d4..5936d45a278 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java @@ -125,7 +125,6 @@ public abstract class MergeTaskBase extends AbstractTask final File taskDir = toolbox.getTaskWorkDir(); try { - final long startTime = System.currentTimeMillis(); log.info( diff --git a/indexing-service/src/main/java/io/druid/indexing/coordinator/config/ForkingTaskRunnerConfig.java b/indexing-service/src/main/java/io/druid/indexing/coordinator/config/ForkingTaskRunnerConfig.java index 930c6167f16..d48b6b212f6 100644 --- a/indexing-service/src/main/java/io/druid/indexing/coordinator/config/ForkingTaskRunnerConfig.java +++ b/indexing-service/src/main/java/io/druid/indexing/coordinator/config/ForkingTaskRunnerConfig.java @@ -55,12 +55,19 @@ public class ForkingTaskRunnerConfig private String classpath = System.getProperty("java.class.path"); @JsonProperty - @Min(1024) @Max(65535) + @Min(1024) + @Max(65535) private int startPort = 8080; @JsonProperty @NotNull - List allowedPrefixes = Lists.newArrayList("com.metamx", "druid", "io.druid"); + List allowedPrefixes = Lists.newArrayList( + "com.metamx", + "druid", + "io.druid", + "user.timezone", + "file.encoding" + ); public int maxForks() { diff --git a/indexing-service/src/main/java/io/druid/indexing/coordinator/http/IndexerCoordinatorResource.java b/indexing-service/src/main/java/io/druid/indexing/coordinator/http/IndexerCoordinatorResource.java index 090997035b5..9a2638e8d67 100644 --- a/indexing-service/src/main/java/io/druid/indexing/coordinator/http/IndexerCoordinatorResource.java +++ b/indexing-service/src/main/java/io/druid/indexing/coordinator/http/IndexerCoordinatorResource.java @@ -19,7 +19,6 @@ package io.druid.indexing.coordinator.http; -import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Function; import com.google.common.base.Optional; import com.google.common.collect.Collections2; @@ -89,7 +88,6 @@ public class IndexerCoordinatorResource private final TaskStorageQueryAdapter taskStorageQueryAdapter; private final TaskLogStreamer taskLogStreamer; private final JacksonConfigManager configManager; - private final ObjectMapper jsonMapper; private AtomicReference workerSetupDataRef = null; @@ -98,21 +96,20 @@ public class IndexerCoordinatorResource TaskMaster taskMaster, TaskStorageQueryAdapter taskStorageQueryAdapter, TaskLogStreamer taskLogStreamer, - JacksonConfigManager configManager, - ObjectMapper jsonMapper + JacksonConfigManager configManager ) throws Exception { this.taskMaster = taskMaster; this.taskStorageQueryAdapter = taskStorageQueryAdapter; this.taskLogStreamer = taskLogStreamer; this.configManager = configManager; - this.jsonMapper = jsonMapper; } @POST @Path("/merge") @Consumes("application/json") @Produces("application/json") + @Deprecated public Response doMerge(final Task task) { // legacy endpoint @@ -123,6 +120,7 @@ public class IndexerCoordinatorResource @Path("/index") @Consumes("application/json") @Produces("application/json") + @Deprecated public Response doIndex(final Task task) { return taskPost(task); diff --git a/indexing-service/src/main/java/io/druid/indexing/coordinator/http/OldIndexerCoordinatorResource.java b/indexing-service/src/main/java/io/druid/indexing/coordinator/http/OldIndexerCoordinatorResource.java index 11df4499edf..b8a6f679df8 100644 --- a/indexing-service/src/main/java/io/druid/indexing/coordinator/http/OldIndexerCoordinatorResource.java +++ b/indexing-service/src/main/java/io/druid/indexing/coordinator/http/OldIndexerCoordinatorResource.java @@ -19,7 +19,6 @@ package io.druid.indexing.coordinator.http; -import com.fasterxml.jackson.databind.ObjectMapper; import com.google.inject.Inject; import io.druid.common.config.JacksonConfigManager; import io.druid.indexing.common.tasklogs.TaskLogStreamer; @@ -39,10 +38,9 @@ public class OldIndexerCoordinatorResource extends IndexerCoordinatorResource TaskMaster taskMaster, TaskStorageQueryAdapter taskStorageQueryAdapter, TaskLogStreamer taskLogStreamer, - JacksonConfigManager configManager, - ObjectMapper jsonMapper + JacksonConfigManager configManager ) throws Exception { - super(taskMaster, taskStorageQueryAdapter, taskLogStreamer, configManager, jsonMapper); + super(taskMaster, taskStorageQueryAdapter, taskLogStreamer, configManager); } } diff --git a/indexing-service/src/main/java/io/druid/indexing/coordinator/scaling/EC2AutoScalingStrategy.java b/indexing-service/src/main/java/io/druid/indexing/coordinator/scaling/EC2AutoScalingStrategy.java index cda0da63568..8933fb16041 100644 --- a/indexing-service/src/main/java/io/druid/indexing/coordinator/scaling/EC2AutoScalingStrategy.java +++ b/indexing-service/src/main/java/io/druid/indexing/coordinator/scaling/EC2AutoScalingStrategy.java @@ -34,6 +34,7 @@ import com.google.common.base.Supplier; import com.google.common.collect.Lists; import com.google.inject.Inject; import com.metamx.emitter.EmittingLogger; +import io.druid.guice.annotations.Json; import io.druid.indexing.coordinator.setup.EC2NodeData; import io.druid.indexing.coordinator.setup.GalaxyUserData; import io.druid.indexing.coordinator.setup.WorkerSetupData; @@ -55,7 +56,7 @@ public class EC2AutoScalingStrategy implements AutoScalingStrategy @Inject public EC2AutoScalingStrategy( - ObjectMapper jsonMapper, + @Json ObjectMapper jsonMapper, AmazonEC2 amazonEC2Client, SimpleResourceManagementConfig config, Supplier workerSetupDataRef diff --git a/indexing-service/src/main/java/io/druid/indexing/coordinator/scaling/ScalingStats.java b/indexing-service/src/main/java/io/druid/indexing/coordinator/scaling/ScalingStats.java index 7e5e1c23fe1..7af8de7b6ab 100644 --- a/indexing-service/src/main/java/io/druid/indexing/coordinator/scaling/ScalingStats.java +++ b/indexing-service/src/main/java/io/druid/indexing/coordinator/scaling/ScalingStats.java @@ -54,10 +54,14 @@ public class ScalingStats public ScalingStats(int capacity) { - this.recentEvents = MinMaxPriorityQueue - .orderedBy(comparator) - .maximumSize(capacity) - .create(); + if (capacity == 0) { + this.recentEvents = MinMaxPriorityQueue.orderedBy(comparator).create(); + } else { + this.recentEvents = MinMaxPriorityQueue + .orderedBy(comparator) + .maximumSize(capacity) + .create(); + } } public void addProvisionEvent(AutoScalingData data) diff --git a/indexing-service/src/main/java/io/druid/indexing/worker/config/WorkerConfig.java b/indexing-service/src/main/java/io/druid/indexing/worker/config/WorkerConfig.java index f1e1bf4d32f..81f41d43393 100644 --- a/indexing-service/src/main/java/io/druid/indexing/worker/config/WorkerConfig.java +++ b/indexing-service/src/main/java/io/druid/indexing/worker/config/WorkerConfig.java @@ -36,10 +36,6 @@ public class WorkerConfig @NotNull private String version = null; - @JsonProperty - @NotNull - private String overlordService = null; - @JsonProperty @Min(1) private int capacity = Runtime.getRuntime().availableProcessors() - 1; @@ -54,11 +50,6 @@ public class WorkerConfig return version; } - public String getOverlordService() - { - return overlordService; - } - public int getCapacity() { return capacity; diff --git a/indexing-service/src/main/java/io/druid/indexing/worker/executor/ExecutorLifecycle.java b/indexing-service/src/main/java/io/druid/indexing/worker/executor/ExecutorLifecycle.java index 8f0fac4bfb8..5c3d15e9d2d 100644 --- a/indexing-service/src/main/java/io/druid/indexing/worker/executor/ExecutorLifecycle.java +++ b/indexing-service/src/main/java/io/druid/indexing/worker/executor/ExecutorLifecycle.java @@ -123,6 +123,7 @@ public class ExecutorLifecycle jsonMapper.writerWithDefaultPrettyPrinter().writeValueAsString(taskStatus) ); + statusFile.getParentFile().mkdirs(); jsonMapper.writeValue(statusFile, taskStatus); return taskStatus; diff --git a/indexing-service/src/main/resources/indexer_static/js/console-0.0.1.js b/indexing-service/src/main/resources/indexer_static/js/console-0.0.1.js index b48dbf1dde4..e3ce86c85c9 100644 --- a/indexing-service/src/main/resources/indexer_static/js/console-0.0.1.js +++ b/indexing-service/src/main/resources/indexer_static/js/console-0.0.1.js @@ -3,22 +3,22 @@ var oTable = []; $(document).ready(function() { - $.get('/mmx/merger/v1/runningTasks', function(data) { + $.get('/druid/indexer/v1/runningTasks', function(data) { $('.running_loading').hide(); buildTable(data, $('#runningTable'), ["segments"]); }); - $.get('/mmx/merger/v1/pendingTasks', function(data) { + $.get('/druid/indexer/v1/pendingTasks', function(data) { $('.pending_loading').hide(); buildTable(data, $('#pendingTable'), ["segments"]); }); - $.get('/mmx/merger/v1/workers', function(data) { + $.get('/druid/indexer/v1/workers', function(data) { $('.workers_loading').hide(); buildTable(data, $('#workerTable')); }); - $.get('/mmx/merger/v1/scaling', function(data) { + $.get('/druid/indexer/v1/scaling', function(data) { $('.events_loading').hide(); buildTable(data, $('#eventTable')); }); diff --git a/server/src/main/java/io/druid/client/DruidServerConfig.java b/server/src/main/java/io/druid/client/DruidServerConfig.java index 14cfaee290a..089c05ff021 100644 --- a/server/src/main/java/io/druid/client/DruidServerConfig.java +++ b/server/src/main/java/io/druid/client/DruidServerConfig.java @@ -29,7 +29,7 @@ public class DruidServerConfig { @JsonProperty @Min(0) - private long maxSize = -1; + private long maxSize = 0; @JsonProperty private String tier = "_default_tier"; diff --git a/server/src/main/java/io/druid/client/indexing/IndexingServiceClient.java b/server/src/main/java/io/druid/client/indexing/IndexingServiceClient.java index abae59ee168..c345bdb2669 100644 --- a/server/src/main/java/io/druid/client/indexing/IndexingServiceClient.java +++ b/server/src/main/java/io/druid/client/indexing/IndexingServiceClient.java @@ -72,28 +72,28 @@ public class IndexingServiceClient } } - runQuery("merge", new ClientAppendQuery(dataSource, segments)); + runQuery(new ClientAppendQuery(dataSource, segments)); } public void killSegments(String dataSource, Interval interval) { - runQuery("index", new ClientKillQuery(dataSource, interval)); + runQuery(new ClientKillQuery(dataSource, interval)); } public void upgradeSegment(DataSegment dataSegment) { - runQuery("task", new ClientConversionQuery(dataSegment)); + runQuery(new ClientConversionQuery(dataSegment)); } public void upgradeSegments(String dataSource, Interval interval) { - runQuery("task", new ClientConversionQuery(dataSource, interval)); + runQuery(new ClientConversionQuery(dataSource, interval)); } - private InputStream runQuery(String endpoint, Object queryObject) + private InputStream runQuery(Object queryObject) { try { - return client.post(new URL(String.format("%s/%s", baseUrl(), endpoint))) + return client.post(new URL(String.format("%s/task", baseUrl()))) .setContent("application/json", jsonMapper.writeValueAsBytes(queryObject)) .go(RESPONSE_HANDLER) .get(); @@ -111,7 +111,7 @@ public class IndexingServiceClient throw new ISE("Cannot find instance of indexingService"); } - return String.format("http://%s:%s/druid/indexer/v1", instance.getHost(), instance.getPort()); + return String.format("http://%s/druid/indexer/v1", instance.getHost()); } catch (Exception e) { throw Throwables.propagate(e); diff --git a/server/src/main/java/io/druid/curator/discovery/DiscoveryModule.java b/server/src/main/java/io/druid/curator/discovery/DiscoveryModule.java index 3c651978129..36e13f2e144 100644 --- a/server/src/main/java/io/druid/curator/discovery/DiscoveryModule.java +++ b/server/src/main/java/io/druid/curator/discovery/DiscoveryModule.java @@ -121,7 +121,7 @@ public class DiscoveryModule implements Module public static void registerKey(Binder binder, Key key) { DruidBinders.discoveryAnnouncementBinder(binder).addBinding().toInstance(new KeyHolder<>(key)); - LifecycleModule.registerKey(binder, key); + LifecycleModule.register(binder, ServiceAnnouncer.class); } @Override @@ -134,7 +134,6 @@ public class DiscoveryModule implements Module // Build the binder so that it will at a minimum inject an empty set. DruidBinders.discoveryAnnouncementBinder(binder); - // We bind this eagerly so that it gets instantiated and registers stuff with Lifecycle as a side-effect binder.bind(ServiceAnnouncer.class) .to(Key.get(CuratorServiceAnnouncer.class, Names.named(NAME))) .in(LazySingleton.class); diff --git a/server/src/main/java/io/druid/server/QueryServlet.java b/server/src/main/java/io/druid/server/QueryServlet.java index 49d32ebe0dd..53a5b17a260 100644 --- a/server/src/main/java/io/druid/server/QueryServlet.java +++ b/server/src/main/java/io/druid/server/QueryServlet.java @@ -116,11 +116,9 @@ public class QueryServlet extends HttpServlet emitter.emit( new ServiceMetricEvent.Builder() .setUser2(query.getDataSource()) - //.setUser3(originatorType) .setUser4(query.getType()) .setUser5(query.getIntervals().get(0).toString()) .setUser6(String.valueOf(query.hasFilters())) - //.setUser8(originatorId) .setUser9(query.getDuration().toPeriod().toStandardMinutes().toString()) .build("request/time", requestTime) ); diff --git a/server/src/main/java/io/druid/server/initialization/JettyServerModule.java b/server/src/main/java/io/druid/server/initialization/JettyServerModule.java index d7e16354c22..246645dac0c 100644 --- a/server/src/main/java/io/druid/server/initialization/JettyServerModule.java +++ b/server/src/main/java/io/druid/server/initialization/JettyServerModule.java @@ -19,6 +19,8 @@ package io.druid.server.initialization; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider; import com.google.common.collect.Iterables; import com.google.common.primitives.Ints; import com.google.inject.Binder; @@ -28,6 +30,7 @@ import com.google.inject.Injector; import com.google.inject.Provides; import com.google.inject.ProvisionException; import com.google.inject.Scopes; +import com.google.inject.Singleton; import com.metamx.common.lifecycle.Lifecycle; import com.metamx.common.logger.Logger; import com.sun.jersey.api.core.DefaultResourceConfig; @@ -39,7 +42,9 @@ import io.druid.guice.Jerseys; import io.druid.guice.JsonConfigProvider; import io.druid.guice.LazySingleton; import io.druid.guice.annotations.JSR311Resource; +import io.druid.guice.annotations.Json; import io.druid.guice.annotations.Self; +import io.druid.jackson.DefaultObjectMapper; import io.druid.server.DruidNode; import io.druid.server.StatusResource; import org.eclipse.jetty.server.Connector; @@ -95,7 +100,8 @@ public class JettyServerModule extends JerseyServletModule } } - @Provides @LazySingleton + @Provides + @LazySingleton public Server getServer(Injector injector, Lifecycle lifecycle, @Self DruidNode node, ServerConfig config) { JettyServerInitializer initializer = injector.getInstance(JettyServerInitializer.class); @@ -133,6 +139,15 @@ public class JettyServerModule extends JerseyServletModule return server; } + @Provides + @Singleton + public JacksonJsonProvider getJacksonJsonProvider(@Json ObjectMapper objectMapper) + { + final JacksonJsonProvider provider = new JacksonJsonProvider(); + provider.setMapper(objectMapper); + return provider; + } + private static Server makeJettyServer(@Self DruidNode node, ServerConfig config) { final QueuedThreadPool threadPool = new QueuedThreadPool(); diff --git a/server/src/main/java/io/druid/server/master/DruidMaster.java b/server/src/main/java/io/druid/server/master/DruidMaster.java index 937c1c6bfa1..0c1a5d5419e 100644 --- a/server/src/main/java/io/druid/server/master/DruidMaster.java +++ b/server/src/main/java/io/druid/server/master/DruidMaster.java @@ -161,7 +161,7 @@ public class DruidMaster this.exec = scheduledExecutorFactory.create(1, "Master-Exec--%d"); this.leaderLatch = new AtomicReference<>(null); - this.segmentSettingsAtomicReference= new AtomicReference<>(null); + this.segmentSettingsAtomicReference = new AtomicReference<>(null); this.loadManagementPeons = loadQueuePeonMap; } @@ -471,10 +471,13 @@ public class DruidMaster serverInventoryView.start(); final List> masterRunnables = Lists.newArrayList(); - segmentSettingsAtomicReference = configManager.watch(MasterSegmentSettings.CONFIG_KEY, MasterSegmentSettings.class,new MasterSegmentSettings.Builder().build()); + segmentSettingsAtomicReference = configManager.watch( + MasterSegmentSettings.CONFIG_KEY, + MasterSegmentSettings.class, + new MasterSegmentSettings.Builder().build() + ); masterRunnables.add(Pair.of(new MasterComputeManagerRunnable(), config.getMasterPeriod())); if (indexingServiceClient != null) { - masterRunnables.add( Pair.of( new MasterIndexingServiceRunnable( diff --git a/server/src/main/java/io/druid/server/master/DruidMasterConfig.java b/server/src/main/java/io/druid/server/master/DruidMasterConfig.java index d279b91d5c1..49594850a91 100644 --- a/server/src/main/java/io/druid/server/master/DruidMasterConfig.java +++ b/server/src/main/java/io/druid/server/master/DruidMasterConfig.java @@ -42,7 +42,7 @@ public abstract class DruidMasterConfig @Default("PT1800s") public abstract Duration getMasterSegmentMergerPeriod(); - @Config("druid.master.merger.on") + @Config("druid.master.merge.on") public boolean isMergeSegments() { return false; diff --git a/server/src/main/java/io/druid/server/master/MasterSegmentSettings.java b/server/src/main/java/io/druid/server/master/MasterSegmentSettings.java index a31f9b8dce8..68c733e3c0a 100644 --- a/server/src/main/java/io/druid/server/master/MasterSegmentSettings.java +++ b/server/src/main/java/io/druid/server/master/MasterSegmentSettings.java @@ -24,8 +24,8 @@ import com.fasterxml.jackson.annotation.JsonProperty; public class MasterSegmentSettings { public static final String CONFIG_KEY = "master.dynamicConfigs"; - private long millisToWaitBeforeDeleting=15 * 60 * 1000L; - private long mergeBytesLimit= 100000000L; + private long millisToWaitBeforeDeleting = 15 * 60 * 1000L; + private long mergeBytesLimit = 100000000L; private int mergeSegmentsLimit = Integer.MAX_VALUE; private int maxSegmentsToMove = 5; private boolean emitBalancingStats = false; @@ -39,11 +39,11 @@ public class MasterSegmentSettings @JsonProperty("emitBalancingStats") Boolean emitBalancingStats ) { - this.maxSegmentsToMove=maxSegmentsToMove; - this.millisToWaitBeforeDeleting=millisToWaitBeforeDeleting; - this.mergeSegmentsLimit=mergeSegmentsLimit; - this.mergeBytesLimit=mergeBytesLimit; - this.emitBalancingStats = emitBalancingStats; + this.maxSegmentsToMove = maxSegmentsToMove; + this.millisToWaitBeforeDeleting = millisToWaitBeforeDeleting; + this.mergeSegmentsLimit = mergeSegmentsLimit; + this.mergeBytesLimit = mergeBytesLimit; + this.emitBalancingStats = emitBalancingStats; } public static String getConfigKey() @@ -81,7 +81,6 @@ public class MasterSegmentSettings return maxSegmentsToMove; } - public static class Builder { public static final String CONFIG_KEY = "master.dynamicConfigs"; @@ -93,14 +92,16 @@ public class MasterSegmentSettings public Builder() { - this.millisToWaitBeforeDeleting=15 * 60 * 1000L; - this.mergeBytesLimit= 100000000L; - this.mergeSegmentsLimit= Integer.MAX_VALUE; - this.maxSegmentsToMove = 5; - this.emitBalancingStats = false; + this(15 * 60 * 1000L, 100000000L, Integer.MAX_VALUE, 5, false); } - public Builder(long millisToWaitBeforeDeleting, long mergeBytesLimit, int mergeSegmentsLimit, int maxSegmentsToMove, boolean emitBalancingStats) + public Builder( + long millisToWaitBeforeDeleting, + long mergeBytesLimit, + int mergeSegmentsLimit, + int maxSegmentsToMove, + boolean emitBalancingStats + ) { this.millisToWaitBeforeDeleting = millisToWaitBeforeDeleting; this.mergeBytesLimit = mergeBytesLimit; @@ -111,31 +112,37 @@ public class MasterSegmentSettings public Builder withMillisToWaitBeforeDeleting(long millisToWaitBeforeDeleting) { - this.millisToWaitBeforeDeleting=millisToWaitBeforeDeleting; + this.millisToWaitBeforeDeleting = millisToWaitBeforeDeleting; return this; } public Builder withMergeBytesLimit(long mergeBytesLimit) { - this.mergeBytesLimit=mergeBytesLimit; + this.mergeBytesLimit = mergeBytesLimit; return this; } public Builder withMergeSegmentsLimit(int mergeSegmentsLimit) { - this.mergeSegmentsLimit=mergeSegmentsLimit; + this.mergeSegmentsLimit = mergeSegmentsLimit; return this; } public Builder withMaxSegmentsToMove(int maxSegmentsToMove) { - this.maxSegmentsToMove=maxSegmentsToMove; + this.maxSegmentsToMove = maxSegmentsToMove; return this; } public MasterSegmentSettings build() { - return new MasterSegmentSettings(millisToWaitBeforeDeleting,mergeBytesLimit,mergeSegmentsLimit,maxSegmentsToMove, emitBalancingStats); + return new MasterSegmentSettings( + millisToWaitBeforeDeleting, + mergeBytesLimit, + mergeSegmentsLimit, + maxSegmentsToMove, + emitBalancingStats + ); } } } diff --git a/services/src/main/java/io/druid/cli/CliHadoopIndexer.java b/services/src/main/java/io/druid/cli/CliHadoopIndexer.java index 8024907bbee..ec5db65ec97 100644 --- a/services/src/main/java/io/druid/cli/CliHadoopIndexer.java +++ b/services/src/main/java/io/druid/cli/CliHadoopIndexer.java @@ -52,7 +52,7 @@ import java.util.List; ) public class CliHadoopIndexer extends GuiceRunnable { - @Arguments(description = "A JSON object or the path to a file that contains a JSON object") + @Arguments(description = "A JSON object or the path to a file that contains a JSON object", required = true) private String argumentSpec; private static final Logger log = new Logger(CliHadoopIndexer.class); @@ -78,8 +78,6 @@ public class CliHadoopIndexer extends GuiceRunnable @LazySingleton public HadoopDruidIndexerConfig getHadoopDruidIndexerConfig() { - Preconditions.checkNotNull(argumentSpec, "argumentSpec"); - try { if (argumentSpec.startsWith("{")) { return HadoopDruidIndexerConfig.fromString(argumentSpec); diff --git a/services/src/main/java/io/druid/cli/CliOverlord.java b/services/src/main/java/io/druid/cli/CliOverlord.java index a4d34bb6b0c..446283e0dee 100644 --- a/services/src/main/java/io/druid/cli/CliOverlord.java +++ b/services/src/main/java/io/druid/cli/CliOverlord.java @@ -57,6 +57,7 @@ import io.druid.indexing.coordinator.TaskRunnerFactory; import io.druid.indexing.coordinator.TaskStorage; import io.druid.indexing.coordinator.TaskStorageQueryAdapter; import io.druid.indexing.coordinator.http.IndexerCoordinatorResource; +import io.druid.indexing.coordinator.http.OldIndexerCoordinatorResource; import io.druid.indexing.coordinator.http.OverlordRedirectInfo; import io.druid.indexing.coordinator.scaling.AutoScalingStrategy; import io.druid.indexing.coordinator.scaling.EC2AutoScalingStrategy; @@ -139,6 +140,7 @@ public class CliOverlord extends ServerRunnable binder.bind(JettyServerInitializer.class).toInstance(new OverlordJettyServerInitializer()); Jerseys.addResource(binder, IndexerCoordinatorResource.class); + Jerseys.addResource(binder, OldIndexerCoordinatorResource.class); LifecycleModule.register(binder, Server.class); } diff --git a/services/src/main/java/io/druid/cli/CliPeon.java b/services/src/main/java/io/druid/cli/CliPeon.java index 1d5e1035232..f130f190950 100644 --- a/services/src/main/java/io/druid/cli/CliPeon.java +++ b/services/src/main/java/io/druid/cli/CliPeon.java @@ -22,7 +22,6 @@ package io.druid.cli; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.inject.Binder; -import com.google.inject.Inject; import com.google.inject.Injector; import com.google.inject.Key; import com.google.inject.Module; @@ -35,6 +34,7 @@ import io.airlift.command.Option; import io.druid.guice.Jerseys; import io.druid.guice.JsonConfigProvider; import io.druid.guice.LazySingleton; +import io.druid.guice.LifecycleModule; import io.druid.guice.ManageLifecycle; import io.druid.guice.NodeTypeConfig; import io.druid.guice.PolyBind; @@ -52,13 +52,13 @@ import io.druid.indexing.coordinator.ThreadPoolTaskRunner; import io.druid.indexing.worker.executor.ChatHandlerResource; import io.druid.indexing.worker.executor.ExecutorLifecycle; import io.druid.indexing.worker.executor.ExecutorLifecycleConfig; -import io.druid.initialization.LogLevelAdjuster; import io.druid.query.QuerySegmentWalker; import io.druid.segment.loading.DataSegmentKiller; import io.druid.segment.loading.S3DataSegmentKiller; import io.druid.segment.loading.SegmentLoaderConfig; import io.druid.segment.loading.StorageLocationConfig; import io.druid.server.initialization.JettyServerInitializer; +import org.eclipse.jetty.server.Server; import java.io.File; import java.util.Arrays; @@ -71,7 +71,7 @@ import java.util.List; description = "Runs a Peon, this is an individual forked \"task\" used as part of the indexing service. " + "This should rarely, if ever, be used directly." ) -public class CliPeon implements Runnable +public class CliPeon extends GuiceRunnable { @Arguments(description = "task.json status.json", required = true) public List taskAndStatusFile; @@ -79,74 +79,71 @@ public class CliPeon implements Runnable @Option(name = "--nodeType", title = "nodeType", description = "Set the node type to expose on ZK") public String nodeType = "indexer-executor"; - private Injector injector; - - @Inject - public void configure(Injector injector) - { - this.injector = injector; - } - private static final Logger log = new Logger(CliPeon.class); - protected Injector getInjector() + public CliPeon() { - return Initialization.makeInjectorWithModules( - injector, - ImmutableList.of( - new Module() - { - @Override - public void configure(Binder binder) - { - PolyBind.createChoice( - binder, - "druid.indexer.task.chathandler.type", - Key.get(ChatHandlerProvider.class), - Key.get(NoopChatHandlerProvider.class) - ); - final MapBinder handlerProviderBinder = PolyBind.optionBinder( - binder, Key.get(ChatHandlerProvider.class) - ); - handlerProviderBinder.addBinding("curator").to(EventReceivingChatHandlerProvider.class); - handlerProviderBinder.addBinding("noop").to(NoopChatHandlerProvider.class); + super(log); + } - binder.bind(TaskToolboxFactory.class).in(LazySingleton.class); + @Override + protected List getModules() + { + return ImmutableList.of( + new Module() + { + @Override + public void configure(Binder binder) + { + PolyBind.createChoice( + binder, + "druid.indexer.task.chathandler.type", + Key.get(ChatHandlerProvider.class), + Key.get(NoopChatHandlerProvider.class) + ); + final MapBinder handlerProviderBinder = PolyBind.optionBinder( + binder, Key.get(ChatHandlerProvider.class) + ); + handlerProviderBinder.addBinding("curator").to(EventReceivingChatHandlerProvider.class); + handlerProviderBinder.addBinding("noop").to(NoopChatHandlerProvider.class); - JsonConfigProvider.bind(binder, "druid.indexer.task", TaskConfig.class); - JsonConfigProvider.bind(binder, "druid.worker.taskActionClient.retry", RetryPolicyConfig.class); + binder.bind(TaskToolboxFactory.class).in(LazySingleton.class); - binder.bind(TaskActionClientFactory.class) - .to(RemoteTaskActionClientFactory.class) - .in(LazySingleton.class); - binder.bind(RetryPolicyFactory.class).in(LazySingleton.class); + JsonConfigProvider.bind(binder, "druid.indexer.task", TaskConfig.class); + JsonConfigProvider.bind(binder, "druid.worker.taskActionClient.retry", RetryPolicyConfig.class); - binder.bind(DataSegmentKiller.class).to(S3DataSegmentKiller.class).in(LazySingleton.class); + binder.bind(TaskActionClientFactory.class) + .to(RemoteTaskActionClientFactory.class) + .in(LazySingleton.class); + binder.bind(RetryPolicyFactory.class).in(LazySingleton.class); - binder.bind(ExecutorLifecycle.class).in(ManageLifecycle.class); - binder.bind(ExecutorLifecycleConfig.class).toInstance( - new ExecutorLifecycleConfig() - .setTaskFile(new File(taskAndStatusFile.get(0))) - .setStatusFile(new File(taskAndStatusFile.get(1))) - ); + binder.bind(DataSegmentKiller.class).to(S3DataSegmentKiller.class).in(LazySingleton.class); - binder.bind(TaskRunner.class).to(ThreadPoolTaskRunner.class); - binder.bind(QuerySegmentWalker.class).to(ThreadPoolTaskRunner.class); - binder.bind(ThreadPoolTaskRunner.class).in(ManageLifecycle.class); + binder.bind(ExecutorLifecycle.class).in(ManageLifecycle.class); + binder.bind(ExecutorLifecycleConfig.class).toInstance( + new ExecutorLifecycleConfig() + .setTaskFile(new File(taskAndStatusFile.get(0))) + .setStatusFile(new File(taskAndStatusFile.get(1))) + ); - // Override the default SegmentLoaderConfig because we don't actually care about the - // configuration based locations. This will override them anyway. This is also stopping - // configuration of other parameters, but I don't think that's actually a problem. - // Note, if that is actually not a problem, then that probably means we have the wrong abstraction. - binder.bind(SegmentLoaderConfig.class) - .toInstance(new SegmentLoaderConfig().withLocations(Arrays.asList())); + binder.bind(TaskRunner.class).to(ThreadPoolTaskRunner.class); + binder.bind(QuerySegmentWalker.class).to(ThreadPoolTaskRunner.class); + binder.bind(ThreadPoolTaskRunner.class).in(ManageLifecycle.class); - binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class); - Jerseys.addResource(binder, ChatHandlerResource.class); - binder.bind(NodeTypeConfig.class).toInstance(new NodeTypeConfig(nodeType)); - } - } - ) + // Override the default SegmentLoaderConfig because we don't actually care about the + // configuration based locations. This will override them anyway. This is also stopping + // configuration of other parameters, but I don't think that's actually a problem. + // Note, if that is actually not a problem, then that probably means we have the wrong abstraction. + binder.bind(SegmentLoaderConfig.class) + .toInstance(new SegmentLoaderConfig().withLocations(Arrays.asList())); + + binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class); + Jerseys.addResource(binder, ChatHandlerResource.class); + binder.bind(NodeTypeConfig.class).toInstance(new NodeTypeConfig(nodeType)); + + LifecycleModule.register(binder, Server.class); + } + } ); } @@ -154,13 +151,10 @@ public class CliPeon implements Runnable public void run() { try { - LogLevelAdjuster.register(); - - final Injector injector = getInjector(); - final Lifecycle lifecycle = injector.getInstance(Lifecycle.class); + Injector injector = makeInjector(); + Lifecycle lifecycle = initLifecycle(injector); try { - lifecycle.start(); injector.getInstance(ExecutorLifecycle.class).join(); lifecycle.stop(); } diff --git a/services/src/main/java/io/druid/cli/convert/ConvertProperties.java b/services/src/main/java/io/druid/cli/convert/ConvertProperties.java index a5d5944554d..7c97b4a8012 100644 --- a/services/src/main/java/io/druid/cli/convert/ConvertProperties.java +++ b/services/src/main/java/io/druid/cli/convert/ConvertProperties.java @@ -91,7 +91,7 @@ public class ConvertProperties implements Runnable new Rename("druid.indexer.maxPendingTaskDuration", "druid.indexer.autoscale.pendingTaskTimeout"), new Rename("druid.indexer.worker.version", "druid.indexer.autoscale.workerVersion"), new Rename("druid.indexer.worker.port", "druid.indexer.autoscale.workerPort"), - new Rename("druid.worker.masterService", "druid.worker.overlordService"), + new Rename("druid.worker.masterService", "druid.selectors.indexing.serviceName"), new ChatHandlerConverter(), new Rename("druid.indexer.baseDir", "druid.indexer.task.baseDir"), new Rename("druid.indexer.taskDir", "druid.indexer.task.taskDir"), @@ -100,6 +100,7 @@ public class ConvertProperties implements Runnable new Rename("druid.worker.taskActionClient.retry.minWaitMillis", "druid.worker.taskActionClient.retry.minWait"), new Rename("druid.worker.taskActionClient.retry.maxWaitMillis", "druid.worker.taskActionClient.retry.maxWait"), new Rename("druid.master.merger.service", "druid.selectors.indexing.serviceName"), + new Rename("druid.master.merger.on", "druid.master.merge.on"), new DataSegmentPusherDefaultConverter(), new Rename("druid.pusher.hdfs.storageDirectory", "druid.pusher.storageDirectory"), new Rename("druid.pusher.cassandra.host", "druid.pusher.host"), From 0b04325ee89227312f57966cd1cf2d010e671a77 Mon Sep 17 00:00:00 2001 From: fjy Date: Fri, 27 Sep 2013 10:17:45 -0700 Subject: [PATCH 2/2] fix things up according to code review comments --- .../main/java/io/druid/indexing/common/TaskToolboxFactory.java | 2 +- services/src/main/java/io/druid/cli/CliPeon.java | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java index 33e2a367c41..ca00dccaf91 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java @@ -68,7 +68,7 @@ public class TaskToolboxFactory @Processing ExecutorService queryExecutorService, MonitorScheduler monitorScheduler, SegmentLoaderFactory segmentLoaderFactory, - @Json ObjectMapper objectMapper + ObjectMapper objectMapper ) { this.config = config; diff --git a/services/src/main/java/io/druid/cli/CliPeon.java b/services/src/main/java/io/druid/cli/CliPeon.java index f130f190950..b4b9ce4e661 100644 --- a/services/src/main/java/io/druid/cli/CliPeon.java +++ b/services/src/main/java/io/druid/cli/CliPeon.java @@ -152,9 +152,10 @@ public class CliPeon extends GuiceRunnable { try { Injector injector = makeInjector(); - Lifecycle lifecycle = initLifecycle(injector); try { + Lifecycle lifecycle = initLifecycle(injector); + injector.getInstance(ExecutorLifecycle.class).join(); lifecycle.stop(); }