Merge pull request #199 from metamx/indexing-service-mods

Improve robustness and usability of RealtimeIndexTask and some code cleanup in the Indexing Service
This commit is contained in:
cheddar 2013-07-25 17:09:15 -07:00
commit a6bc63732d
24 changed files with 305 additions and 525 deletions

View File

@ -37,6 +37,7 @@ import com.metamx.druid.loading.SegmentLoadingException;
import com.metamx.druid.loading.SingleSegmentLoader;
import com.metamx.druid.query.QueryRunnerFactoryConglomerate;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.metrics.MonitorScheduler;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import java.io.File;
@ -58,6 +59,7 @@ public class TaskToolbox
private final DataSegmentAnnouncer segmentAnnouncer;
private final ServerView newSegmentServerView;
private final QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate;
private final MonitorScheduler monitorScheduler;
private final ObjectMapper objectMapper;
public TaskToolbox(
@ -71,6 +73,7 @@ public class TaskToolbox
DataSegmentAnnouncer segmentAnnouncer,
ServerView newSegmentServerView,
QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate,
MonitorScheduler monitorScheduler,
ObjectMapper objectMapper
)
{
@ -84,6 +87,7 @@ public class TaskToolbox
this.segmentAnnouncer = segmentAnnouncer;
this.newSegmentServerView = newSegmentServerView;
this.queryRunnerFactoryConglomerate = queryRunnerFactoryConglomerate;
this.monitorScheduler = monitorScheduler;
this.objectMapper = objectMapper;
}
@ -127,6 +131,11 @@ public class TaskToolbox
return queryRunnerFactoryConglomerate;
}
public MonitorScheduler getMonitorScheduler()
{
return monitorScheduler;
}
public ObjectMapper getObjectMapper()
{
return objectMapper;
@ -156,7 +165,8 @@ public class TaskToolbox
return retVal;
}
public File getTaskWorkDir() {
public File getTaskWorkDir()
{
return new File(new File(config.getBaseTaskDir(), task.getId()), "work");
}
}

View File

@ -29,6 +29,7 @@ import com.metamx.druid.indexing.common.config.TaskConfig;
import com.metamx.druid.indexing.common.task.Task;
import com.metamx.druid.query.QueryRunnerFactoryConglomerate;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.metrics.MonitorScheduler;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
/**
@ -45,6 +46,7 @@ public class TaskToolboxFactory
private final DataSegmentAnnouncer segmentAnnouncer;
private final ServerView newSegmentServerView;
private final QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate;
private final MonitorScheduler monitorScheduler;
private final ObjectMapper objectMapper;
public TaskToolboxFactory(
@ -57,6 +59,7 @@ public class TaskToolboxFactory
DataSegmentAnnouncer segmentAnnouncer,
ServerView newSegmentServerView,
QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate,
MonitorScheduler monitorScheduler,
ObjectMapper objectMapper
)
{
@ -69,6 +72,7 @@ public class TaskToolboxFactory
this.segmentAnnouncer = segmentAnnouncer;
this.newSegmentServerView = newSegmentServerView;
this.queryRunnerFactoryConglomerate = queryRunnerFactoryConglomerate;
this.monitorScheduler = monitorScheduler;
this.objectMapper = objectMapper;
}
@ -85,6 +89,7 @@ public class TaskToolboxFactory
segmentAnnouncer,
newSegmentServerView,
queryRunnerFactoryConglomerate,
monitorScheduler,
objectMapper
);
}

View File

@ -65,6 +65,8 @@ public class RemoteTaskActionClient implements TaskActionClient
final String response;
log.info("Submitting action for task[%s] to coordinator[%s]: %s", task.getId(), serviceUri, taskAction);
try {
response = httpClient.post(serviceUri.toURL())
.setContent("application/json", dataToSend)
@ -107,7 +109,7 @@ public class RemoteTaskActionClient implements TaskActionClient
final String scheme;
final String host;
final int port;
final String path = "/mmx/merger/v1/action";
final String path = "/druid/indexer/v1/action";
if (instance == null) {
throw new ISE("Cannot find instance of indexer to talk to!");

View File

@ -127,12 +127,6 @@ public abstract class AbstractTask implements Task
return TaskStatus.running(id);
}
@Override
public void shutdown()
{
// Do nothing.
}
@Override
public String toString()
{

View File

@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.io.Closeables;
import com.metamx.common.exception.FormattedException;
@ -41,15 +42,20 @@ import com.metamx.druid.input.InputRow;
import com.metamx.druid.query.FinalizeResultsQueryRunner;
import com.metamx.druid.query.QueryRunner;
import com.metamx.druid.query.QueryRunnerFactory;
import com.metamx.druid.query.QueryRunnerFactoryConglomerate;
import com.metamx.druid.query.QueryToolChest;
import com.metamx.druid.realtime.FireDepartment;
import com.metamx.druid.realtime.FireDepartmentConfig;
import com.metamx.druid.realtime.FireDepartmentMetrics;
import com.metamx.druid.realtime.RealtimeMetricsMonitor;
import com.metamx.druid.realtime.Schema;
import com.metamx.druid.realtime.SegmentPublisher;
import com.metamx.druid.realtime.firehose.Firehose;
import com.metamx.druid.realtime.firehose.FirehoseFactory;
import com.metamx.druid.realtime.firehose.GracefulShutdownFirehose;
import com.metamx.druid.realtime.plumber.NoopRejectionPolicyFactory;
import com.metamx.druid.realtime.plumber.Plumber;
import com.metamx.druid.realtime.plumber.RealtimePlumberSchool;
import com.metamx.druid.realtime.plumber.RejectionPolicyFactory;
import com.metamx.druid.realtime.plumber.Sink;
import com.metamx.druid.realtime.plumber.VersioningPolicy;
import com.metamx.emitter.EmittingLogger;
@ -87,20 +93,14 @@ public class RealtimeIndexTask extends AbstractTask
@JsonIgnore
private final IndexGranularity segmentGranularity;
@JsonIgnore
private final RejectionPolicyFactory rejectionPolicyFactory;
@JsonIgnore
private volatile Plumber plumber = null;
@JsonIgnore
private volatile TaskToolbox toolbox = null;
@JsonIgnore
private volatile GracefulShutdownFirehose firehose = null;
@JsonIgnore
private final Object lock = new Object();
@JsonIgnore
private volatile boolean shutdown = false;
private volatile QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate = null;
@JsonCreator
public RealtimeIndexTask(
@ -110,7 +110,8 @@ public class RealtimeIndexTask extends AbstractTask
@JsonProperty("firehose") FirehoseFactory firehoseFactory,
@JsonProperty("fireDepartmentConfig") FireDepartmentConfig fireDepartmentConfig,
@JsonProperty("windowPeriod") Period windowPeriod,
@JsonProperty("segmentGranularity") IndexGranularity segmentGranularity
@JsonProperty("segmentGranularity") IndexGranularity segmentGranularity,
@JsonProperty("rejectionPolicy") RejectionPolicyFactory rejectionPolicyFactory
)
{
super(
@ -133,6 +134,7 @@ public class RealtimeIndexTask extends AbstractTask
this.fireDepartmentConfig = fireDepartmentConfig;
this.windowPeriod = windowPeriod;
this.segmentGranularity = segmentGranularity;
this.rejectionPolicyFactory = rejectionPolicyFactory;
}
@Override
@ -151,7 +153,7 @@ public class RealtimeIndexTask extends AbstractTask
public <T> QueryRunner<T> getQueryRunner(Query<T> query)
{
if (plumber != null) {
QueryRunnerFactory<T, Query<T>> factory = toolbox.getQueryRunnerFactoryConglomerate().findFactory(query);
QueryRunnerFactory<T, Query<T>> factory = queryRunnerFactoryConglomerate.findFactory(query);
QueryToolChest<T, Query<T>> toolChest = factory.getToolchest();
return new FinalizeResultsQueryRunner<T>(plumber.getQueryRunner(query), toolChest);
@ -175,21 +177,9 @@ public class RealtimeIndexTask extends AbstractTask
boolean normalExit = true;
final FireDepartmentMetrics metrics = new FireDepartmentMetrics();
// Set up firehose
final Period intermediatePersistPeriod = fireDepartmentConfig.getIntermediatePersistPeriod();
synchronized (lock) {
if (shutdown) {
return TaskStatus.success(getId());
}
log.info(
"Wrapping firehose in GracefulShutdownFirehose with segmentGranularity[%s] and windowPeriod[%s]",
segmentGranularity,
windowPeriod
);
firehose = new GracefulShutdownFirehose(firehoseFactory.connect(), segmentGranularity, windowPeriod);
}
final Firehose firehose = firehoseFactory.connect();
// It would be nice to get the PlumberSchool in the constructor. Although that will need jackson injectables for
// stuff like the ServerView, which seems kind of odd? Perhaps revisit this when Guice has been introduced.
@ -286,12 +276,22 @@ public class RealtimeIndexTask extends AbstractTask
realtimePlumberSchool.setServerView(toolbox.getNewSegmentServerView());
realtimePlumberSchool.setServiceEmitter(toolbox.getEmitter());
this.toolbox = toolbox;
this.plumber = realtimePlumberSchool.findPlumber(schema, metrics);
if (this.rejectionPolicyFactory != null) {
realtimePlumberSchool.setRejectionPolicyFactory(rejectionPolicyFactory);
}
final FireDepartment fireDepartment = new FireDepartment(schema, fireDepartmentConfig, null, null);
final RealtimeMetricsMonitor metricsMonitor = new RealtimeMetricsMonitor(ImmutableList.of(fireDepartment));
this.queryRunnerFactoryConglomerate = toolbox.getQueryRunnerFactoryConglomerate();
this.plumber = realtimePlumberSchool.findPlumber(schema, fireDepartment.getMetrics());
try {
plumber.startJob();
// Set up metrics emission
toolbox.getMonitorScheduler().addMonitor(metricsMonitor);
// Time to read data!
long nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis();
while (firehose.hasMore()) {
final InputRow inputRow;
@ -303,7 +303,7 @@ public class RealtimeIndexTask extends AbstractTask
final Sink sink = plumber.getSink(inputRow.getTimestampFromEpoch());
if (sink == null) {
metrics.incrementThrownAway();
fireDepartment.getMetrics().incrementThrownAway();
log.debug("Throwing away event[%s]", inputRow);
if (System.currentTimeMillis() > nextFlush) {
@ -319,7 +319,7 @@ public class RealtimeIndexTask extends AbstractTask
}
int currCount = sink.add(inputRow);
metrics.incrementProcessed();
fireDepartment.getMetrics().incrementProcessed();
if (currCount >= fireDepartmentConfig.getMaxRowsInMemory() || System.currentTimeMillis() > nextFlush) {
plumber.persist(firehose.commit());
nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis();
@ -327,7 +327,7 @@ public class RealtimeIndexTask extends AbstractTask
}
catch (FormattedException e) {
log.warn(e, "unparseable line");
metrics.incrementUnparseable();
fireDepartment.getMetrics().incrementUnparseable();
}
}
}
@ -348,6 +348,7 @@ public class RealtimeIndexTask extends AbstractTask
}
finally {
Closeables.closeQuietly(firehose);
toolbox.getMonitorScheduler().removeMonitor(metricsMonitor);
}
}
}
@ -355,22 +356,6 @@ public class RealtimeIndexTask extends AbstractTask
return TaskStatus.success(getId());
}
@Override
public void shutdown()
{
try {
synchronized (lock) {
shutdown = true;
if (firehose != null) {
firehose.shutdown();
}
}
}
catch (IOException e) {
throw Throwables.propagate(e);
}
}
@JsonProperty
public Schema getSchema()
{
@ -401,6 +386,12 @@ public class RealtimeIndexTask extends AbstractTask
return segmentGranularity;
}
@JsonProperty("rejectionPolicy")
public RejectionPolicyFactory getRejectionPolicyFactory()
{
return rejectionPolicyFactory;
}
public static class TaskActionSegmentPublisher implements SegmentPublisher
{
final Task task;

View File

@ -134,10 +134,4 @@ public interface Task
* @throws Exception
*/
public TaskStatus run(TaskToolbox toolbox) throws Exception;
/**
* Best-effort task cancellation. May or may not do anything. Calling this multiple times may have
* a stronger effect.
*/
public void shutdown();
}

View File

@ -32,6 +32,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.io.ByteStreams;
import com.google.common.io.Closeables;
import com.google.common.io.Closer;
import com.google.common.io.Files;
import com.google.common.io.InputSupplier;
import com.google.common.util.concurrent.ListenableFuture;
@ -48,6 +49,7 @@ import com.metamx.druid.indexing.worker.executor.ExecutorMain;
import com.metamx.emitter.EmittingLogger;
import org.apache.commons.io.FileUtils;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
@ -115,137 +117,135 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider
final ProcessHolder processHolder;
try {
if (!attemptDir.mkdirs()) {
throw new IOException(String.format("Could not create directories: %s", attemptDir));
}
final File taskFile = new File(attemptDir, "task.json");
final File statusFile = new File(attemptDir, "status.json");
final File logFile = new File(attemptDir, "log");
// time to adjust process holders
synchronized (tasks) {
final TaskInfo taskInfo = tasks.get(task.getId());
if (taskInfo.shutdown) {
throw new IllegalStateException("Task has been shut down!");
final Closer closer = Closer.create();
try {
if (!attemptDir.mkdirs()) {
throw new IOException(String.format("Could not create directories: %s", attemptDir));
}
if (taskInfo == null) {
throw new ISE("WTF?! TaskInfo disappeared for task: %s", task.getId());
}
final File taskFile = new File(attemptDir, "task.json");
final File statusFile = new File(attemptDir, "status.json");
final File logFile = new File(attemptDir, "log");
if (taskInfo.processHolder != null) {
throw new ISE("WTF?! TaskInfo already has a process holder for task: %s", task.getId());
}
// time to adjust process holders
synchronized (tasks) {
final TaskInfo taskInfo = tasks.get(task.getId());
final List<String> command = Lists.newArrayList();
final int childPort = findUnusedPort();
final String childHost = String.format(config.getHostPattern(), childPort);
if (taskInfo.shutdown) {
throw new IllegalStateException("Task has been shut down!");
}
command.add(config.getJavaCommand());
command.add("-cp");
command.add(config.getJavaClasspath());
if (taskInfo == null) {
throw new ISE("WTF?! TaskInfo disappeared for task: %s", task.getId());
}
Iterables.addAll(
command,
Splitter.on(CharMatcher.WHITESPACE)
.omitEmptyStrings()
.split(config.getJavaOptions())
);
if (taskInfo.processHolder != null) {
throw new ISE("WTF?! TaskInfo already has a process holder for task: %s", task.getId());
}
for (String propName : props.stringPropertyNames()) {
for (String allowedPrefix : config.getAllowedPrefixes()) {
if (propName.startsWith(allowedPrefix)) {
final List<String> command = Lists.newArrayList();
final int childPort = findUnusedPort();
final String childHost = String.format(config.getHostPattern(), childPort);
command.add(config.getJavaCommand());
command.add("-cp");
command.add(config.getJavaClasspath());
Iterables.addAll(
command,
Splitter.on(CharMatcher.WHITESPACE)
.omitEmptyStrings()
.split(config.getJavaOptions())
);
for (String propName : props.stringPropertyNames()) {
for (String allowedPrefix : config.getAllowedPrefixes()) {
if (propName.startsWith(allowedPrefix)) {
command.add(
String.format(
"-D%s=%s",
propName,
props.getProperty(propName)
)
);
}
}
}
// Override child JVM specific properties
for (String propName : props.stringPropertyNames()) {
if (propName.startsWith(CHILD_PROPERTY_PREFIX)) {
command.add(
String.format(
"-D%s=%s",
propName,
propName.substring(CHILD_PROPERTY_PREFIX.length()),
props.getProperty(propName)
)
);
}
}
}
// Override child JVM specific properties
for (String propName : props.stringPropertyNames()) {
if (propName.startsWith(CHILD_PROPERTY_PREFIX)) {
command.add(
String.format(
"-D%s=%s",
propName.substring(CHILD_PROPERTY_PREFIX.length()),
props.getProperty(propName)
)
);
String nodeType = task.getNodeType();
if (nodeType != null) {
command.add(String.format("-Ddruid.executor.nodeType=%s", nodeType));
}
command.add(String.format("-Ddruid.host=%s", childHost));
command.add(String.format("-Ddruid.port=%d", childPort));
command.add(config.getMainClass());
command.add(taskFile.toString());
command.add(statusFile.toString());
jsonMapper.writeValue(taskFile, task);
log.info("Running command: %s", Joiner.on(" ").join(command));
taskInfo.processHolder = new ProcessHolder(
new ProcessBuilder(ImmutableList.copyOf(command)).redirectErrorStream(true).start(),
logFile,
childPort
);
processHolder = taskInfo.processHolder;
processHolder.registerWithCloser(closer);
}
String nodeType = task.getNodeType();
if (nodeType != null) {
command.add(String.format("-Ddruid.executor.nodeType=%s", nodeType));
}
log.info("Logging task %s output to: %s", task.getId(), logFile);
command.add(String.format("-Ddruid.host=%s", childHost));
command.add(String.format("-Ddruid.port=%d", childPort));
command.add(config.getMainClass());
command.add(taskFile.toString());
command.add(statusFile.toString());
jsonMapper.writeValue(taskFile, task);
log.info("Running command: %s", Joiner.on(" ").join(command));
taskInfo.processHolder = new ProcessHolder(
new ProcessBuilder(ImmutableList.copyOf(command)).redirectErrorStream(true).start(),
logFile,
childPort
final InputStream fromProc = processHolder.process.getInputStream();
final OutputStream toLogfile = closer.register(
Files.newOutputStreamSupplier(logFile).getOutput()
);
processHolder = taskInfo.processHolder;
}
boolean runFailed = true;
log.info("Logging task %s output to: %s", task.getId(), logFile);
final OutputStream toProc = processHolder.process.getOutputStream();
final InputStream fromProc = processHolder.process.getInputStream();
final OutputStream toLogfile = Files.newOutputStreamSupplier(logFile).getOutput();
boolean runFailed = false;
try {
ByteStreams.copy(fromProc, toLogfile);
final int statusCode = processHolder.process.waitFor();
log.info("Process exited with status[%d] for task: %s", statusCode, task.getId());
if (statusCode != 0) {
runFailed = true;
if (statusCode == 0) {
runFailed = false;
}
}
catch (Exception e) {
log.warn(e, "Failed to read from process for task: %s", task.getId());
runFailed = true;
}
finally {
Closeables.closeQuietly(fromProc);
Closeables.closeQuietly(toLogfile);
Closeables.closeQuietly(toProc);
}
// Upload task logs
// Upload task logs
// XXX: Consider uploading periodically for very long-lived tasks to prevent
// XXX: bottlenecks at the end or the possibility of losing a lot of logs all
// XXX: at once.
// XXX: Consider uploading periodically for very long-lived tasks to prevent
// XXX: bottlenecks at the end or the possibility of losing a lot of logs all
// XXX: at once.
taskLogPusher.pushTaskLog(task.getId(), logFile);
taskLogPusher.pushTaskLog(task.getId(), logFile);
if (!runFailed) {
// Process exited successfully
return jsonMapper.readValue(statusFile, TaskStatus.class);
} else {
// Process exited unsuccessfully
return TaskStatus.failure(task.getId());
if (!runFailed) {
// Process exited successfully
return jsonMapper.readValue(statusFile, TaskStatus.class);
} else {
// Process exited unsuccessfully
return TaskStatus.failure(task.getId());
}
} catch (Throwable t) {
throw closer.rethrow(t);
} finally {
closer.close();
}
}
catch (Exception e) {
@ -311,31 +311,9 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider
}
if (taskInfo.processHolder != null) {
final int shutdowns = taskInfo.processHolder.shutdowns.getAndIncrement();
if (shutdowns == 0) {
log.info("Attempting to gracefully shutdown task: %s", taskid);
try {
// This is gross, but it may still be nicer than talking to the forked JVM via HTTP.
final OutputStream out = taskInfo.processHolder.process.getOutputStream();
out.write(
jsonMapper.writeValueAsBytes(
ImmutableMap.of(
"shutdown",
"now"
)
)
);
out.write('\n');
out.flush();
}
catch (IOException e) {
throw Throwables.propagate(e);
}
} else {
// Will trigger normal failure mechanisms due to process exit
log.info("Killing process for task: %s", taskid);
taskInfo.processHolder.process.destroy();
}
// Will trigger normal failure mechanisms due to process exit
log.info("Killing process for task: %s", taskid);
taskInfo.processHolder.process.destroy();
}
}
@ -429,7 +407,6 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider
private final Process process;
private final File logFile;
private final int port;
private final AtomicInteger shutdowns = new AtomicInteger(0);
private ProcessHolder(Process process, File logFile, int port)
{
@ -437,5 +414,11 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider
this.logFile = logFile;
this.port = port;
}
private void registerWithCloser(Closer closer)
{
closer.register(process.getInputStream());
closer.register(process.getOutputStream());
}
}
}

View File

@ -40,8 +40,7 @@ public interface TaskRunner
public ListenableFuture<TaskStatus> run(Task task);
/**
* Best-effort task cancellation. May or may not do anything. Calling this multiple times may have
* a stronger effect.
* Best-effort task shutdown. May or may not do anything.
*/
public void shutdown(String taskid);

View File

@ -110,7 +110,7 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker
{
for (final TaskRunnerWorkItem runningItem : runningItems) {
if (runningItem.getTask().getId().equals(taskid)) {
runningItem.getTask().shutdown();
runningItem.getResult().cancel(true);
}
}
}

View File

@ -267,6 +267,7 @@ public class IndexerCoordinatorResource
retMap.put("result", ret);
}
catch (IOException e) {
log.warn(e, "Failed to perform task action");
return Response.serverError().build();
}

View File

@ -78,7 +78,6 @@ public class ExecutorLifecycle
}
// Spawn monitor thread to keep a watch on parent's stdin
// If a message comes over stdin, we want to handle it
// If stdin reaches eof, the parent is gone, and we should shut down
parentMonitorExec.submit(
new Runnable()
@ -87,25 +86,8 @@ public class ExecutorLifecycle
public void run()
{
try {
final BufferedReader parentReader = new BufferedReader(new InputStreamReader(parentStream));
String messageString;
while ((messageString = parentReader.readLine()) != null) {
final Map<String, Object> message = jsonMapper
.readValue(
messageString,
new TypeReference<Map<String, Object>>()
{
}
);
if (message == null) {
break;
} else if (message.get("shutdown") != null && message.get("shutdown").equals("now")) {
log.info("Shutting down!");
task.shutdown();
} else {
throw new ISE("Unrecognized message from parent: %s", message);
}
while (parentStream.read() != -1) {
// Toss the byte
}
}
catch (Exception e) {

View File

@ -22,6 +22,7 @@ package com.metamx.druid.indexing.worker.executor;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
@ -107,7 +108,7 @@ public class ExecutorNode extends BaseServerNode<ExecutorNode>
private final ExecutorLifecycleFactory executorLifecycleFactory;
private RestS3Service s3Service = null;
private List<Monitor> monitors = null;
private MonitorScheduler monitorScheduler = null;
private HttpClient httpClient = null;
private ServiceEmitter emitter = null;
private TaskConfig taskConfig = null;
@ -140,58 +141,16 @@ public class ExecutorNode extends BaseServerNode<ExecutorNode>
this.executorLifecycleFactory = executorLifecycleFactory;
}
public ExecutorNode setHttpClient(HttpClient httpClient)
{
this.httpClient = httpClient;
return this;
}
public ExecutorNode setEmitter(ServiceEmitter emitter)
{
this.emitter = emitter;
return this;
}
public ExecutorNode setS3Service(RestS3Service s3Service)
{
this.s3Service = s3Service;
return this;
}
public ExecutorNode setSegmentPusher(DataSegmentPusher segmentPusher)
{
this.segmentPusher = segmentPusher;
return this;
}
public ExecutorNode setTaskToolboxFactory(TaskToolboxFactory taskToolboxFactory)
{
this.taskToolboxFactory = taskToolboxFactory;
return this;
}
public ExecutorNode setCoordinatorServiceProvider(ServiceProvider coordinatorServiceProvider)
{
this.coordinatorServiceProvider = coordinatorServiceProvider;
return this;
}
public ExecutorNode setServiceDiscovery(ServiceDiscovery serviceDiscovery)
{
this.serviceDiscovery = serviceDiscovery;
return this;
}
@Override
public void doInit() throws Exception
{
initializeHttpClient();
initializeEmitter();
initializeS3Service();
initializeMonitors();
initializeMergerConfig();
initializeServiceDiscovery();
initializeDataSegmentPusher();
initializeMonitorScheduler();
initializeTaskToolbox();
initializeTaskRunner();
initializeChatHandlerProvider();
@ -199,13 +158,6 @@ public class ExecutorNode extends BaseServerNode<ExecutorNode>
initializeJacksonSubtypes();
initializeServer();
final ScheduledExecutorFactory scheduledExecutorFactory = ScheduledExecutors.createFactory(lifecycle);
final ScheduledExecutorService globalScheduledExec = scheduledExecutorFactory.create(1, "Global--%d");
final MonitorScheduler monitorScheduler = new MonitorScheduler(
configFactory.build(MonitorSchedulerConfig.class), globalScheduledExec, emitter, monitors
);
lifecycle.addManagedInstance(monitorScheduler);
executorLifecycle = executorLifecycleFactory.build(taskRunner, getJsonMapper());
lifecycle.addManagedInstance(executorLifecycle);
@ -229,6 +181,19 @@ public class ExecutorNode extends BaseServerNode<ExecutorNode>
);
}
private void initializeMonitorScheduler()
{
if (monitorScheduler == null)
{
final ScheduledExecutorFactory scheduledExecutorFactory = ScheduledExecutors.createFactory(lifecycle);
final ScheduledExecutorService globalScheduledExec = scheduledExecutorFactory.create(1, "Global--%d");
this.monitorScheduler = new MonitorScheduler(
configFactory.build(MonitorSchedulerConfig.class), globalScheduledExec, emitter, ImmutableList.<Monitor>of()
);
lifecycle.addManagedInstance(monitorScheduler);
}
}
@LifecycleStart
public synchronized void start() throws Exception
{
@ -333,15 +298,6 @@ public class ExecutorNode extends BaseServerNode<ExecutorNode>
}
}
private void initializeMonitors()
{
if (monitors == null) {
monitors = Lists.newArrayList();
monitors.add(new JvmMonitor());
monitors.add(new SysMonitor());
}
}
private void initializeMergerConfig()
{
if (taskConfig == null) {
@ -384,6 +340,7 @@ public class ExecutorNode extends BaseServerNode<ExecutorNode>
getAnnouncer(),
getServerView(),
getConglomerate(),
monitorScheduler,
getJsonMapper()
);
}

View File

@ -207,7 +207,8 @@ public class TaskSerdeTest
null,
null,
new Period("PT10M"),
IndexGranularity.HOUR
IndexGranularity.HOUR,
null
);
final ObjectMapper jsonMapper = new DefaultObjectMapper();

View File

@ -349,7 +349,7 @@ public class RemoteTaskRunnerTest
{
return null;
}
}, null, null, null, null, null, null, null, null, jsonMapper
}, null, null, null, null, null, null, null, null, null, jsonMapper
), Executors.newSingleThreadExecutor()
),
Executors.newSingleThreadExecutor()

View File

@ -159,6 +159,7 @@ public class TaskLifecycleTest
null, // segment announcer
null, // new segment server view
null, // query runner factory conglomerate corporation unionized collective
null, // monitor scheduler
new DefaultObjectMapper()
);

View File

@ -168,6 +168,7 @@ public class TaskQueueTest
null,
null,
null,
null,
null
);
@ -228,6 +229,7 @@ public class TaskQueueTest
null,
null,
null,
null,
null
);

View File

@ -52,6 +52,7 @@ public class TestRealtimeTask extends RealtimeIndexTask
null,
null,
null,
null,
null
);
this.status = status;

View File

@ -80,7 +80,7 @@
<dependency>
<groupId>com.metamx</groupId>
<artifactId>server-metrics</artifactId>
<version>0.0.2</version>
<version>0.0.3</version>
</dependency>
<dependency>

View File

@ -31,7 +31,7 @@ import java.util.Map;
*/
public class RealtimeMetricsMonitor extends AbstractMonitor
{
Map<FireDepartment, FireDepartmentMetrics> previousValues;
private final Map<FireDepartment, FireDepartmentMetrics> previousValues;
private final List<FireDepartment> fireDepartments;
public RealtimeMetricsMonitor(List<FireDepartment> fireDepartments)

View File

@ -1,142 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.realtime.firehose;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.logger.Logger;
import com.metamx.druid.index.v1.IndexGranularity;
import com.metamx.druid.input.InputRow;
import com.metamx.druid.realtime.plumber.IntervalRejectionPolicyFactory;
import com.metamx.druid.realtime.plumber.RejectionPolicy;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Interval;
import org.joda.time.Period;
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
/**
*/
public class GracefulShutdownFirehose implements Firehose
{
private static final Logger log = new Logger(GracefulShutdownFirehose.class);
private final Firehose firehose;
private final IndexGranularity segmentGranularity;
private final long windowMillis;
private final ScheduledExecutorService scheduledExecutor;
private final RejectionPolicy rejectionPolicy;
// when this is set to false, the firehose will have no more rows
private final AtomicBoolean valveOn = new AtomicBoolean(true);
// when this is set to true, the firehose will begin rejecting events
private volatile boolean beginRejectionPolicy = false;
public GracefulShutdownFirehose(
Firehose firehose,
IndexGranularity segmentGranularity,
Period windowPeriod
)
{
this.firehose = firehose;
this.segmentGranularity = segmentGranularity;
this.windowMillis = windowPeriod.toStandardDuration().getMillis() * 2;
this.scheduledExecutor = Executors.newScheduledThreadPool(
1,
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("firehose_scheduled_%d")
.build()
);
final long truncatedNow = segmentGranularity.truncate(new DateTime()).getMillis();
final long end = segmentGranularity.increment(truncatedNow);
this.rejectionPolicy = new IntervalRejectionPolicyFactory(new Interval(truncatedNow, end)).create(windowPeriod);
}
public void shutdown() throws IOException
{
final long truncatedNow = segmentGranularity.truncate(new DateTime()).getMillis();
final long end = segmentGranularity.increment(truncatedNow) + windowMillis;
final Duration timeUntilShutdown = new Duration(System.currentTimeMillis(), end);
log.info("Shutdown at approx. %s (in %s)", new DateTime(end), timeUntilShutdown);
ScheduledExecutors.scheduleWithFixedDelay(
scheduledExecutor,
timeUntilShutdown,
new Callable<ScheduledExecutors.Signal>()
{
@Override
public ScheduledExecutors.Signal call() throws Exception
{
try {
valveOn.set(false);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
return ScheduledExecutors.Signal.STOP;
}
}
);
beginRejectionPolicy = true;
}
@Override
public boolean hasMore()
{
return valveOn.get() && firehose.hasMore();
}
@Override
public InputRow nextRow()
{
InputRow next = firehose.nextRow();
if (!beginRejectionPolicy || rejectionPolicy.accept(next.getTimestampFromEpoch())) {
return next;
}
return null;
}
@Override
public Runnable commit()
{
return firehose.commit();
}
@Override
public void close() throws IOException
{
firehose.close();
}
}

View File

@ -1,42 +0,0 @@
package com.metamx.druid.realtime.plumber;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.Period;
/**
*/
public class IntervalRejectionPolicyFactory implements RejectionPolicyFactory
{
private final Interval interval;
public IntervalRejectionPolicyFactory(Interval interval)
{
this.interval = interval;
}
@Override
public RejectionPolicy create(Period windowPeriod)
{
return new RejectionPolicy()
{
@Override
public DateTime getCurrMaxTime()
{
return new DateTime();
}
@Override
public boolean accept(long timestamp)
{
return interval.contains(timestamp);
}
@Override
public String toString()
{
return String.format("interval-%s", interval);
}
};
}
}

View File

@ -0,0 +1,26 @@
package com.metamx.druid.realtime.plumber;
import org.joda.time.DateTime;
import org.joda.time.Period;
public class NoopRejectionPolicyFactory implements RejectionPolicyFactory
{
@Override
public RejectionPolicy create(Period windowPeriod)
{
return new RejectionPolicy()
{
@Override
public DateTime getCurrMaxTime()
{
return new DateTime(0);
}
@Override
public boolean accept(long timestamp)
{
return true;
}
};
}
}

View File

@ -333,11 +333,90 @@ public class RealtimePlumberSchool implements PlumberSchool
);
}
// Submits persist-n-merge task for a Sink to the persistExecutor
private void persistAndMerge(final long truncatedTime, final Sink sink)
{
final String threadName = String.format(
"%s-%s-persist-n-merge", schema.getDataSource(), new DateTime(truncatedTime)
);
persistExecutor.execute(
new ThreadRenamingRunnable(threadName)
{
@Override
public void doRun()
{
final Interval interval = sink.getInterval();
for (FireHydrant hydrant : sink) {
if (!hydrant.hasSwapped()) {
log.info("Hydrant[%s] hasn't swapped yet, swapping. Sink[%s]", hydrant, sink);
final int rowCount = persistHydrant(hydrant, schema, interval);
metrics.incrementRowOutputCount(rowCount);
}
}
final File mergedTarget = new File(computePersistDir(schema, interval), "merged");
if (mergedTarget.exists()) {
log.info("Skipping already-merged sink: %s", sink);
return;
}
File mergedFile = null;
try {
List<QueryableIndex> indexes = Lists.newArrayList();
for (FireHydrant fireHydrant : sink) {
Segment segment = fireHydrant.getSegment();
final QueryableIndex queryableIndex = segment.asQueryableIndex();
log.info("Adding hydrant[%s]", fireHydrant);
indexes.add(queryableIndex);
}
mergedFile = IndexMerger.mergeQueryableIndex(
indexes,
schema.getAggregators(),
mergedTarget
);
QueryableIndex index = IndexIO.loadIndex(mergedFile);
DataSegment segment = dataSegmentPusher.push(
mergedFile,
sink.getSegment().withDimensions(Lists.newArrayList(index.getAvailableDimensions()))
);
segmentPublisher.publishSegment(segment);
}
catch (IOException e) {
log.makeAlert(e, "Failed to persist merged index[%s]", schema.getDataSource())
.addData("interval", interval)
.emit();
}
if (mergedFile != null) {
try {
if (mergedFile != null) {
log.info("Deleting Index File[%s]", mergedFile);
FileUtils.deleteDirectory(mergedFile);
}
}
catch (IOException e) {
log.warn(e, "Error deleting directory[%s]", mergedFile);
}
}
}
}
);
}
@Override
public void finishJob()
{
log.info("Shutting down...");
for (final Map.Entry<Long, Sink> entry : sinks.entrySet()) {
persistAndMerge(entry.getKey(), entry.getValue());
}
while (!sinks.isEmpty()) {
try {
log.info(
@ -584,72 +663,7 @@ public class RealtimePlumberSchool implements PlumberSchool
}
for (final Map.Entry<Long, Sink> entry : sinksToPush) {
final Sink sink = entry.getValue();
final String threadName = String.format(
"%s-%s-persist-n-merge", schema.getDataSource(), new DateTime(entry.getKey())
);
persistExecutor.execute(
new ThreadRenamingRunnable(threadName)
{
@Override
public void doRun()
{
final Interval interval = sink.getInterval();
for (FireHydrant hydrant : sink) {
if (!hydrant.hasSwapped()) {
log.info("Hydrant[%s] hasn't swapped yet, swapping. Sink[%s]", hydrant, sink);
final int rowCount = persistHydrant(hydrant, schema, interval);
metrics.incrementRowOutputCount(rowCount);
}
}
File mergedFile = null;
try {
List<QueryableIndex> indexes = Lists.newArrayList();
for (FireHydrant fireHydrant : sink) {
Segment segment = fireHydrant.getSegment();
final QueryableIndex queryableIndex = segment.asQueryableIndex();
log.info("Adding hydrant[%s]", fireHydrant);
indexes.add(queryableIndex);
}
mergedFile = IndexMerger.mergeQueryableIndex(
indexes,
schema.getAggregators(),
new File(computePersistDir(schema, interval), "merged")
);
QueryableIndex index = IndexIO.loadIndex(mergedFile);
DataSegment segment = dataSegmentPusher.push(
mergedFile,
sink.getSegment().withDimensions(Lists.newArrayList(index.getAvailableDimensions()))
);
segmentPublisher.publishSegment(segment);
}
catch (IOException e) {
log.makeAlert(e, "Failed to persist merged index[%s]", schema.getDataSource())
.addData("interval", interval)
.emit();
}
if (mergedFile != null) {
try {
if (mergedFile != null) {
log.info("Deleting Index File[%s]", mergedFile);
FileUtils.deleteDirectory(mergedFile);
}
}
catch (IOException e) {
log.warn(e, "Error deleting directory[%s]", mergedFile);
}
}
}
}
);
persistAndMerge(entry.getKey(), entry.getValue());
}
if (stopped) {

View File

@ -7,7 +7,8 @@ import org.joda.time.Period;
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "serverTime", value = ServerTimeRejectionPolicyFactory.class),
@JsonSubTypes.Type(name = "messageTime", value = MessageTimeRejectionPolicyFactory.class)
@JsonSubTypes.Type(name = "messageTime", value = MessageTimeRejectionPolicyFactory.class),
@JsonSubTypes.Type(name = "none", value = NoopRejectionPolicyFactory.class)
})
public interface RejectionPolicyFactory
{