Add RealtimeMetricsMonitor to RealtimeIndexTask

This commit is contained in:
Gian Merlino 2013-07-24 14:39:59 -07:00
parent 867dedfe1b
commit 0eebb0a149
9 changed files with 58 additions and 72 deletions

View File

@ -37,6 +37,7 @@ import com.metamx.druid.loading.SegmentLoadingException;
import com.metamx.druid.loading.SingleSegmentLoader;
import com.metamx.druid.query.QueryRunnerFactoryConglomerate;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.metrics.MonitorScheduler;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import java.io.File;
@ -58,6 +59,7 @@ public class TaskToolbox
private final DataSegmentAnnouncer segmentAnnouncer;
private final ServerView newSegmentServerView;
private final QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate;
private final MonitorScheduler monitorScheduler;
private final ObjectMapper objectMapper;
public TaskToolbox(
@ -71,6 +73,7 @@ public class TaskToolbox
DataSegmentAnnouncer segmentAnnouncer,
ServerView newSegmentServerView,
QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate,
MonitorScheduler monitorScheduler,
ObjectMapper objectMapper
)
{
@ -84,6 +87,7 @@ public class TaskToolbox
this.segmentAnnouncer = segmentAnnouncer;
this.newSegmentServerView = newSegmentServerView;
this.queryRunnerFactoryConglomerate = queryRunnerFactoryConglomerate;
this.monitorScheduler = monitorScheduler;
this.objectMapper = objectMapper;
}
@ -127,6 +131,11 @@ public class TaskToolbox
return queryRunnerFactoryConglomerate;
}
public MonitorScheduler getMonitorScheduler()
{
return monitorScheduler;
}
public ObjectMapper getObjectMapper()
{
return objectMapper;
@ -156,7 +165,8 @@ public class TaskToolbox
return retVal;
}
public File getTaskWorkDir() {
public File getTaskWorkDir()
{
return new File(new File(config.getBaseTaskDir(), task.getId()), "work");
}
}

View File

@ -29,6 +29,7 @@ import com.metamx.druid.indexing.common.config.TaskConfig;
import com.metamx.druid.indexing.common.task.Task;
import com.metamx.druid.query.QueryRunnerFactoryConglomerate;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.metrics.MonitorScheduler;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
/**
@ -45,6 +46,7 @@ public class TaskToolboxFactory
private final DataSegmentAnnouncer segmentAnnouncer;
private final ServerView newSegmentServerView;
private final QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate;
private final MonitorScheduler monitorScheduler;
private final ObjectMapper objectMapper;
public TaskToolboxFactory(
@ -57,6 +59,7 @@ public class TaskToolboxFactory
DataSegmentAnnouncer segmentAnnouncer,
ServerView newSegmentServerView,
QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate,
MonitorScheduler monitorScheduler,
ObjectMapper objectMapper
)
{
@ -69,6 +72,7 @@ public class TaskToolboxFactory
this.segmentAnnouncer = segmentAnnouncer;
this.newSegmentServerView = newSegmentServerView;
this.queryRunnerFactoryConglomerate = queryRunnerFactoryConglomerate;
this.monitorScheduler = monitorScheduler;
this.objectMapper = objectMapper;
}
@ -85,6 +89,7 @@ public class TaskToolboxFactory
segmentAnnouncer,
newSegmentServerView,
queryRunnerFactoryConglomerate,
monitorScheduler,
objectMapper
);
}

View File

@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.io.Closeables;
import com.metamx.common.exception.FormattedException;
@ -41,9 +42,12 @@ import com.metamx.druid.input.InputRow;
import com.metamx.druid.query.FinalizeResultsQueryRunner;
import com.metamx.druid.query.QueryRunner;
import com.metamx.druid.query.QueryRunnerFactory;
import com.metamx.druid.query.QueryRunnerFactoryConglomerate;
import com.metamx.druid.query.QueryToolChest;
import com.metamx.druid.realtime.FireDepartment;
import com.metamx.druid.realtime.FireDepartmentConfig;
import com.metamx.druid.realtime.FireDepartmentMetrics;
import com.metamx.druid.realtime.RealtimeMetricsMonitor;
import com.metamx.druid.realtime.Schema;
import com.metamx.druid.realtime.SegmentPublisher;
import com.metamx.druid.realtime.firehose.Firehose;
@ -91,7 +95,7 @@ public class RealtimeIndexTask extends AbstractTask
private volatile Plumber plumber = null;
@JsonIgnore
private volatile TaskToolbox toolbox = null;
private volatile QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate = null;
@JsonCreator
public RealtimeIndexTask(
@ -142,7 +146,7 @@ public class RealtimeIndexTask extends AbstractTask
public <T> QueryRunner<T> getQueryRunner(Query<T> query)
{
if (plumber != null) {
QueryRunnerFactory<T, Query<T>> factory = toolbox.getQueryRunnerFactoryConglomerate().findFactory(query);
QueryRunnerFactory<T, Query<T>> factory = queryRunnerFactoryConglomerate.findFactory(query);
QueryToolChest<T, Query<T>> toolChest = factory.getToolchest();
return new FinalizeResultsQueryRunner<T>(plumber.getQueryRunner(query), toolChest);
@ -166,7 +170,7 @@ public class RealtimeIndexTask extends AbstractTask
boolean normalExit = true;
final FireDepartmentMetrics metrics = new FireDepartmentMetrics();
// Set up firehose
final Period intermediatePersistPeriod = fireDepartmentConfig.getIntermediatePersistPeriod();
final Firehose firehose = firehoseFactory.connect();
@ -265,12 +269,18 @@ public class RealtimeIndexTask extends AbstractTask
realtimePlumberSchool.setServerView(toolbox.getNewSegmentServerView());
realtimePlumberSchool.setServiceEmitter(toolbox.getEmitter());
this.toolbox = toolbox;
this.plumber = realtimePlumberSchool.findPlumber(schema, metrics);
final FireDepartment fireDepartment = new FireDepartment(schema, fireDepartmentConfig, null, null);
final RealtimeMetricsMonitor metricsMonitor = new RealtimeMetricsMonitor(ImmutableList.of(fireDepartment));
this.queryRunnerFactoryConglomerate = toolbox.getQueryRunnerFactoryConglomerate();
this.plumber = realtimePlumberSchool.findPlumber(schema, fireDepartment.getMetrics());
try {
plumber.startJob();
// Set up metrics emission
toolbox.getMonitorScheduler().addMonitor(metricsMonitor);
// Time to read data!
long nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis();
while (firehose.hasMore()) {
final InputRow inputRow;
@ -282,7 +292,7 @@ public class RealtimeIndexTask extends AbstractTask
final Sink sink = plumber.getSink(inputRow.getTimestampFromEpoch());
if (sink == null) {
metrics.incrementThrownAway();
fireDepartment.getMetrics().incrementThrownAway();
log.debug("Throwing away event[%s]", inputRow);
if (System.currentTimeMillis() > nextFlush) {
@ -298,7 +308,7 @@ public class RealtimeIndexTask extends AbstractTask
}
int currCount = sink.add(inputRow);
metrics.incrementProcessed();
fireDepartment.getMetrics().incrementProcessed();
if (currCount >= fireDepartmentConfig.getMaxRowsInMemory() || System.currentTimeMillis() > nextFlush) {
plumber.persist(firehose.commit());
nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis();
@ -306,7 +316,7 @@ public class RealtimeIndexTask extends AbstractTask
}
catch (FormattedException e) {
log.warn(e, "unparseable line");
metrics.incrementUnparseable();
fireDepartment.getMetrics().incrementUnparseable();
}
}
}
@ -327,6 +337,7 @@ public class RealtimeIndexTask extends AbstractTask
}
finally {
Closeables.closeQuietly(firehose);
toolbox.getMonitorScheduler().removeMonitor(metricsMonitor);
}
}
}

View File

@ -22,6 +22,7 @@ package com.metamx.druid.indexing.worker.executor;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
@ -107,7 +108,7 @@ public class ExecutorNode extends BaseServerNode<ExecutorNode>
private final ExecutorLifecycleFactory executorLifecycleFactory;
private RestS3Service s3Service = null;
private List<Monitor> monitors = null;
private MonitorScheduler monitorScheduler = null;
private HttpClient httpClient = null;
private ServiceEmitter emitter = null;
private TaskConfig taskConfig = null;
@ -140,58 +141,16 @@ public class ExecutorNode extends BaseServerNode<ExecutorNode>
this.executorLifecycleFactory = executorLifecycleFactory;
}
public ExecutorNode setHttpClient(HttpClient httpClient)
{
this.httpClient = httpClient;
return this;
}
public ExecutorNode setEmitter(ServiceEmitter emitter)
{
this.emitter = emitter;
return this;
}
public ExecutorNode setS3Service(RestS3Service s3Service)
{
this.s3Service = s3Service;
return this;
}
public ExecutorNode setSegmentPusher(DataSegmentPusher segmentPusher)
{
this.segmentPusher = segmentPusher;
return this;
}
public ExecutorNode setTaskToolboxFactory(TaskToolboxFactory taskToolboxFactory)
{
this.taskToolboxFactory = taskToolboxFactory;
return this;
}
public ExecutorNode setCoordinatorServiceProvider(ServiceProvider coordinatorServiceProvider)
{
this.coordinatorServiceProvider = coordinatorServiceProvider;
return this;
}
public ExecutorNode setServiceDiscovery(ServiceDiscovery serviceDiscovery)
{
this.serviceDiscovery = serviceDiscovery;
return this;
}
@Override
public void doInit() throws Exception
{
initializeHttpClient();
initializeEmitter();
initializeS3Service();
initializeMonitors();
initializeMergerConfig();
initializeServiceDiscovery();
initializeDataSegmentPusher();
initializeMonitorScheduler();
initializeTaskToolbox();
initializeTaskRunner();
initializeChatHandlerProvider();
@ -199,13 +158,6 @@ public class ExecutorNode extends BaseServerNode<ExecutorNode>
initializeJacksonSubtypes();
initializeServer();
final ScheduledExecutorFactory scheduledExecutorFactory = ScheduledExecutors.createFactory(lifecycle);
final ScheduledExecutorService globalScheduledExec = scheduledExecutorFactory.create(1, "Global--%d");
final MonitorScheduler monitorScheduler = new MonitorScheduler(
configFactory.build(MonitorSchedulerConfig.class), globalScheduledExec, emitter, monitors
);
lifecycle.addManagedInstance(monitorScheduler);
executorLifecycle = executorLifecycleFactory.build(taskRunner, getJsonMapper());
lifecycle.addManagedInstance(executorLifecycle);
@ -229,6 +181,19 @@ public class ExecutorNode extends BaseServerNode<ExecutorNode>
);
}
private void initializeMonitorScheduler()
{
if (monitorScheduler == null)
{
final ScheduledExecutorFactory scheduledExecutorFactory = ScheduledExecutors.createFactory(lifecycle);
final ScheduledExecutorService globalScheduledExec = scheduledExecutorFactory.create(1, "Global--%d");
this.monitorScheduler = new MonitorScheduler(
configFactory.build(MonitorSchedulerConfig.class), globalScheduledExec, emitter, ImmutableList.<Monitor>of()
);
lifecycle.addManagedInstance(monitorScheduler);
}
}
@LifecycleStart
public synchronized void start() throws Exception
{
@ -333,15 +298,6 @@ public class ExecutorNode extends BaseServerNode<ExecutorNode>
}
}
private void initializeMonitors()
{
if (monitors == null) {
monitors = Lists.newArrayList();
monitors.add(new JvmMonitor());
monitors.add(new SysMonitor());
}
}
private void initializeMergerConfig()
{
if (taskConfig == null) {
@ -384,6 +340,7 @@ public class ExecutorNode extends BaseServerNode<ExecutorNode>
getAnnouncer(),
getServerView(),
getConglomerate(),
monitorScheduler,
getJsonMapper()
);
}

View File

@ -349,7 +349,7 @@ public class RemoteTaskRunnerTest
{
return null;
}
}, null, null, null, null, null, null, null, null, jsonMapper
}, null, null, null, null, null, null, null, null, null, jsonMapper
), Executors.newSingleThreadExecutor()
),
Executors.newSingleThreadExecutor()

View File

@ -159,6 +159,7 @@ public class TaskLifecycleTest
null, // segment announcer
null, // new segment server view
null, // query runner factory conglomerate corporation unionized collective
null, // monitor scheduler
new DefaultObjectMapper()
);

View File

@ -168,6 +168,7 @@ public class TaskQueueTest
null,
null,
null,
null,
null
);
@ -228,6 +229,7 @@ public class TaskQueueTest
null,
null,
null,
null,
null
);

View File

@ -80,7 +80,7 @@
<dependency>
<groupId>com.metamx</groupId>
<artifactId>server-metrics</artifactId>
<version>0.0.2</version>
<version>0.0.3</version>
</dependency>
<dependency>

View File

@ -31,7 +31,7 @@ import java.util.Map;
*/
public class RealtimeMetricsMonitor extends AbstractMonitor
{
Map<FireDepartment, FireDepartmentMetrics> previousValues;
private final Map<FireDepartment, FireDepartmentMetrics> previousValues;
private final List<FireDepartment> fireDepartments;
public RealtimeMetricsMonitor(List<FireDepartment> fireDepartments)