diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/common/TaskToolbox.java b/indexing-service/src/main/java/com/metamx/druid/indexing/common/TaskToolbox.java index 5bbfd73abbe..050a23cb5e0 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/common/TaskToolbox.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/common/TaskToolbox.java @@ -37,6 +37,7 @@ import com.metamx.druid.loading.SegmentLoadingException; import com.metamx.druid.loading.SingleSegmentLoader; import com.metamx.druid.query.QueryRunnerFactoryConglomerate; import com.metamx.emitter.service.ServiceEmitter; +import com.metamx.metrics.MonitorScheduler; import org.jets3t.service.impl.rest.httpclient.RestS3Service; import java.io.File; @@ -58,6 +59,7 @@ public class TaskToolbox private final DataSegmentAnnouncer segmentAnnouncer; private final ServerView newSegmentServerView; private final QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate; + private final MonitorScheduler monitorScheduler; private final ObjectMapper objectMapper; public TaskToolbox( @@ -71,6 +73,7 @@ public class TaskToolbox DataSegmentAnnouncer segmentAnnouncer, ServerView newSegmentServerView, QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate, + MonitorScheduler monitorScheduler, ObjectMapper objectMapper ) { @@ -84,6 +87,7 @@ public class TaskToolbox this.segmentAnnouncer = segmentAnnouncer; this.newSegmentServerView = newSegmentServerView; this.queryRunnerFactoryConglomerate = queryRunnerFactoryConglomerate; + this.monitorScheduler = monitorScheduler; this.objectMapper = objectMapper; } @@ -127,6 +131,11 @@ public class TaskToolbox return queryRunnerFactoryConglomerate; } + public MonitorScheduler getMonitorScheduler() + { + return monitorScheduler; + } + public ObjectMapper getObjectMapper() { return objectMapper; @@ -156,7 +165,8 @@ public class TaskToolbox return retVal; } - public File getTaskWorkDir() { + public File getTaskWorkDir() + { return new File(new File(config.getBaseTaskDir(), task.getId()), "work"); } } diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/common/TaskToolboxFactory.java b/indexing-service/src/main/java/com/metamx/druid/indexing/common/TaskToolboxFactory.java index 1d8c244ef7d..91f5c99a333 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/common/TaskToolboxFactory.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/common/TaskToolboxFactory.java @@ -29,6 +29,7 @@ import com.metamx.druid.indexing.common.config.TaskConfig; import com.metamx.druid.indexing.common.task.Task; import com.metamx.druid.query.QueryRunnerFactoryConglomerate; import com.metamx.emitter.service.ServiceEmitter; +import com.metamx.metrics.MonitorScheduler; import org.jets3t.service.impl.rest.httpclient.RestS3Service; /** @@ -45,6 +46,7 @@ public class TaskToolboxFactory private final DataSegmentAnnouncer segmentAnnouncer; private final ServerView newSegmentServerView; private final QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate; + private final MonitorScheduler monitorScheduler; private final ObjectMapper objectMapper; public TaskToolboxFactory( @@ -57,6 +59,7 @@ public class TaskToolboxFactory DataSegmentAnnouncer segmentAnnouncer, ServerView newSegmentServerView, QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate, + MonitorScheduler monitorScheduler, ObjectMapper objectMapper ) { @@ -69,6 +72,7 @@ public class TaskToolboxFactory this.segmentAnnouncer = segmentAnnouncer; this.newSegmentServerView = newSegmentServerView; this.queryRunnerFactoryConglomerate = queryRunnerFactoryConglomerate; + this.monitorScheduler = monitorScheduler; this.objectMapper = objectMapper; } @@ -85,6 +89,7 @@ public class TaskToolboxFactory segmentAnnouncer, newSegmentServerView, queryRunnerFactoryConglomerate, + monitorScheduler, objectMapper ); } diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/com/metamx/druid/indexing/common/task/RealtimeIndexTask.java index e46d07744d3..8ee81e12d9a 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/common/task/RealtimeIndexTask.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/common/task/RealtimeIndexTask.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.io.Closeables; import com.metamx.common.exception.FormattedException; @@ -41,9 +42,12 @@ import com.metamx.druid.input.InputRow; import com.metamx.druid.query.FinalizeResultsQueryRunner; import com.metamx.druid.query.QueryRunner; import com.metamx.druid.query.QueryRunnerFactory; +import com.metamx.druid.query.QueryRunnerFactoryConglomerate; import com.metamx.druid.query.QueryToolChest; +import com.metamx.druid.realtime.FireDepartment; import com.metamx.druid.realtime.FireDepartmentConfig; import com.metamx.druid.realtime.FireDepartmentMetrics; +import com.metamx.druid.realtime.RealtimeMetricsMonitor; import com.metamx.druid.realtime.Schema; import com.metamx.druid.realtime.SegmentPublisher; import com.metamx.druid.realtime.firehose.Firehose; @@ -91,7 +95,7 @@ public class RealtimeIndexTask extends AbstractTask private volatile Plumber plumber = null; @JsonIgnore - private volatile TaskToolbox toolbox = null; + private volatile QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate = null; @JsonCreator public RealtimeIndexTask( @@ -142,7 +146,7 @@ public class RealtimeIndexTask extends AbstractTask public QueryRunner getQueryRunner(Query query) { if (plumber != null) { - QueryRunnerFactory> factory = toolbox.getQueryRunnerFactoryConglomerate().findFactory(query); + QueryRunnerFactory> factory = queryRunnerFactoryConglomerate.findFactory(query); QueryToolChest> toolChest = factory.getToolchest(); return new FinalizeResultsQueryRunner(plumber.getQueryRunner(query), toolChest); @@ -166,7 +170,7 @@ public class RealtimeIndexTask extends AbstractTask boolean normalExit = true; - final FireDepartmentMetrics metrics = new FireDepartmentMetrics(); + // Set up firehose final Period intermediatePersistPeriod = fireDepartmentConfig.getIntermediatePersistPeriod(); final Firehose firehose = firehoseFactory.connect(); @@ -265,12 +269,18 @@ public class RealtimeIndexTask extends AbstractTask realtimePlumberSchool.setServerView(toolbox.getNewSegmentServerView()); realtimePlumberSchool.setServiceEmitter(toolbox.getEmitter()); - this.toolbox = toolbox; - this.plumber = realtimePlumberSchool.findPlumber(schema, metrics); + final FireDepartment fireDepartment = new FireDepartment(schema, fireDepartmentConfig, null, null); + final RealtimeMetricsMonitor metricsMonitor = new RealtimeMetricsMonitor(ImmutableList.of(fireDepartment)); + this.queryRunnerFactoryConglomerate = toolbox.getQueryRunnerFactoryConglomerate(); + this.plumber = realtimePlumberSchool.findPlumber(schema, fireDepartment.getMetrics()); try { plumber.startJob(); + // Set up metrics emission + toolbox.getMonitorScheduler().addMonitor(metricsMonitor); + + // Time to read data! long nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis(); while (firehose.hasMore()) { final InputRow inputRow; @@ -282,7 +292,7 @@ public class RealtimeIndexTask extends AbstractTask final Sink sink = plumber.getSink(inputRow.getTimestampFromEpoch()); if (sink == null) { - metrics.incrementThrownAway(); + fireDepartment.getMetrics().incrementThrownAway(); log.debug("Throwing away event[%s]", inputRow); if (System.currentTimeMillis() > nextFlush) { @@ -298,7 +308,7 @@ public class RealtimeIndexTask extends AbstractTask } int currCount = sink.add(inputRow); - metrics.incrementProcessed(); + fireDepartment.getMetrics().incrementProcessed(); if (currCount >= fireDepartmentConfig.getMaxRowsInMemory() || System.currentTimeMillis() > nextFlush) { plumber.persist(firehose.commit()); nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis(); @@ -306,7 +316,7 @@ public class RealtimeIndexTask extends AbstractTask } catch (FormattedException e) { log.warn(e, "unparseable line"); - metrics.incrementUnparseable(); + fireDepartment.getMetrics().incrementUnparseable(); } } } @@ -327,6 +337,7 @@ public class RealtimeIndexTask extends AbstractTask } finally { Closeables.closeQuietly(firehose); + toolbox.getMonitorScheduler().removeMonitor(metricsMonitor); } } } diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/worker/executor/ExecutorNode.java b/indexing-service/src/main/java/com/metamx/druid/indexing/worker/executor/ExecutorNode.java index 06c160106d5..c5692e59d58 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/worker/executor/ExecutorNode.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/worker/executor/ExecutorNode.java @@ -22,6 +22,7 @@ package com.metamx.druid.indexing.worker.executor; import com.fasterxml.jackson.databind.InjectableValues; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.dataformat.smile.SmileFactory; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -107,7 +108,7 @@ public class ExecutorNode extends BaseServerNode private final ExecutorLifecycleFactory executorLifecycleFactory; private RestS3Service s3Service = null; - private List monitors = null; + private MonitorScheduler monitorScheduler = null; private HttpClient httpClient = null; private ServiceEmitter emitter = null; private TaskConfig taskConfig = null; @@ -140,58 +141,16 @@ public class ExecutorNode extends BaseServerNode this.executorLifecycleFactory = executorLifecycleFactory; } - public ExecutorNode setHttpClient(HttpClient httpClient) - { - this.httpClient = httpClient; - return this; - } - - public ExecutorNode setEmitter(ServiceEmitter emitter) - { - this.emitter = emitter; - return this; - } - - public ExecutorNode setS3Service(RestS3Service s3Service) - { - this.s3Service = s3Service; - return this; - } - - public ExecutorNode setSegmentPusher(DataSegmentPusher segmentPusher) - { - this.segmentPusher = segmentPusher; - return this; - } - - public ExecutorNode setTaskToolboxFactory(TaskToolboxFactory taskToolboxFactory) - { - this.taskToolboxFactory = taskToolboxFactory; - return this; - } - - public ExecutorNode setCoordinatorServiceProvider(ServiceProvider coordinatorServiceProvider) - { - this.coordinatorServiceProvider = coordinatorServiceProvider; - return this; - } - - public ExecutorNode setServiceDiscovery(ServiceDiscovery serviceDiscovery) - { - this.serviceDiscovery = serviceDiscovery; - return this; - } - @Override public void doInit() throws Exception { initializeHttpClient(); initializeEmitter(); initializeS3Service(); - initializeMonitors(); initializeMergerConfig(); initializeServiceDiscovery(); initializeDataSegmentPusher(); + initializeMonitorScheduler(); initializeTaskToolbox(); initializeTaskRunner(); initializeChatHandlerProvider(); @@ -199,13 +158,6 @@ public class ExecutorNode extends BaseServerNode initializeJacksonSubtypes(); initializeServer(); - final ScheduledExecutorFactory scheduledExecutorFactory = ScheduledExecutors.createFactory(lifecycle); - final ScheduledExecutorService globalScheduledExec = scheduledExecutorFactory.create(1, "Global--%d"); - final MonitorScheduler monitorScheduler = new MonitorScheduler( - configFactory.build(MonitorSchedulerConfig.class), globalScheduledExec, emitter, monitors - ); - lifecycle.addManagedInstance(monitorScheduler); - executorLifecycle = executorLifecycleFactory.build(taskRunner, getJsonMapper()); lifecycle.addManagedInstance(executorLifecycle); @@ -229,6 +181,19 @@ public class ExecutorNode extends BaseServerNode ); } + private void initializeMonitorScheduler() + { + if (monitorScheduler == null) + { + final ScheduledExecutorFactory scheduledExecutorFactory = ScheduledExecutors.createFactory(lifecycle); + final ScheduledExecutorService globalScheduledExec = scheduledExecutorFactory.create(1, "Global--%d"); + this.monitorScheduler = new MonitorScheduler( + configFactory.build(MonitorSchedulerConfig.class), globalScheduledExec, emitter, ImmutableList.of() + ); + lifecycle.addManagedInstance(monitorScheduler); + } + } + @LifecycleStart public synchronized void start() throws Exception { @@ -333,15 +298,6 @@ public class ExecutorNode extends BaseServerNode } } - private void initializeMonitors() - { - if (monitors == null) { - monitors = Lists.newArrayList(); - monitors.add(new JvmMonitor()); - monitors.add(new SysMonitor()); - } - } - private void initializeMergerConfig() { if (taskConfig == null) { @@ -384,6 +340,7 @@ public class ExecutorNode extends BaseServerNode getAnnouncer(), getServerView(), getConglomerate(), + monitorScheduler, getJsonMapper() ); } diff --git a/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunnerTest.java b/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunnerTest.java index 0a11fcb49fb..5e9505e061f 100644 --- a/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunnerTest.java +++ b/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunnerTest.java @@ -349,7 +349,7 @@ public class RemoteTaskRunnerTest { return null; } - }, null, null, null, null, null, null, null, null, jsonMapper + }, null, null, null, null, null, null, null, null, null, jsonMapper ), Executors.newSingleThreadExecutor() ), Executors.newSingleThreadExecutor() diff --git a/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/TaskLifecycleTest.java b/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/TaskLifecycleTest.java index ef3c6412c6f..ecd31c985d7 100644 --- a/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/TaskLifecycleTest.java @@ -159,6 +159,7 @@ public class TaskLifecycleTest null, // segment announcer null, // new segment server view null, // query runner factory conglomerate corporation unionized collective + null, // monitor scheduler new DefaultObjectMapper() ); diff --git a/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/TaskQueueTest.java b/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/TaskQueueTest.java index 4e023b736dd..533ae3d2760 100644 --- a/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/TaskQueueTest.java +++ b/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/TaskQueueTest.java @@ -168,6 +168,7 @@ public class TaskQueueTest null, null, null, + null, null ); @@ -228,6 +229,7 @@ public class TaskQueueTest null, null, null, + null, null ); diff --git a/pom.xml b/pom.xml index 3dd102990b8..c7ea1da077e 100644 --- a/pom.xml +++ b/pom.xml @@ -80,7 +80,7 @@ com.metamx server-metrics - 0.0.2 + 0.0.3 diff --git a/realtime/src/main/java/com/metamx/druid/realtime/RealtimeMetricsMonitor.java b/realtime/src/main/java/com/metamx/druid/realtime/RealtimeMetricsMonitor.java index 3d04392aab0..69d1e0169fa 100644 --- a/realtime/src/main/java/com/metamx/druid/realtime/RealtimeMetricsMonitor.java +++ b/realtime/src/main/java/com/metamx/druid/realtime/RealtimeMetricsMonitor.java @@ -31,7 +31,7 @@ import java.util.Map; */ public class RealtimeMetricsMonitor extends AbstractMonitor { - Map previousValues; + private final Map previousValues; private final List fireDepartments; public RealtimeMetricsMonitor(List fireDepartments)