diff --git a/examples/src/main/java/druid/examples/guice/RealtimeExampleModule.java b/examples/src/main/java/druid/examples/guice/RealtimeExampleModule.java index a6c08b1e6fd..816a8d367da 100644 --- a/examples/src/main/java/druid/examples/guice/RealtimeExampleModule.java +++ b/examples/src/main/java/druid/examples/guice/RealtimeExampleModule.java @@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.module.SimpleModule; import com.google.common.collect.ImmutableList; import com.google.inject.Binder; import com.google.inject.TypeLiteral; +import com.metamx.common.guava.LazySequence; import com.metamx.common.logger.Logger; import druid.examples.flights.FlightsFirehoseFactory; import druid.examples.rand.RandomFirehoseFactory; @@ -34,6 +35,7 @@ import io.druid.client.InventoryView; import io.druid.client.ServerView; import io.druid.guice.FireDepartmentsProvider; import io.druid.guice.JsonConfigProvider; +import io.druid.guice.LazySingleton; import io.druid.guice.ManageLifecycle; import io.druid.guice.NoopSegmentPublisherProvider; import io.druid.guice.RealtimeManagerConfig; @@ -71,7 +73,7 @@ public class RealtimeExampleModule implements DruidModule new TypeLiteral>() { } - ).toProvider(FireDepartmentsProvider.class); + ).toProvider(FireDepartmentsProvider.class).in(LazySingleton.class); binder.bind(RealtimeManager.class).in(ManageLifecycle.class); } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java index dcef1147d17..51e0d227d21 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java @@ -39,6 +39,7 @@ import io.druid.timeline.DataSegment; import java.io.File; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutorService; /** * Stuff that may be needed by a Task in order to conduct its business. @@ -55,6 +56,7 @@ public class TaskToolbox private final ServerView newSegmentServerView; private final QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate; private final MonitorScheduler monitorScheduler; + private final ExecutorService queryExecutorService; private final SegmentLoader segmentLoader; private final ObjectMapper objectMapper; @@ -68,6 +70,7 @@ public class TaskToolbox DataSegmentAnnouncer segmentAnnouncer, ServerView newSegmentServerView, QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate, + ExecutorService queryExecutorService, MonitorScheduler monitorScheduler, SegmentLoader segmentLoader, ObjectMapper objectMapper @@ -82,6 +85,7 @@ public class TaskToolbox this.segmentAnnouncer = segmentAnnouncer; this.newSegmentServerView = newSegmentServerView; this.queryRunnerFactoryConglomerate = queryRunnerFactoryConglomerate; + this.queryExecutorService = queryExecutorService; this.monitorScheduler = monitorScheduler; this.segmentLoader = segmentLoader; this.objectMapper = objectMapper; @@ -127,6 +131,11 @@ public class TaskToolbox return queryRunnerFactoryConglomerate; } + public ExecutorService getQueryExecutorService() + { + return queryExecutorService; + } + public MonitorScheduler getMonitorScheduler() { return monitorScheduler; diff --git a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java index fc1e8db390b..0cac5acbce7 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java @@ -24,6 +24,7 @@ import com.google.inject.Inject; import com.metamx.emitter.service.ServiceEmitter; import com.metamx.metrics.MonitorScheduler; import io.druid.client.ServerView; +import io.druid.guice.annotations.Processing; import io.druid.indexing.common.actions.TaskActionClientFactory; import io.druid.indexing.common.config.TaskConfig; import io.druid.indexing.common.task.Task; @@ -33,6 +34,8 @@ import io.druid.segment.loading.DataSegmentPusher; import io.druid.segment.loading.SegmentLoader; import io.druid.server.coordination.DataSegmentAnnouncer; +import java.util.concurrent.ExecutorService; + /** * Stuff that may be needed by a Task in order to conduct its business. */ @@ -46,6 +49,7 @@ public class TaskToolboxFactory private final DataSegmentAnnouncer segmentAnnouncer; private final ServerView newSegmentServerView; private final QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate; + private final ExecutorService queryExecutorService; private final MonitorScheduler monitorScheduler; private final SegmentLoader segmentLoader; private final ObjectMapper objectMapper; @@ -60,6 +64,7 @@ public class TaskToolboxFactory DataSegmentAnnouncer segmentAnnouncer, ServerView newSegmentServerView, QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate, + @Processing ExecutorService queryExecutorService, MonitorScheduler monitorScheduler, SegmentLoader segmentLoader, ObjectMapper objectMapper @@ -73,6 +78,7 @@ public class TaskToolboxFactory this.segmentAnnouncer = segmentAnnouncer; this.newSegmentServerView = newSegmentServerView; this.queryRunnerFactoryConglomerate = queryRunnerFactoryConglomerate; + this.queryExecutorService = queryExecutorService; this.monitorScheduler = monitorScheduler; this.segmentLoader = segmentLoader; this.objectMapper = objectMapper; @@ -90,6 +96,7 @@ public class TaskToolboxFactory segmentAnnouncer, newSegmentServerView, queryRunnerFactoryConglomerate, + queryExecutorService, monitorScheduler, segmentLoader, objectMapper diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java index 63181410f8b..691b7a89a27 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java @@ -116,6 +116,7 @@ public class RealtimeIndexTask extends AbstractTask id == null ? makeTaskId(schema.getDataSource(), schema.getShardSpec().getPartitionNum(), new DateTime().toString()) :id, + String.format( "index_realtime_%s", schema.getDataSource() @@ -274,6 +275,7 @@ public class RealtimeIndexTask extends AbstractTask // NOTE: "same" segment. realtimePlumberSchool.setDataSegmentPusher(toolbox.getSegmentPusher()); realtimePlumberSchool.setConglomerate(toolbox.getQueryRunnerFactoryConglomerate()); + realtimePlumberSchool.setQueryExecutorService(toolbox.getQueryExecutorService()); realtimePlumberSchool.setVersioningPolicy(versioningPolicy); realtimePlumberSchool.setSegmentAnnouncer(lockingSegmentAnnouncer); realtimePlumberSchool.setSegmentPublisher(segmentPublisher); diff --git a/indexing-service/src/test/java/io/druid/indexing/coordinator/TaskLifecycleTest.java b/indexing-service/src/test/java/io/druid/indexing/coordinator/TaskLifecycleTest.java index ceb30ffdea6..10f5beb853c 100644 --- a/indexing-service/src/test/java/io/druid/indexing/coordinator/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/coordinator/TaskLifecycleTest.java @@ -137,6 +137,7 @@ public class TaskLifecycleTest null, // segment announcer null, // new segment server view null, // query runner factory conglomerate corporation unionized collective + null, // query executor service null, // monitor scheduler null, // segment loader new DefaultObjectMapper() diff --git a/indexing-service/src/test/java/io/druid/indexing/coordinator/TaskQueueTest.java b/indexing-service/src/test/java/io/druid/indexing/coordinator/TaskQueueTest.java index 7cb5076d21d..af98107a406 100644 --- a/indexing-service/src/test/java/io/druid/indexing/coordinator/TaskQueueTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/coordinator/TaskQueueTest.java @@ -169,6 +169,7 @@ public class TaskQueueTest null, null, null, + null, null ); @@ -230,6 +231,7 @@ public class TaskQueueTest null, null, null, + null, null ); diff --git a/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java b/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java index 399ff4ab05a..39139f29efe 100644 --- a/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java @@ -115,7 +115,7 @@ public class WorkerTaskMonitorTest new ThreadPoolTaskRunner( new TaskToolboxFactory( new TaskConfig(tmp.toString(), null, null, 0), - null, null, null, null, null, null, null, null, null, jsonMapper + null, null, null, null, null, null, null, null, null, null, jsonMapper ) ), new WorkerConfig().setCapacity(1) diff --git a/realtime/src/main/java/io/druid/guice/RealtimeModule.java b/realtime/src/main/java/io/druid/guice/RealtimeModule.java index 9bdac37a89f..bba4a7d6240 100644 --- a/realtime/src/main/java/io/druid/guice/RealtimeModule.java +++ b/realtime/src/main/java/io/druid/guice/RealtimeModule.java @@ -19,19 +19,24 @@ package io.druid.guice; +import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.fasterxml.jackson.databind.module.SimpleModule; import com.google.inject.Binder; import com.google.inject.Module; import com.google.inject.TypeLiteral; import com.metamx.common.logger.Logger; +import io.druid.initialization.DruidModule; import io.druid.segment.realtime.FireDepartment; import io.druid.segment.realtime.RealtimeManager; import io.druid.segment.realtime.SegmentPublisher; +import io.druid.segment.realtime.firehose.KafkaFirehoseFactory; +import java.util.Arrays; import java.util.List; /** */ -public class RealtimeModule implements Module +public class RealtimeModule implements DruidModule { private static final Logger log = new Logger(RealtimeModule.class); @@ -46,7 +51,18 @@ public class RealtimeModule implements Module new TypeLiteral>() { } - ).toProvider(FireDepartmentsProvider.class); + ).toProvider(FireDepartmentsProvider.class).in(LazySingleton.class); binder.bind(RealtimeManager.class).in(ManageLifecycle.class); } + + @Override + public List getJacksonModules() + { + return Arrays.asList( + new SimpleModule("RealtimeModule") + .registerSubtypes( + new NamedType(KafkaFirehoseFactory.class, "kafka-0.7.2") + ) + ); + } } diff --git a/realtime/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java b/realtime/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java index ed6ed355ece..64daf729484 100644 --- a/realtime/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java +++ b/realtime/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java @@ -43,6 +43,7 @@ import io.druid.client.DruidServer; import io.druid.client.ServerView; import io.druid.common.guava.ThreadRenamingCallable; import io.druid.common.guava.ThreadRenamingRunnable; +import io.druid.guice.annotations.Processing; import io.druid.query.MetricsEmittingQueryRunner; import io.druid.query.Query; import io.druid.query.QueryRunner; @@ -75,6 +76,7 @@ import org.joda.time.Interval; import org.joda.time.Period; import javax.annotation.Nullable; +import javax.validation.constraints.NotNull; import java.io.File; import java.io.FilenameFilter; import java.io.IOException; @@ -91,24 +93,44 @@ import java.util.concurrent.ScheduledExecutorService; public class RealtimePlumberSchool implements PlumberSchool { private static final EmittingLogger log = new EmittingLogger(RealtimePlumberSchool.class); - private static final ListeningExecutorService EXEC = MoreExecutors.sameThreadExecutor(); private final Period windowPeriod; private final File basePersistDirectory; private final IndexGranularity segmentGranularity; private final Object handoffCondition = new Object(); + @JacksonInject + @NotNull private volatile ServiceEmitter emitter; + + @JacksonInject + @NotNull private volatile QueryRunnerFactoryConglomerate conglomerate = null; + + @JacksonInject + @NotNull private volatile DataSegmentPusher dataSegmentPusher = null; + + @JacksonInject + @NotNull private volatile DataSegmentAnnouncer segmentAnnouncer = null; + + @JacksonInject + @NotNull private volatile SegmentPublisher segmentPublisher = null; + + @JacksonInject + @NotNull private volatile ServerView serverView = null; + @JacksonInject + @NotNull + @Processing + private volatile ExecutorService queryExecutorService = null; + private volatile VersioningPolicy versioningPolicy = null; private volatile RejectionPolicyFactory rejectionPolicyFactory = null; - @JsonCreator public RealtimePlumberSchool( @JsonProperty("windowPeriod") Period windowPeriod, @@ -139,42 +161,41 @@ public class RealtimePlumberSchool implements PlumberSchool this.rejectionPolicyFactory = factory; } - @JacksonInject public void setEmitter(ServiceEmitter emitter) { this.emitter = emitter; } - @JacksonInject public void setConglomerate(QueryRunnerFactoryConglomerate conglomerate) { this.conglomerate = conglomerate; } - @JacksonInject public void setDataSegmentPusher(DataSegmentPusher dataSegmentPusher) { this.dataSegmentPusher = dataSegmentPusher; } - @JacksonInject public void setSegmentAnnouncer(DataSegmentAnnouncer segmentAnnouncer) { this.segmentAnnouncer = segmentAnnouncer; } - @JacksonInject public void setSegmentPublisher(SegmentPublisher segmentPublisher) { this.segmentPublisher = segmentPublisher; } - @JacksonInject public void setServerView(ServerView serverView) { this.serverView = serverView; } + public void setQueryExecutorService(ExecutorService executorService) + { + this.queryExecutorService = queryExecutorService; + } + @Override public Plumber findPlumber(final Schema schema, final FireDepartmentMetrics metrics) { @@ -262,7 +283,7 @@ public class RealtimePlumberSchool implements PlumberSchool return toolchest.mergeResults( factory.mergeRunners( - EXEC, + queryExecutorService, FunctionalIterable .create(querySinks) .transform( @@ -277,7 +298,7 @@ public class RealtimePlumberSchool implements PlumberSchool emitter, builderFn, factory.mergeRunners( - EXEC, + MoreExecutors.sameThreadExecutor(), Iterables.transform( theSink, new Function>() diff --git a/realtime/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java b/realtime/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java index 8b92bff01c2..71eb4659909 100644 --- a/realtime/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java +++ b/realtime/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java @@ -23,6 +23,7 @@ import com.google.common.base.Stopwatch; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.io.Files; +import com.google.common.util.concurrent.MoreExecutors; import com.metamx.common.ISE; import com.metamx.emitter.service.ServiceEmitter; import io.druid.client.ServerView; @@ -114,6 +115,7 @@ public class RealtimePlumberSchoolTest realtimePlumberSchool.setDataSegmentPusher(dataSegmentPusher); realtimePlumberSchool.setServerView(serverView); realtimePlumberSchool.setEmitter(emitter); + realtimePlumberSchool.setQueryExecutorService(MoreExecutors.sameThreadExecutor()); plumber = realtimePlumberSchool.findPlumber(schema, new FireDepartmentMetrics()); } diff --git a/services/src/main/java/io/druid/cli/CliRealtimeExample.java b/services/src/main/java/io/druid/cli/CliRealtimeExample.java index adc6f78ccba..3af6b116e6a 100644 --- a/services/src/main/java/io/druid/cli/CliRealtimeExample.java +++ b/services/src/main/java/io/druid/cli/CliRealtimeExample.java @@ -39,7 +39,7 @@ import java.util.List; /** */ @Command( - name = "example realtime", + name = "realtime", description = "Runs a standalone realtime node for examples, see https://github.com/metamx/druid/wiki/Realtime for a description" ) public class CliRealtimeExample extends ServerRunnable