From 24df6c482d15366a3dd9b614992f50961e95f268 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Fri, 13 Sep 2013 14:19:42 -0700 Subject: [PATCH] Use shared query executor in indexing-service --- .../druid/indexing/common/TaskToolbox.java | 9 ++++++++ .../indexing/common/TaskToolboxFactory.java | 6 +++++ .../common/task/RealtimeIndexTask.java | 1 + .../worker/executor/ExecutorNode.java | 22 ++++--------------- .../coordinator/RemoteTaskRunnerTest.java | 2 +- .../coordinator/TaskLifecycleTest.java | 1 + .../indexing/coordinator/TaskQueueTest.java | 2 ++ 7 files changed, 24 insertions(+), 19 deletions(-) diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/common/TaskToolbox.java b/indexing-service/src/main/java/com/metamx/druid/indexing/common/TaskToolbox.java index 050a23cb5e0..914297d324c 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/common/TaskToolbox.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/common/TaskToolbox.java @@ -43,6 +43,7 @@ import org.jets3t.service.impl.rest.httpclient.RestS3Service; import java.io.File; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutorService; /** * Stuff that may be needed by a Task in order to conduct its business. @@ -59,6 +60,7 @@ public class TaskToolbox private final DataSegmentAnnouncer segmentAnnouncer; private final ServerView newSegmentServerView; private final QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate; + private final ExecutorService queryExecutorService; private final MonitorScheduler monitorScheduler; private final ObjectMapper objectMapper; @@ -73,6 +75,7 @@ public class TaskToolbox DataSegmentAnnouncer segmentAnnouncer, ServerView newSegmentServerView, QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate, + ExecutorService queryExecutorService, MonitorScheduler monitorScheduler, ObjectMapper objectMapper ) @@ -87,6 +90,7 @@ public class TaskToolbox this.segmentAnnouncer = segmentAnnouncer; this.newSegmentServerView = newSegmentServerView; this.queryRunnerFactoryConglomerate = queryRunnerFactoryConglomerate; + this.queryExecutorService = queryExecutorService; this.monitorScheduler = monitorScheduler; this.objectMapper = objectMapper; } @@ -131,6 +135,11 @@ public class TaskToolbox return queryRunnerFactoryConglomerate; } + public ExecutorService getQueryExecutorService() + { + return queryExecutorService; + } + public MonitorScheduler getMonitorScheduler() { return monitorScheduler; diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/common/TaskToolboxFactory.java b/indexing-service/src/main/java/com/metamx/druid/indexing/common/TaskToolboxFactory.java index 91f5c99a333..ff90dfe9f3b 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/common/TaskToolboxFactory.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/common/TaskToolboxFactory.java @@ -32,6 +32,8 @@ import com.metamx.emitter.service.ServiceEmitter; import com.metamx.metrics.MonitorScheduler; import org.jets3t.service.impl.rest.httpclient.RestS3Service; +import java.util.concurrent.ExecutorService; + /** * Stuff that may be needed by a Task in order to conduct its business. */ @@ -46,6 +48,7 @@ public class TaskToolboxFactory private final DataSegmentAnnouncer segmentAnnouncer; private final ServerView newSegmentServerView; private final QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate; + private final ExecutorService queryExecutorService; private final MonitorScheduler monitorScheduler; private final ObjectMapper objectMapper; @@ -59,6 +62,7 @@ public class TaskToolboxFactory DataSegmentAnnouncer segmentAnnouncer, ServerView newSegmentServerView, QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate, + ExecutorService queryExecutorService, MonitorScheduler monitorScheduler, ObjectMapper objectMapper ) @@ -72,6 +76,7 @@ public class TaskToolboxFactory this.segmentAnnouncer = segmentAnnouncer; this.newSegmentServerView = newSegmentServerView; this.queryRunnerFactoryConglomerate = queryRunnerFactoryConglomerate; + this.queryExecutorService = queryExecutorService; this.monitorScheduler = monitorScheduler; this.objectMapper = objectMapper; } @@ -89,6 +94,7 @@ public class TaskToolboxFactory segmentAnnouncer, newSegmentServerView, queryRunnerFactoryConglomerate, + queryExecutorService, monitorScheduler, objectMapper ); diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/com/metamx/druid/indexing/common/task/RealtimeIndexTask.java index dc3ad87a9cb..2f6ec6eb563 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/common/task/RealtimeIndexTask.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/common/task/RealtimeIndexTask.java @@ -274,6 +274,7 @@ public class RealtimeIndexTask extends AbstractTask // NOTE: "same" segment. realtimePlumberSchool.setDataSegmentPusher(toolbox.getSegmentPusher()); realtimePlumberSchool.setConglomerate(toolbox.getQueryRunnerFactoryConglomerate()); + realtimePlumberSchool.setQueryExecutorService(toolbox.getQueryExecutorService()); realtimePlumberSchool.setVersioningPolicy(versioningPolicy); realtimePlumberSchool.setSegmentAnnouncer(lockingSegmentAnnouncer); realtimePlumberSchool.setSegmentPublisher(segmentPublisher); diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/worker/executor/ExecutorNode.java b/indexing-service/src/main/java/com/metamx/druid/indexing/worker/executor/ExecutorNode.java index 341a578d6de..4603d744f94 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/worker/executor/ExecutorNode.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/worker/executor/ExecutorNode.java @@ -65,7 +65,6 @@ import com.metamx.druid.loading.DataSegmentPusher; import com.metamx.druid.loading.S3DataSegmentKiller; import com.metamx.druid.utils.PropUtils; import com.metamx.emitter.EmittingLogger; -import com.metamx.emitter.core.Emitters; import com.metamx.emitter.service.ServiceEmitter; import com.metamx.http.client.HttpClient; import com.metamx.http.client.HttpClientConfig; @@ -108,7 +107,6 @@ public class ExecutorNode extends BaseServerNode private RestS3Service s3Service = null; private MonitorScheduler monitorScheduler = null; private HttpClient httpClient = null; - private ServiceEmitter emitter = null; private TaskConfig taskConfig = null; private WorkerConfig workerConfig = null; private DataSegmentPusher segmentPusher = null; @@ -143,7 +141,6 @@ public class ExecutorNode extends BaseServerNode public void doInit() throws Exception { initializeHttpClient(); - initializeEmitter(); initializeS3Service(); initializeMergerConfig(); initializeServiceDiscovery(); @@ -173,7 +170,7 @@ public class ExecutorNode extends BaseServerNode root.addFilter(GuiceFilter.class, "/druid/worker/v1/*", 0); root.addServlet( new ServletHolder( - new QueryServlet(getJsonMapper(), getSmileMapper(), taskRunner, emitter, getRequestLogger()) + new QueryServlet(getJsonMapper(), getSmileMapper(), taskRunner, getEmitter(), getRequestLogger()) ), "/druid/v2/*" ); @@ -186,7 +183,7 @@ public class ExecutorNode extends BaseServerNode final ScheduledExecutorFactory scheduledExecutorFactory = ScheduledExecutors.createFactory(lifecycle); final ScheduledExecutorService globalScheduledExec = scheduledExecutorFactory.create(1, "Global--%d"); this.monitorScheduler = new MonitorScheduler( - configFactory.build(MonitorSchedulerConfig.class), globalScheduledExec, emitter, ImmutableList.of() + configFactory.build(MonitorSchedulerConfig.class), globalScheduledExec, getEmitter(), ImmutableList.of() ); lifecycle.addManagedInstance(monitorScheduler); } @@ -272,18 +269,6 @@ public class ExecutorNode extends BaseServerNode } } - private void initializeEmitter() - { - if (emitter == null) { - emitter = new ServiceEmitter( - PropUtils.getProperty(props, "druid.service"), - PropUtils.getProperty(props, "druid.host"), - Emitters.create(props, httpClient, getJsonMapper(), lifecycle) - ); - } - EmittingLogger.registerEmitter(emitter); - } - private void initializeS3Service() throws S3ServiceException { if (s3Service == null) { @@ -331,13 +316,14 @@ public class ExecutorNode extends BaseServerNode ), getJsonMapper() ), - emitter, + getEmitter(), s3Service, segmentPusher, dataSegmentKiller, getAnnouncer(), getServerView(), getConglomerate(), + getQueryExecutorService(), monitorScheduler, getJsonMapper() ); diff --git a/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunnerTest.java b/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunnerTest.java index a22664ec77b..8de8b106cea 100644 --- a/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunnerTest.java +++ b/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunnerTest.java @@ -385,7 +385,7 @@ public class RemoteTaskRunnerTest { return null; } - }, null, null, null, null, null, null, null, null, null, jsonMapper + }, null, null, null, null, null, null, null, null, null, null, jsonMapper ), Executors.newSingleThreadExecutor() ), Executors.newSingleThreadExecutor() diff --git a/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/TaskLifecycleTest.java b/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/TaskLifecycleTest.java index 17e718a8983..62010fc8c68 100644 --- a/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/TaskLifecycleTest.java @@ -160,6 +160,7 @@ public class TaskLifecycleTest null, // segment announcer null, // new segment server view null, // query runner factory conglomerate corporation unionized collective + null, // query executor service null, // monitor scheduler new DefaultObjectMapper() ); diff --git a/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/TaskQueueTest.java b/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/TaskQueueTest.java index f421bee7f3c..e3b230ab233 100644 --- a/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/TaskQueueTest.java +++ b/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/TaskQueueTest.java @@ -169,6 +169,7 @@ public class TaskQueueTest null, null, null, + null, null ); @@ -230,6 +231,7 @@ public class TaskQueueTest null, null, null, + null, null );