Merge pull request #1881 from gianm/restartable-tasks

Restorable indexing tasks
This commit is contained in:
Fangjin Yang 2015-11-23 21:14:37 -08:00
commit 8e83d800d6
26 changed files with 1602 additions and 228 deletions

View File

@ -279,6 +279,8 @@ Additional peon configs include:
|`druid.indexer.task.hadoopWorkingPath`|Temporary working directory for Hadoop tasks.|/tmp/druid-indexing|
|`druid.indexer.task.defaultRowFlushBoundary`|Highest row count before persisting to disk. Used for indexing generating tasks.|50000|
|`druid.indexer.task.defaultHadoopCoordinates`|Hadoop version to use with HadoopIndexTasks that do not request a particular version.|org.apache.hadoop:hadoop-client:2.3.0|
|`druid.indexer.task.gracefulShutdownTimeout`|Wait this long on middleManager restart for restorable tasks to gracefully exit.|PT5M|
|`druid.indexer.task.directoryLockTimeout`|Wait this long for zombie peons to exit before giving up on their replacements.|PT10M|
If `druid.indexer.runner.separateIngestionEndpoint` is set to true then following configurations are available for the ingestion server at peon:

View File

@ -109,7 +109,7 @@ public class TaskToolboxFactory
public TaskToolbox build(Task task)
{
final File taskWorkDir = new File(new File(config.getBaseTaskDir(), task.getId()), "work");
final File taskWorkDir = config.getTaskWorkDir(task.getId());
return new TaskToolbox(
config,

View File

@ -20,6 +20,7 @@ package io.druid.indexing.common.config;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import org.joda.time.Period;
import java.io.File;
import java.util.List;
@ -30,6 +31,10 @@ public class TaskConfig
"org.apache.hadoop:hadoop-client:2.3.0"
);
private static final Period DEFAULT_DIRECTORY_LOCK_TIMEOUT = new Period("PT10M");
private static final Period DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT = new Period("PT5M");
@JsonProperty
private final String baseDir;
@ -45,13 +50,21 @@ public class TaskConfig
@JsonProperty
private final List<String> defaultHadoopCoordinates;
@JsonProperty
private final Period gracefulShutdownTimeout;
@JsonProperty
private final Period directoryLockTimeout;
@JsonCreator
public TaskConfig(
@JsonProperty("baseDir") String baseDir,
@JsonProperty("baseTaskDir") String baseTaskDir,
@JsonProperty("hadoopWorkingPath") String hadoopWorkingPath,
@JsonProperty("defaultRowFlushBoundary") Integer defaultRowFlushBoundary,
@JsonProperty("defaultHadoopCoordinates") List<String> defaultHadoopCoordinates
@JsonProperty("defaultHadoopCoordinates") List<String> defaultHadoopCoordinates,
@JsonProperty("gracefulShutdownTimeout") Period gracefulShutdownTimeout,
@JsonProperty("directoryLockTimeout") Period directoryLockTimeout
)
{
this.baseDir = baseDir == null ? "/tmp" : baseDir;
@ -61,6 +74,12 @@ public class TaskConfig
this.defaultHadoopCoordinates = defaultHadoopCoordinates == null
? DEFAULT_DEFAULT_HADOOP_COORDINATES
: defaultHadoopCoordinates;
this.gracefulShutdownTimeout = gracefulShutdownTimeout == null
? DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT
: gracefulShutdownTimeout;
this.directoryLockTimeout = directoryLockTimeout == null
? DEFAULT_DIRECTORY_LOCK_TIMEOUT
: directoryLockTimeout;
}
@JsonProperty
@ -75,6 +94,21 @@ public class TaskConfig
return baseTaskDir;
}
public File getTaskDir(String taskId)
{
return new File(baseTaskDir, taskId);
}
public File getTaskWorkDir(String taskId)
{
return new File(getTaskDir(taskId), "work");
}
public File getTaskLockFile(String taskId)
{
return new File(getTaskDir(taskId), "lock");
}
@JsonProperty
public String getHadoopWorkingPath()
{
@ -93,6 +127,18 @@ public class TaskConfig
return defaultHadoopCoordinates;
}
@JsonProperty
public Period getGracefulShutdownTimeout()
{
return gracefulShutdownTimeout;
}
@JsonProperty
public Period getDirectoryLockTimeout()
{
return directoryLockTimeout;
}
private String defaultDir(String configParameter, final String defaultVal)
{
if (configParameter == null) {

View File

@ -128,6 +128,19 @@ public abstract class AbstractTask implements Task
return null;
}
@Override
public boolean canRestore()
{
return false;
}
@Override
public void stopGracefully()
{
// Should not be called when canRestore = false.
throw new UnsupportedOperationException("Cannot stop gracefully");
}
@Override
public String toString()
{

View File

@ -29,6 +29,7 @@ import com.metamx.common.parsers.ParseException;
import com.metamx.emitter.EmittingLogger;
import io.druid.data.input.Committer;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.InputRow;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskStatus;
@ -48,6 +49,9 @@ import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.realtime.FireDepartment;
import io.druid.segment.realtime.RealtimeMetricsMonitor;
import io.druid.segment.realtime.SegmentPublisher;
import io.druid.segment.realtime.firehose.ClippedFirehoseFactory;
import io.druid.segment.realtime.firehose.EventReceiverFirehoseFactory;
import io.druid.segment.realtime.firehose.TimedShutoffFirehoseFactory;
import io.druid.segment.realtime.plumber.Committers;
import io.druid.segment.realtime.plumber.Plumber;
import io.druid.segment.realtime.plumber.PlumberSchool;
@ -62,6 +66,7 @@ import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
public class RealtimeIndexTask extends AbstractTask
{
@ -104,6 +109,12 @@ public class RealtimeIndexTask extends AbstractTask
@JsonIgnore
private volatile Plumber plumber = null;
@JsonIgnore
private volatile Firehose firehose = null;
@JsonIgnore
private volatile boolean stopped = false;
@JsonIgnore
private volatile QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate = null;
@ -285,8 +296,6 @@ public class RealtimeIndexTask extends AbstractTask
this.plumber = plumberSchool.findPlumber(dataSchema, tuningConfig, fireDepartment.getMetrics());
// Delay firehose connection to avoid claiming input resources while the plumber is starting up.
Firehose firehose = null;
Supplier<Committer> committerSupplier = null;
try {
@ -295,12 +304,14 @@ public class RealtimeIndexTask extends AbstractTask
// Set up metrics emission
toolbox.getMonitorScheduler().addMonitor(metricsMonitor);
// Set up firehose
firehose = spec.getIOConfig().getFirehoseFactory().connect(spec.getDataSchema().getParser());
// Delay firehose connection to avoid claiming input resources while the plumber is starting up.
final FirehoseFactory firehoseFactory = spec.getIOConfig().getFirehoseFactory();
final boolean firehoseDrainableByClosing = isFirehoseDrainableByClosing(firehoseFactory);
firehose = firehoseFactory.connect(spec.getDataSchema().getParser());
committerSupplier = Committers.supplierFromFirehose(firehose);
// Time to read data!
while (firehose.hasMore()) {
while ((!stopped || firehoseDrainableByClosing) && firehose.hasMore()) {
final InputRow inputRow;
try {
@ -337,8 +348,38 @@ public class RealtimeIndexTask extends AbstractTask
finally {
if (normalExit) {
try {
plumber.persist(committerSupplier.get());
plumber.finishJob();
if (!stopped) {
// Hand off all pending data
log.info("Persisting and handing off pending data.");
plumber.persist(committerSupplier.get());
plumber.finishJob();
} else {
log.info("Persisting pending data without handoff, in preparation for restart.");
final Committer committer = committerSupplier.get();
final CountDownLatch persistLatch = new CountDownLatch(1);
plumber.persist(
new Committer()
{
@Override
public Object getMetadata()
{
return committer.getMetadata();
}
@Override
public void run()
{
try {
committer.run();
}
finally {
persistLatch.countDown();
}
}
}
);
persistLatch.await();
}
}
catch (Exception e) {
log.makeAlert(e, "Failed to finish realtime task").emit();
@ -352,15 +393,67 @@ public class RealtimeIndexTask extends AbstractTask
}
}
log.info("Job done!");
return TaskStatus.success(getId());
}
@Override
public boolean canRestore()
{
return true;
}
@Override
public void stopGracefully()
{
try {
synchronized (this) {
if (!stopped) {
stopped = true;
log.info("Gracefully stopping.");
if (isFirehoseDrainableByClosing(spec.getIOConfig().getFirehoseFactory())) {
firehose.close();
} else {
log.debug("Cannot drain firehose[%s] by closing, so skipping closing.", firehose);
}
}
}
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
/**
* Public for tests.
*/
@JsonIgnore
public Firehose getFirehose()
{
return firehose;
}
@JsonProperty("spec")
public FireDepartment getRealtimeIngestionSchema()
{
return spec;
}
/**
* Is a firehose from this factory drainable by closing it? If so, we should drain on stopGracefully rather than
* abruptly stopping.
* <p/>
* This is a hack to get around the fact that the Firehose and FirehoseFactory interfaces do not help us do this.
*/
private static boolean isFirehoseDrainableByClosing(FirehoseFactory firehoseFactory)
{
return firehoseFactory instanceof EventReceiverFirehoseFactory
|| (firehoseFactory instanceof TimedShutoffFirehoseFactory
&& isFirehoseDrainableByClosing(((TimedShutoffFirehoseFactory) firehoseFactory).getDelegateFactory()))
|| (firehoseFactory instanceof ClippedFirehoseFactory
&& isFirehoseDrainableByClosing(((ClippedFirehoseFactory) firehoseFactory).getDelegate()));
}
public static class TaskActionSegmentPublisher implements SegmentPublisher
{
final Task task;

View File

@ -62,6 +62,7 @@ public interface Task
{
/**
* Returns ID of this task. Must be unique across all tasks ever created.
*
* @return task ID
*/
public String getId();
@ -69,6 +70,7 @@ public interface Task
/**
* Returns group ID of this task. Tasks with the same group ID can share locks. If tasks do not need to share locks,
* a common convention is to set group ID equal to task ID.
*
* @return task group ID
*/
public String getGroupId();
@ -76,12 +78,14 @@ public interface Task
/**
* Returns a {@link io.druid.indexing.common.task.TaskResource} for this task. Task resources define specific
* worker requirements a task may require.
*
* @return {@link io.druid.indexing.common.task.TaskResource} for this task
*/
public TaskResource getTaskResource();
/**
* Returns a descriptive label for this task type. Used for metrics emission and logging.
*
* @return task type label
*/
public String getType();
@ -90,7 +94,7 @@ public interface Task
* Get the nodeType for if/when this task publishes on zookeeper.
*
* @return the nodeType to use when publishing the server to zookeeper. null if the task doesn't expect to
* publish to zookeeper.
* publish to zookeeper.
*/
public String getNodeType();
@ -102,7 +106,9 @@ public interface Task
/**
* Returns query runners for this task. If this task is not meant to answer queries over its datasource, this method
* should return null.
*
* @param <T> query result type
*
* @return query runners for this task
*/
public <T> QueryRunner<T> getQueryRunner(Query<T> query);
@ -117,7 +123,7 @@ public interface Task
* Execute preflight actions for a task. This can be used to acquire locks, check preconditions, and so on. The
* actions must be idempotent, since this method may be executed multiple times. This typically runs on the
* coordinator. If this method throws an exception, the task should be considered a failure.
*
* <p/>
* This method must be idempotent, as it may be run multiple times per task.
*
* @param taskActionClient action client for this task (not the full toolbox)
@ -128,6 +134,20 @@ public interface Task
*/
public boolean isReady(TaskActionClient taskActionClient) throws Exception;
/**
* Returns whether or not this task can restore its progress from its on-disk working directory. Restorable tasks
* may be started with a non-empty working directory. Tasks that exit uncleanly may still have a chance to attempt
* restores, meaning that restorable tasks should be able to deal with potentially partially written on-disk state.
*/
public boolean canRestore();
/**
* Asks a task to arrange for its "run" method to exit promptly. This method will only be called if
* {@link #canRestore()} returns true. Tasks that take too long to stop gracefully will be terminated with
* extreme prejudice.
*/
public void stopGracefully();
/**
* Execute a task. This typically runs on a worker as determined by a TaskRunner, and will be run while
* holding a lock on our dataSource and implicit lock interval (if any). If this method throws an exception, the task

View File

@ -17,6 +17,8 @@
package io.druid.indexing.overlord;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.CharMatcher;
import com.google.common.base.Joiner;
@ -28,9 +30,11 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.io.ByteSink;
import com.google.common.io.ByteSource;
import com.google.common.io.ByteStreams;
import com.google.common.io.Closer;
import com.google.common.io.FileWriteMode;
import com.google.common.io.Files;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
@ -40,6 +44,7 @@ import com.metamx.common.ISE;
import com.metamx.common.Pair;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.emitter.EmittingLogger;
import io.druid.concurrent.Execs;
import io.druid.guice.annotations.Self;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.config.TaskConfig;
@ -51,6 +56,8 @@ import io.druid.server.DruidNode;
import io.druid.tasklogs.TaskLogPusher;
import io.druid.tasklogs.TaskLogStreamer;
import org.apache.commons.io.FileUtils;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import java.io.File;
import java.io.IOException;
@ -64,6 +71,7 @@ import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* Runs tasks in separate processes using the "internal peon" verb.
@ -72,6 +80,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
{
private static final EmittingLogger log = new EmittingLogger(ForkingTaskRunner.class);
private static final String CHILD_PROPERTY_PREFIX = "druid.indexer.fork.property.";
private static final String TASK_RESTORE_FILENAME = "restore.json";
private final ForkingTaskRunnerConfig config;
private final TaskConfig taskConfig;
private final Properties props;
@ -83,6 +92,8 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
private final Map<String, ForkingTaskRunnerWorkItem> tasks = Maps.newHashMap();
private volatile boolean stopping = false;
@Inject
public ForkingTaskRunner(
ForkingTaskRunnerConfig config,
@ -102,7 +113,51 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
this.node = node;
this.portFinder = new PortFinder(config.getStartPort());
this.exec = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(workerConfig.getCapacity()));
this.exec = MoreExecutors.listeningDecorator(
Execs.multiThreaded(workerConfig.getCapacity(), "forking-task-runner-%d")
);
}
@Override
public List<Pair<Task, ListenableFuture<TaskStatus>>> restore()
{
final File restoreFile = getRestoreFile();
final TaskRestoreInfo taskRestoreInfo;
if (restoreFile.exists()) {
try {
taskRestoreInfo = jsonMapper.readValue(restoreFile, TaskRestoreInfo.class);
}
catch (Exception e) {
log.error(e, "Failed to read restorable tasks from file[%s]. Skipping restore.", restoreFile);
return ImmutableList.of();
}
} else {
return ImmutableList.of();
}
final List<Pair<Task, ListenableFuture<TaskStatus>>> retVal = Lists.newArrayList();
for (final String taskId : taskRestoreInfo.getRunningTasks()) {
try {
final File taskFile = new File(taskConfig.getTaskDir(taskId), "task.json");
final Task task = jsonMapper.readValue(taskFile, Task.class);
if (!task.getId().equals(taskId)) {
throw new ISE("WTF?! Task[%s] restore file had wrong id[%s].", taskId, task.getId());
}
if (task.canRestore()) {
log.info("Restoring task[%s].", task.getId());
retVal.add(Pair.of(task, run(task)));
}
}
catch (Exception e) {
log.warn(e, "Failed to restore task[%s]. Trying to restore other tasks.", taskId);
}
}
log.info("Restored %,d tasks.", retVal.size());
return retVal;
}
@Override
@ -113,7 +168,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
tasks.put(
task.getId(),
new ForkingTaskRunnerWorkItem(
task.getId(),
task,
exec.submit(
new Callable<TaskStatus>()
{
@ -121,7 +176,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
public TaskStatus call()
{
final String attemptUUID = UUID.randomUUID().toString();
final File taskDir = new File(taskConfig.getBaseTaskDir(), task.getId());
final File taskDir = taskConfig.getTaskDir(task.getId());
final File attemptDir = new File(taskDir, attemptUUID);
final ProcessHolder processHolder;
@ -144,9 +199,9 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
throw new IOException(String.format("Could not create directories: %s", attemptDir));
}
final File taskFile = new File(attemptDir, "task.json");
final File taskFile = new File(taskDir, "task.json");
final File statusFile = new File(attemptDir, "status.json");
final File logFile = new File(attemptDir, "log");
final File logFile = new File(taskDir, "log");
// time to adjust process holders
synchronized (tasks) {
@ -245,12 +300,18 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
command.add(String.format("-Ddruid.host=%s", childHost));
command.add(String.format("-Ddruid.port=%d", childPort));
if(config.isSeparateIngestionEndpoint()) {
command.add(String.format("-Ddruid.indexer.task.chathandler.service=%s", "placeholder/serviceName"));
if (config.isSeparateIngestionEndpoint()) {
command.add(String.format(
"-Ddruid.indexer.task.chathandler.service=%s",
"placeholder/serviceName"
));
// Actual serviceName will be passed by the EventReceiverFirehose when it registers itself with ChatHandlerProvider
// Thus, "placeholder/serviceName" will be ignored
command.add(String.format("-Ddruid.indexer.task.chathandler.host=%s", childHost));
command.add(String.format("-Ddruid.indexer.task.chathandler.port=%d", childChatHandlerPort));
command.add(String.format(
"-Ddruid.indexer.task.chathandler.port=%d",
childChatHandlerPort
));
}
command.add("io.druid.cli.Main");
@ -264,7 +325,9 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
command.add(nodeType);
}
jsonMapper.writeValue(taskFile, task);
if (!taskFile.exists()) {
jsonMapper.writeValue(taskFile, task);
}
log.info("Running command: %s", Joiner.on(" ").join(command));
taskWorkItem.processHolder = new ProcessHolder(
@ -280,7 +343,8 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
log.info("Logging task %s output to: %s", task.getId(), logFile);
boolean runFailed = true;
try (final OutputStream toLogfile = Files.asByteSink(logFile).openStream()) {
final ByteSink logSink = Files.asByteSink(logFile, FileWriteMode.APPEND);
try (final OutputStream toLogfile = logSink.openStream()) {
ByteStreams.copy(processHolder.process.getInputStream(), toLogfile);
final int statusCode = processHolder.process.waitFor();
log.info("Process exited with status[%d] for task: %s", statusCode, task.getId());
@ -319,13 +383,27 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
if (taskWorkItem != null && taskWorkItem.processHolder != null) {
taskWorkItem.processHolder.process.destroy();
}
if (!stopping) {
saveRunningTasks();
}
}
portFinder.markPortUnused(childPort);
if(childChatHandlerPort > 0) {
if (childChatHandlerPort > 0) {
portFinder.markPortUnused(childChatHandlerPort);
}
log.info("Removing temporary directory: %s", attemptDir);
FileUtils.deleteDirectory(attemptDir);
try {
if (!stopping && taskDir.exists()) {
log.info("Removing task directory: %s", taskDir);
FileUtils.deleteDirectory(taskDir);
}
}
catch (Exception e) {
log.makeAlert(e, "Failed to delete task directory")
.addData("taskDir", taskDir.toString())
.addData("task", task.getId())
.emit();
}
}
catch (Exception e) {
log.error(e, "Suppressing exception caught while cleaning up task");
@ -337,7 +415,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
)
);
}
saveRunningTasks();
return tasks.get(task.getId()).getResult();
}
}
@ -345,16 +423,41 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
@LifecycleStop
public void stop()
{
synchronized (tasks) {
exec.shutdown();
stopping = true;
exec.shutdown();
synchronized (tasks) {
for (ForkingTaskRunnerWorkItem taskWorkItem : tasks.values()) {
if (taskWorkItem.processHolder != null) {
log.info("Destroying process: %s", taskWorkItem.processHolder.process);
taskWorkItem.processHolder.process.destroy();
log.info("Closing output stream to task[%s].", taskWorkItem.getTask().getId());
try {
taskWorkItem.processHolder.process.getOutputStream().close();
}
catch (Exception e) {
log.warn(e, "Failed to close stdout to task[%s]. Destroying task.", taskWorkItem.getTask().getId());
taskWorkItem.processHolder.process.destroy();
}
}
}
}
final DateTime start = new DateTime();
final long timeout = new Interval(start, taskConfig.getGracefulShutdownTimeout()).toDurationMillis();
// Things should be terminating now. Wait for it to happen so logs can be uploaded and all that good stuff.
log.info("Waiting %,dms for shutdown.", timeout);
if (timeout > 0) {
try {
exec.awaitTermination(timeout, TimeUnit.MILLISECONDS);
log.info("Finished stopping in %,dms.", System.currentTimeMillis() - start.getMillis());
}
catch (InterruptedException e) {
log.warn(e, "Interrupted while waiting for executor to finish.");
Thread.currentThread().interrupt();
}
} else {
log.warn("Ran out of time, not waiting for executor to finish!");
}
}
@Override
@ -448,17 +551,68 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
);
}
// Save running tasks to a file, so they can potentially be restored on next startup. Suppresses exceptions that
// occur while saving.
private void saveRunningTasks()
{
final File restoreFile = getRestoreFile();
final List<String> theTasks = Lists.newArrayList();
for (ForkingTaskRunnerWorkItem forkingTaskRunnerWorkItem : tasks.values()) {
theTasks.add(forkingTaskRunnerWorkItem.getTaskId());
}
try {
Files.createParentDirs(restoreFile);
jsonMapper.writeValue(restoreFile, new TaskRestoreInfo(theTasks));
}
catch (Exception e) {
log.warn(e, "Failed to save tasks to restore file[%s]. Skipping this save.", restoreFile);
}
}
private File getRestoreFile()
{
return new File(taskConfig.getBaseTaskDir(), TASK_RESTORE_FILENAME);
}
private static class TaskRestoreInfo
{
@JsonProperty
private final List<String> runningTasks;
@JsonCreator
public TaskRestoreInfo(
@JsonProperty("runningTasks") List<String> runningTasks
)
{
this.runningTasks = runningTasks;
}
public List<String> getRunningTasks()
{
return runningTasks;
}
}
private static class ForkingTaskRunnerWorkItem extends TaskRunnerWorkItem
{
private final Task task;
private volatile boolean shutdown = false;
private volatile ProcessHolder processHolder = null;
private ForkingTaskRunnerWorkItem(
String taskId,
Task task,
ListenableFuture<TaskStatus> statusFuture
)
{
super(taskId, statusFuture);
super(task.getId(), statusFuture);
this.task = task;
}
public Task getTask()
{
return task;
}
}

View File

@ -40,6 +40,7 @@ import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.metamx.common.ISE;
import com.metamx.common.Pair;
import com.metamx.common.RE;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
@ -305,6 +306,12 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
}
}
@Override
public List<Pair<Task, ListenableFuture<TaskStatus>>> restore()
{
return ImmutableList.of();
}
@Override
public Collection<ZkWorker> getWorkers()
{

View File

@ -213,6 +213,9 @@ public class TaskQueue
log.info("Beginning management in %s.", config.getStartDelay());
Thread.sleep(config.getStartDelay().getMillis());
// Ignore return value- we'll get the IDs and futures from getKnownTasks later.
taskRunner.restore();
while (active) {
giant.lock();

View File

@ -18,16 +18,24 @@
package io.druid.indexing.overlord;
import com.google.common.util.concurrent.ListenableFuture;
import com.metamx.common.Pair;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.task.Task;
import java.util.Collection;
import java.util.List;
/**
* Interface for handing off tasks. Managed by a {@link io.druid.indexing.overlord.TaskQueue}.
*/
public interface TaskRunner
{
/**
* Some task runners can restart previously-running tasks after being bounced. This method does that, and returns
* the list of tasks (and status futures).
*/
public List<Pair<Task, ListenableFuture<TaskStatus>>> restore();
/**
* Run a task. The returned status should be some kind of completed status.
*
@ -45,6 +53,12 @@ public interface TaskRunner
*/
public void shutdown(String taskid);
/**
* Stop this task runner. This may block until currently-running tasks can be gracefully stopped. After calling
* stopping, "run" will not accept further tasks.
*/
public void stop();
public Collection<? extends TaskRunnerWorkItem> getRunningTasks();
public Collection<? extends TaskRunnerWorkItem> getPendingTasks();

View File

@ -28,53 +28,121 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
import com.metamx.common.Pair;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.AlertEvent;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.concurrent.Execs;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.TaskToolboxFactory;
import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.common.task.Task;
import io.druid.query.NoopQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.query.QuerySegmentWalker;
import io.druid.query.SegmentDescriptor;
import org.apache.commons.io.FileUtils;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import java.io.File;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.TimeUnit;
/**
* Runs tasks in a JVM thread using an ExecutorService.
*/
public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker
{
private static final EmittingLogger log = new EmittingLogger(ThreadPoolTaskRunner.class);
private final TaskToolboxFactory toolboxFactory;
private final TaskConfig taskConfig;
private final ListeningExecutorService exec;
private final Set<ThreadPoolTaskRunnerWorkItem> runningItems = new ConcurrentSkipListSet<>();
private final QueryRunnerFactoryConglomerate conglomerate;
private static final EmittingLogger log = new EmittingLogger(ThreadPoolTaskRunner.class);
private final ServiceEmitter emitter;
@Inject
public ThreadPoolTaskRunner(
TaskToolboxFactory toolboxFactory,
QueryRunnerFactoryConglomerate conglomerate
TaskConfig taskConfig,
ServiceEmitter emitter
)
{
this.toolboxFactory = Preconditions.checkNotNull(toolboxFactory, "toolboxFactory");
this.taskConfig = taskConfig;
this.exec = MoreExecutors.listeningDecorator(Execs.singleThreaded("task-runner-%d"));
this.conglomerate = conglomerate;
this.emitter = Preconditions.checkNotNull(emitter, "emitter");
}
@Override
public List<Pair<Task, ListenableFuture<TaskStatus>>> restore()
{
return ImmutableList.of();
}
@LifecycleStop
public void stop()
{
exec.shutdown();
for (ThreadPoolTaskRunnerWorkItem item : runningItems) {
final Task task = item.getTask();
final long start = System.currentTimeMillis();
final boolean graceful;
final long elapsed;
boolean error = false;
if (task.canRestore()) {
// Attempt graceful shutdown.
graceful = true;
log.info("Starting graceful shutdown of task[%s].", task.getId());
try {
task.stopGracefully();
final TaskStatus taskStatus = item.getResult().get(
new Interval(new DateTime(start), taskConfig.getGracefulShutdownTimeout()).toDurationMillis(),
TimeUnit.MILLISECONDS
);
log.info(
"Graceful shutdown of task[%s] finished in %,dms with status[%s].",
task.getId(),
System.currentTimeMillis() - start,
taskStatus.getStatusCode()
);
}
catch (Exception e) {
log.makeAlert(e, "Graceful task shutdown failed: %s", task.getDataSource())
.addData("taskId", task.getId())
.addData("dataSource", task.getDataSource())
.emit();
log.warn(e, "Graceful shutdown of task[%s] aborted with exception.");
error = true;
}
} else {
graceful = false;
}
elapsed = System.currentTimeMillis() - start;
final ServiceMetricEvent.Builder metricBuilder = ServiceMetricEvent
.builder()
.setDimension("task", task.getId())
.setDimension("dataSource", task.getDataSource())
.setDimension("graceful", String.valueOf(graceful))
.setDimension("error", String.valueOf(error));
emitter.emit(metricBuilder.build("task/interrupt/count", 1L));
emitter.emit(metricBuilder.build("task/interrupt/elapsed", elapsed));
}
// Ok, now interrupt everything.
exec.shutdownNow();
}
@ -210,7 +278,6 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker
public TaskStatus call()
{
final long startTime = System.currentTimeMillis();
final File taskDir = toolbox.getTaskWorkDir();
TaskStatus status;
@ -231,19 +298,6 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker
throw Throwables.propagate(t);
}
try {
if (taskDir.exists()) {
log.info("Removing task directory: %s", taskDir);
FileUtils.deleteDirectory(taskDir);
}
}
catch (Exception e) {
log.makeAlert(e, "Failed to delete task directory")
.addData("taskDir", taskDir.toString())
.addData("task", task.getId())
.emit();
}
try {
return status.withDuration(System.currentTimeMillis() - startTime);
}

View File

@ -18,7 +18,10 @@
package io.druid.indexing.worker;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.api.client.util.Sets;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.inject.Inject;
import com.metamx.common.Pair;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.emitter.EmittingLogger;
@ -28,18 +31,20 @@ import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.TaskRunner;
import io.druid.indexing.worker.config.WorkerConfig;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.listen.Listenable;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
/**
* The monitor watches ZK at a specified path for new tasks to appear. Upon starting the monitor, a listener will be
* created that waits for new tasks. Tasks are executed as soon as they are seen.
*
* <p/>
* The monitor implements {@link io.druid.query.QuerySegmentWalker} so tasks can offer up queryable data. This is useful for
* realtime index tasks.
*/
@ -84,14 +89,20 @@ public class WorkerTaskMonitor
public void start()
{
try {
// restore restorable tasks
final List<Pair<Task, ListenableFuture<TaskStatus>>> restored = taskRunner.restore();
for (Pair<Task, ListenableFuture<TaskStatus>> pair : restored) {
submitTaskRunnable(pair.lhs, pair.rhs);
}
// cleanup any old running task announcements which are invalid after restart
for (TaskAnnouncement announcement : workerCuratorCoordinator.getAnnouncements()){
if(announcement.getTaskStatus().isRunnable()) {
for (TaskAnnouncement announcement : workerCuratorCoordinator.getAnnouncements()) {
if (!isTaskRunning(announcement.getTaskStatus().getId()) && announcement.getTaskStatus().isRunnable()) {
workerCuratorCoordinator.updateAnnouncement(
TaskAnnouncement.create(
announcement.getTaskId(),
announcement.getTaskStatus().getId(),
announcement.getTaskResource(),
TaskStatus.failure(announcement.getTaskId())
TaskStatus.failure(announcement.getTaskStatus().getId())
)
);
}
@ -110,67 +121,7 @@ public class WorkerTaskMonitor
Task.class
);
if (isTaskRunning(task)) {
log.warn(
"I can't build it. There's something in the way. Got task %s that I am already running...",
task.getId()
);
workerCuratorCoordinator.unannounceTask(task.getId());
return;
}
log.info("Submitting runnable for task[%s]", task.getId());
exec.submit(
new Runnable()
{
@Override
public void run()
{
final long startTime = System.currentTimeMillis();
log.info("Affirmative. Running task [%s]", task.getId());
running.add(task);
TaskStatus taskStatus;
try {
workerCuratorCoordinator.unannounceTask(task.getId());
workerCuratorCoordinator.announceTaskAnnouncement(
TaskAnnouncement.create(
task,
TaskStatus.running(task.getId())
)
);
taskStatus = taskRunner.run(task).get();
}
catch (Exception e) {
log.makeAlert(e, "I can't build there. Failed to run task")
.addData("task", task.getId())
.emit();
taskStatus = TaskStatus.failure(task.getId());
}
finally {
running.remove(task);
}
taskStatus = taskStatus.withDuration(System.currentTimeMillis() - startTime);
try {
workerCuratorCoordinator.updateAnnouncement(TaskAnnouncement.create(task, taskStatus));
log.info(
"Job's finished. Completed [%s] with status [%s]",
task.getId(),
taskStatus.getStatusCode()
);
}
catch (Exception e) {
log.makeAlert(e, "Failed to update task status")
.addData("task", task.getId())
.emit();
}
}
}
);
submitTaskRunnable(task, null);
}
}
}
@ -184,10 +135,86 @@ public class WorkerTaskMonitor
}
}
private boolean isTaskRunning(final Task task)
private void submitTaskRunnable(final Task task, final ListenableFuture<TaskStatus> taskStatusAlreadySubmitted)
{
if (isTaskRunning(task.getId())) {
log.warn(
"I can't build it. There's something in the way. Got task %s that I am already running...",
task.getId()
);
workerCuratorCoordinator.unannounceTask(task.getId());
return;
}
log.info("Submitting runnable for task[%s]", task.getId());
running.add(task);
exec.submit(
new Runnable()
{
@Override
public void run()
{
final long startTime = System.currentTimeMillis();
TaskStatus taskStatus;
try {
workerCuratorCoordinator.updateAnnouncement(
TaskAnnouncement.create(
task,
TaskStatus.running(task.getId())
)
);
if (taskStatusAlreadySubmitted != null) {
log.info("Affirmative. Connecting to already-running task [%s]", task.getId());
taskStatus = taskStatusAlreadySubmitted.get();
} else {
log.info("Affirmative. Running task [%s]", task.getId());
workerCuratorCoordinator.unannounceTask(task.getId());
taskStatus = taskRunner.run(task).get();
}
}
catch (InterruptedException e) {
log.debug(e, "Interrupted while running task[%s], exiting.", task.getId());
return;
}
catch (Exception e) {
log.makeAlert(e, "I can't build there. Failed to run task")
.addData("task", task.getId())
.emit();
taskStatus = TaskStatus.failure(task.getId());
}
finally {
running.remove(task);
}
taskStatus = taskStatus.withDuration(System.currentTimeMillis() - startTime);
try {
workerCuratorCoordinator.updateAnnouncement(TaskAnnouncement.create(task, taskStatus));
log.info(
"Job's finished. Completed [%s] with status [%s]",
task.getId(),
taskStatus.getStatusCode()
);
}
catch (Exception e) {
log.makeAlert(e, "Failed to update task status")
.addData("task", task.getId())
.emit();
}
}
}
);
}
private boolean isTaskRunning(final String taskId)
{
for (final Task runningTask : running) {
if (runningTask.getId().equals(task.getId())) {
if (runningTask.getId().equals(taskId)) {
return true;
}
}
@ -200,7 +227,8 @@ public class WorkerTaskMonitor
{
try {
pathChildrenCache.close();
exec.shutdown();
exec.shutdownNow();
taskRunner.stop();
}
catch (Exception e) {
log.makeAlert(e, "Exception stopping WorkerTaskMonitor")

View File

@ -31,12 +31,17 @@ import com.metamx.emitter.EmittingLogger;
import io.druid.concurrent.Execs;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.actions.TaskActionClientFactory;
import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.TaskRunner;
import org.joda.time.DateTime;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.ExecutorService;
/**
@ -47,37 +52,41 @@ public class ExecutorLifecycle
{
private static final EmittingLogger log = new EmittingLogger(ExecutorLifecycle.class);
private final ExecutorLifecycleConfig config;
private final ExecutorLifecycleConfig taskExecutorConfig;
private final TaskConfig taskConfig;
private final TaskActionClientFactory taskActionClientFactory;
private final TaskRunner taskRunner;
private final ObjectMapper jsonMapper;
private final ExecutorService parentMonitorExec = Execs.singleThreaded("parent-monitor-%d");
private volatile Task task = null;
private volatile ListenableFuture<TaskStatus> statusFuture = null;
private volatile FileChannel taskLockChannel;
private volatile FileLock taskLockFileLock;
@Inject
public ExecutorLifecycle(
ExecutorLifecycleConfig config,
ExecutorLifecycleConfig taskExecutorConfig,
TaskConfig taskConfig,
TaskActionClientFactory taskActionClientFactory,
TaskRunner taskRunner,
ObjectMapper jsonMapper
)
{
this.config = config;
this.taskExecutorConfig = taskExecutorConfig;
this.taskConfig = taskConfig;
this.taskActionClientFactory = taskActionClientFactory;
this.taskRunner = taskRunner;
this.jsonMapper = jsonMapper;
}
@LifecycleStart
public void start()
public void start() throws InterruptedException
{
final File taskFile = Preconditions.checkNotNull(config.getTaskFile(), "taskFile");
final File statusFile = Preconditions.checkNotNull(config.getStatusFile(), "statusFile");
final InputStream parentStream = Preconditions.checkNotNull(config.getParentStream(), "parentStream");
final Task task;
final File taskFile = Preconditions.checkNotNull(taskExecutorConfig.getTaskFile(), "taskFile");
final File statusFile = Preconditions.checkNotNull(taskExecutorConfig.getStatusFile(), "statusFile");
final InputStream parentStream = Preconditions.checkNotNull(taskExecutorConfig.getParentStream(), "parentStream");
try {
task = jsonMapper.readValue(taskFile, Task.class);
@ -91,6 +100,43 @@ public class ExecutorLifecycle
throw Throwables.propagate(e);
}
// Avoid running the same task twice on the same machine by locking the task base directory.
final File taskLockFile = taskConfig.getTaskLockFile(task.getId());
try {
synchronized (this) {
if (taskLockChannel == null && taskLockFileLock == null) {
taskLockChannel = FileChannel.open(
taskLockFile.toPath(),
StandardOpenOption.CREATE,
StandardOpenOption.WRITE
);
log.info("Attempting to lock file[%s].", taskLockFile);
final long startLocking = System.currentTimeMillis();
final long timeout = new DateTime(startLocking).plus(taskConfig.getDirectoryLockTimeout()).getMillis();
while (taskLockFileLock == null && System.currentTimeMillis() < timeout) {
taskLockFileLock = taskLockChannel.tryLock();
if (taskLockFileLock == null) {
Thread.sleep(100);
}
}
if (taskLockFileLock == null) {
throw new ISE("Could not acquire lock file[%s] within %,dms.", taskLockFile, timeout - startLocking);
} else {
log.info("Acquired lock file[%s] in %,dms.", taskLockFile, System.currentTimeMillis() - startLocking);
}
} else {
throw new ISE("Already started!");
}
}
}
catch (IOException e) {
throw Throwables.propagate(e);
}
// Spawn monitor thread to keep a watch on parent's stdin
// If stdin reaches eof, the parent is gone, and we should shut down
parentMonitorExec.submit(
@ -120,7 +166,8 @@ public class ExecutorLifecycle
if (!task.isReady(taskActionClientFactory.create(task))) {
throw new ISE("Task is not ready to run yet!", task.getId());
}
} catch (Exception e) {
}
catch (Exception e) {
throw new ISE(e, "Failed to run isReady", task.getId());
}
@ -164,8 +211,18 @@ public class ExecutorLifecycle
}
@LifecycleStop
public void stop()
public void stop() throws Exception
{
parentMonitorExec.shutdown();
synchronized (this) {
if (taskLockFileLock != null) {
taskLockFileLock.release();
}
if (taskLockChannel != null) {
taskLockChannel.close();
}
}
}
}

View File

@ -89,7 +89,7 @@ public class TaskToolboxTest
EasyMock.replay(task);
taskToolbox = new TaskToolboxFactory(
new TaskConfig(temporaryFolder.newFile().toString(), null, null, 50000, null),
new TaskConfig(temporaryFolder.newFile().toString(), null, null, 50000, null, null, null),
mockTaskActionClientFactory,
mockEmitter,
mockSegmentPusher,

View File

@ -19,12 +19,138 @@
package io.druid.indexing.common.task;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.api.client.util.Charsets;
import com.google.api.client.util.Sets;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.metamx.common.Granularity;
import com.metamx.common.ISE;
import com.metamx.common.guava.Sequences;
import com.metamx.common.logger.Logger;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.core.LoggingEmitter;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.metrics.MonitorScheduler;
import io.druid.client.cache.CacheConfig;
import io.druid.client.cache.MapCache;
import io.druid.concurrent.Execs;
import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedInputRow;
import io.druid.granularity.QueryGranularity;
import io.druid.indexing.common.SegmentLoaderFactory;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.TaskToolboxFactory;
import io.druid.indexing.common.TestUtils;
import io.druid.indexing.common.actions.LocalTaskActionClientFactory;
import io.druid.indexing.common.actions.TaskActionClientFactory;
import io.druid.indexing.common.actions.TaskActionToolbox;
import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.common.config.TaskStorageConfig;
import io.druid.indexing.overlord.HeapMemoryTaskStorage;
import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import io.druid.indexing.overlord.TaskLockbox;
import io.druid.indexing.overlord.TaskStorage;
import io.druid.indexing.test.TestDataSegmentAnnouncer;
import io.druid.indexing.test.TestDataSegmentKiller;
import io.druid.indexing.test.TestDataSegmentPusher;
import io.druid.indexing.test.TestIndexerMetadataStorageCoordinator;
import io.druid.indexing.test.TestServerView;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.DefaultQueryRunnerFactoryConglomerate;
import io.druid.query.Druids;
import io.druid.query.IntervalChunkingQueryRunnerDecorator;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.query.QueryToolChest;
import io.druid.query.QueryWatcher;
import io.druid.query.Result;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.timeseries.TimeseriesQuery;
import io.druid.query.timeseries.TimeseriesQueryEngine;
import io.druid.query.timeseries.TimeseriesQueryQueryToolChest;
import io.druid.query.timeseries.TimeseriesQueryRunnerFactory;
import io.druid.query.timeseries.TimeseriesResultValue;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeIOConfig;
import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import io.druid.segment.loading.SegmentLoaderConfig;
import io.druid.segment.loading.SegmentLoaderLocalCacheManager;
import io.druid.segment.loading.StorageLocationConfig;
import io.druid.segment.realtime.FireDepartment;
import io.druid.segment.realtime.firehose.EventReceiverFirehoseFactory;
import io.druid.segment.realtime.firehose.NoopChatHandlerProvider;
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.server.metrics.EventReceiverFirehoseRegister;
import io.druid.timeline.DataSegment;
import org.easymock.EasyMock;
import org.joda.time.DateTime;
import org.joda.time.Period;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
public class RealtimeIndexTaskTest
{
private static final Logger log = new Logger(RealtimeIndexTaskTest.class);
private static final DruidServerMetadata dummyServer = new DruidServerMetadata(
"dummy",
"dummy_host",
0,
"historical",
"dummy_tier",
0
);
private static final ObjectMapper jsonMapper = new DefaultObjectMapper();
private static final ServiceEmitter emitter = new ServiceEmitter(
"service",
"host",
new LoggingEmitter(
log,
LoggingEmitter.Level.ERROR,
jsonMapper
)
);
@Rule
public final TemporaryFolder tempFolder = new TemporaryFolder();
private DateTime now;
private ListeningExecutorService taskExec;
@Before
public void setUp()
{
EmittingLogger.registerEmitter(emitter);
emitter.start();
taskExec = MoreExecutors.listeningDecorator(Execs.singleThreaded("realtime-index-task-test-%d"));
now = new DateTime();
}
@After
public void tearDown()
{
taskExec.shutdownNow();
}
@Test
public void testMakeTaskId() throws Exception
@ -34,4 +160,386 @@ public class RealtimeIndexTaskTest
RealtimeIndexTask.makeTaskId("test", 0, new DateTime("2015-01-02"), 0x76543210)
);
}
@Test(timeout = 10000L)
public void testBasics() throws Exception
{
final TestIndexerMetadataStorageCoordinator mdc = new TestIndexerMetadataStorageCoordinator();
final RealtimeIndexTask task = makeRealtimeTask(null);
final TaskToolbox taskToolbox = makeToolbox(task, mdc, tempFolder.newFolder());
final ListenableFuture<TaskStatus> statusFuture = runTask(task, taskToolbox);
// Wait for firehose to show up, it starts off null.
while (task.getFirehose() == null) {
Thread.sleep(50);
}
final EventReceiverFirehoseFactory.EventReceiverFirehose firehose =
(EventReceiverFirehoseFactory.EventReceiverFirehose) task.getFirehose();
firehose.addRows(
ImmutableList.<InputRow>of(
new MapBasedInputRow(
now,
ImmutableList.of("dim1"),
ImmutableMap.<String, Object>of("dim1", "foo")
),
new MapBasedInputRow(
now,
ImmutableList.of("dim2"),
ImmutableMap.<String, Object>of("dim2", "bar")
)
)
);
// Stop the firehose, this will drain out existing events.
firehose.close();
// Wait for publish.
while (mdc.getPublished().isEmpty()) {
Thread.sleep(50);
}
// Do a query.
Assert.assertEquals(2, countEvents(task));
// Simulate handoff.
for (DataSegment segment : mdc.getPublished()) {
((TestServerView) taskToolbox.getNewSegmentServerView()).segmentAdded(dummyServer, segment);
}
// Wait for the task to finish.
final TaskStatus taskStatus = statusFuture.get();
Assert.assertEquals(TaskStatus.Status.SUCCESS, taskStatus.getStatusCode());
}
@Test(timeout = 10000L)
public void testRestore() throws Exception
{
final File directory = tempFolder.newFolder();
final RealtimeIndexTask task1 = makeRealtimeTask(null);
// First run:
{
final TestIndexerMetadataStorageCoordinator mdc = new TestIndexerMetadataStorageCoordinator();
final TaskToolbox taskToolbox = makeToolbox(task1, mdc, directory);
final ListenableFuture<TaskStatus> statusFuture = runTask(task1, taskToolbox);
// Wait for firehose to show up, it starts off null.
while (task1.getFirehose() == null) {
Thread.sleep(50);
}
final EventReceiverFirehoseFactory.EventReceiverFirehose firehose =
(EventReceiverFirehoseFactory.EventReceiverFirehose) task1.getFirehose();
firehose.addRows(
ImmutableList.<InputRow>of(
new MapBasedInputRow(
now,
ImmutableList.of("dim1"),
ImmutableMap.<String, Object>of("dim1", "foo")
)
)
);
// Trigger graceful shutdown.
task1.stopGracefully();
// Wait for the task to finish. The status doesn't really matter, but we'll check it anyway.
final TaskStatus taskStatus = statusFuture.get();
Assert.assertEquals(TaskStatus.Status.SUCCESS, taskStatus.getStatusCode());
// Nothing should be published.
Assert.assertEquals(Sets.newHashSet(), mdc.getPublished());
}
// Second run:
{
final TestIndexerMetadataStorageCoordinator mdc = new TestIndexerMetadataStorageCoordinator();
final RealtimeIndexTask task2 = makeRealtimeTask(task1.getId());
final TaskToolbox taskToolbox = makeToolbox(task2, mdc, directory);
final ListenableFuture<TaskStatus> statusFuture = runTask(task2, taskToolbox);
// Wait for firehose to show up, it starts off null.
while (task2.getFirehose() == null) {
Thread.sleep(50);
}
// Do a query, at this point the previous data should be loaded.
Assert.assertEquals(1, countEvents(task2));
final EventReceiverFirehoseFactory.EventReceiverFirehose firehose =
(EventReceiverFirehoseFactory.EventReceiverFirehose) task2.getFirehose();
firehose.addRows(
ImmutableList.<InputRow>of(
new MapBasedInputRow(
now,
ImmutableList.of("dim2"),
ImmutableMap.<String, Object>of("dim2", "bar")
)
)
);
// Stop the firehose, this will drain out existing events.
firehose.close();
// Wait for publish.
while (mdc.getPublished().isEmpty()) {
Thread.sleep(50);
}
// Do a query.
Assert.assertEquals(2, countEvents(task2));
// Simulate handoff.
for (DataSegment segment : mdc.getPublished()) {
((TestServerView) taskToolbox.getNewSegmentServerView()).segmentAdded(dummyServer, segment);
}
// Wait for the task to finish.
final TaskStatus taskStatus = statusFuture.get();
Assert.assertEquals(TaskStatus.Status.SUCCESS, taskStatus.getStatusCode());
}
}
@Test(timeout = 10000L)
public void testRestoreCorruptData() throws Exception
{
final File directory = tempFolder.newFolder();
final RealtimeIndexTask task1 = makeRealtimeTask(null);
// First run:
{
final TestIndexerMetadataStorageCoordinator mdc = new TestIndexerMetadataStorageCoordinator();
final TaskToolbox taskToolbox = makeToolbox(task1, mdc, directory);
final ListenableFuture<TaskStatus> statusFuture = runTask(task1, taskToolbox);
// Wait for firehose to show up, it starts off null.
while (task1.getFirehose() == null) {
Thread.sleep(50);
}
final EventReceiverFirehoseFactory.EventReceiverFirehose firehose =
(EventReceiverFirehoseFactory.EventReceiverFirehose) task1.getFirehose();
firehose.addRows(
ImmutableList.<InputRow>of(
new MapBasedInputRow(
now,
ImmutableList.of("dim1"),
ImmutableMap.<String, Object>of("dim1", "foo")
)
)
);
// Trigger graceful shutdown.
task1.stopGracefully();
// Wait for the task to finish. The status doesn't really matter, but we'll check it anyway.
final TaskStatus taskStatus = statusFuture.get();
Assert.assertEquals(TaskStatus.Status.SUCCESS, taskStatus.getStatusCode());
// Nothing should be published.
Assert.assertEquals(Sets.newHashSet(), mdc.getPublished());
}
// Corrupt the data:
final File smooshFile = new File(
String.format(
"%s/persistent/task/%s/work/persist/%s/%s_%s/0/00000.smoosh",
directory,
task1.getId(),
task1.getDataSource(),
Granularity.DAY.truncate(now),
Granularity.DAY.increment(Granularity.DAY.truncate(now))
)
);
Files.write(smooshFile.toPath(), "oops!".getBytes(Charsets.UTF_8));
// Second run:
{
final TestIndexerMetadataStorageCoordinator mdc = new TestIndexerMetadataStorageCoordinator();
final RealtimeIndexTask task2 = makeRealtimeTask(task1.getId());
final TaskToolbox taskToolbox = makeToolbox(task2, mdc, directory);
final ListenableFuture<TaskStatus> statusFuture = runTask(task2, taskToolbox);
// Wait for the task to finish.
boolean caught = false;
try {
statusFuture.get();
}
catch (Exception e) {
caught = true;
}
Assert.assertTrue("expected exception", caught);
}
}
private ListenableFuture<TaskStatus> runTask(final Task task, final TaskToolbox toolbox)
{
return taskExec.submit(
new Callable<TaskStatus>()
{
@Override
public TaskStatus call() throws Exception
{
try {
if (task.isReady(toolbox.getTaskActionClient())) {
return task.run(toolbox);
} else {
throw new ISE("Task is not ready");
}
}
catch (Exception e) {
log.warn(e, "Task failed");
throw e;
}
}
}
);
}
private RealtimeIndexTask makeRealtimeTask(final String taskId)
{
ObjectMapper objectMapper = new DefaultObjectMapper();
DataSchema dataSchema = new DataSchema(
"test_ds",
null,
new AggregatorFactory[]{new CountAggregatorFactory("rows")},
new UniformGranularitySpec(Granularity.DAY, QueryGranularity.NONE, null),
objectMapper
);
RealtimeIOConfig realtimeIOConfig = new RealtimeIOConfig(
new EventReceiverFirehoseFactory(
"foo",
100,
new NoopChatHandlerProvider(),
objectMapper,
null,
new EventReceiverFirehoseRegister()
),
null,
null
);
RealtimeTuningConfig realtimeTuningConfig = new RealtimeTuningConfig(
1000,
new Period("P1Y"),
new Period("PT10M"),
null,
null,
null,
null,
null,
null
);
return new RealtimeIndexTask(
taskId,
null,
new FireDepartment(dataSchema, realtimeIOConfig, realtimeTuningConfig),
null
);
}
private TaskToolbox makeToolbox(final Task task, final IndexerMetadataStorageCoordinator mdc, final File directory)
{
final TaskStorage taskStorage = new HeapMemoryTaskStorage(new TaskStorageConfig(null));
final TaskConfig taskConfig = new TaskConfig(directory.getPath(), null, null, 50000, null, null, null);
final TaskLockbox taskLockbox = new TaskLockbox(taskStorage);
final TaskActionToolbox taskActionToolbox = new TaskActionToolbox(
taskLockbox,
mdc,
emitter
);
final TaskActionClientFactory taskActionClientFactory = new LocalTaskActionClientFactory(
taskStorage,
taskActionToolbox
);
final QueryRunnerFactoryConglomerate conglomerate = new DefaultQueryRunnerFactoryConglomerate(
ImmutableMap.<Class<? extends Query>, QueryRunnerFactory>of(
TimeseriesQuery.class,
new TimeseriesQueryRunnerFactory(
new TimeseriesQueryQueryToolChest(
new IntervalChunkingQueryRunnerDecorator(null, null, null)
{
@Override
public <T> QueryRunner<T> decorate(
QueryRunner<T> delegate, QueryToolChest<T, ? extends Query<T>> toolChest
)
{
return delegate;
}
}
),
new TimeseriesQueryEngine(),
new QueryWatcher()
{
@Override
public void registerQuery(Query query, ListenableFuture future)
{
// do nothing
}
}
)
)
);
final TestUtils testUtils = new TestUtils();
final TaskToolboxFactory toolboxFactory = new TaskToolboxFactory(
taskConfig,
taskActionClientFactory,
emitter,
new TestDataSegmentPusher(),
new TestDataSegmentKiller(),
null, // DataSegmentMover
null, // DataSegmentArchiver
new TestDataSegmentAnnouncer(),
new TestServerView(),
conglomerate,
MoreExecutors.sameThreadExecutor(), // queryExecutorService
EasyMock.createMock(MonitorScheduler.class),
new SegmentLoaderFactory(
new SegmentLoaderLocalCacheManager(
null,
new SegmentLoaderConfig()
{
@Override
public List<StorageLocationConfig> getLocations()
{
return Lists.newArrayList();
}
}, testUtils.getTestObjectMapper()
)
),
testUtils.getTestObjectMapper(),
testUtils.getTestIndexMerger(),
testUtils.getTestIndexIO(),
MapCache.create(1024),
new CacheConfig()
);
taskLockbox.add(task);
return toolboxFactory.build(task);
}
public long countEvents(final Task task) throws Exception
{
// Do a query.
TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
.dataSource("test_ds")
.aggregators(
ImmutableList.<AggregatorFactory>of(
new LongSumAggregatorFactory("rows", "rows")
)
).granularity(QueryGranularity.ALL)
.intervals("2000/3000")
.build();
ArrayList<Result<TimeseriesResultValue>> results = Sequences.toList(
task.getQueryRunner(query).run(query, ImmutableMap.<String, Object>of()),
Lists.<Result<TimeseriesResultValue>>newArrayList()
);
return results.get(0).getValue().getLongMetric("rows");
}
}

View File

@ -192,7 +192,7 @@ public class IngestSegmentFirehoseFactoryTest
);
final TaskToolboxFactory taskToolboxFactory = new TaskToolboxFactory(
new TaskConfig(tmpDir.getAbsolutePath(), null, null, 50000, null),
new TaskConfig(tmpDir.getAbsolutePath(), null, null, 50000, null, null, null),
tac,
newMockEmitter(),
new DataSegmentPusher()

View File

@ -294,7 +294,7 @@ public class IngestSegmentFirehoseFactoryTimelineTest
}
};
final TaskToolboxFactory taskToolboxFactory = new TaskToolboxFactory(
new TaskConfig(testCase.tmpDir.getAbsolutePath(), null, null, 50000, null),
new TaskConfig(testCase.tmpDir.getAbsolutePath(), null, null, 50000, null, null, null),
new TaskActionClientFactory()
{
@Override

View File

@ -32,7 +32,6 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.metamx.common.Granularity;
import com.metamx.common.ISE;
import com.metamx.common.guava.Comparators;
@ -72,8 +71,8 @@ import io.druid.indexing.common.task.RealtimeIndexTaskTest;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.common.task.TaskResource;
import io.druid.indexing.overlord.config.TaskQueueConfig;
import io.druid.indexing.test.TestIndexerMetadataStorageCoordinator;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.metadata.IndexerSQLMetadataStorageCoordinator;
import io.druid.metadata.SQLMetadataStorageActionHandlerFactory;
import io.druid.metadata.TestDerbyConnector;
import io.druid.query.QueryRunnerFactoryConglomerate;
@ -183,6 +182,7 @@ public class TaskLifecycleTest
private final String taskStorageType;
private ObjectMapper mapper;
private TaskStorageQueryAdapter tsqa = null;
private File tmpDir = null;
@ -190,7 +190,7 @@ public class TaskLifecycleTest
private TaskLockbox tl = null;
private TaskQueue tq = null;
private TaskRunner tr = null;
private MockIndexerMetadataStorageCoordinator mdc = null;
private TestIndexerMetadataStorageCoordinator mdc = null;
private TaskActionClientFactory tac = null;
private TaskToolboxFactory tb = null;
private IndexSpec indexSpec;
@ -205,9 +205,18 @@ public class TaskLifecycleTest
private TestDerbyConnector testDerbyConnector;
private List<ServerView.SegmentCallback> segmentCallbacks = new ArrayList<>();
private static MockIndexerMetadataStorageCoordinator newMockMDC()
private static TestIndexerMetadataStorageCoordinator newMockMDC()
{
return new MockIndexerMetadataStorageCoordinator();
return new TestIndexerMetadataStorageCoordinator()
{
@Override
public Set<DataSegment> announceHistoricalSegments(Set<DataSegment> segments)
{
Set<DataSegment> retVal = super.announceHistoricalSegments(segments);
publishCountDown.countDown();
return retVal;
}
};
}
private static ServiceEmitter newMockEmitter()
@ -373,7 +382,11 @@ public class TaskLifecycleTest
ts = new MetadataTaskStorage(
testDerbyConnector,
new TaskStorageConfig(null),
new SQLMetadataStorageActionHandlerFactory(testDerbyConnector, derbyConnectorRule.metadataTablesConfigSupplier().get(), mapper)
new SQLMetadataStorageActionHandlerFactory(
testDerbyConnector,
derbyConnectorRule.metadataTablesConfigSupplier().get(),
mapper
)
);
} else {
throw new RuntimeException(String.format("Unknown task storage type [%s]", taskStorageType));
@ -408,13 +421,15 @@ public class TaskLifecycleTest
);
}
private void setUpAndStartTaskQueue(DataSegmentPusher dataSegmentPusher) {
private void setUpAndStartTaskQueue(DataSegmentPusher dataSegmentPusher)
{
final TaskConfig taskConfig = new TaskConfig(tmpDir.toString(), null, null, 50000, null, null, null);
tsqa = new TaskStorageQueryAdapter(ts);
tl = new TaskLockbox(ts);
mdc = newMockMDC();
tac = new LocalTaskActionClientFactory(ts, new TaskActionToolbox(tl, mdc, newMockEmitter()));
tb = new TaskToolboxFactory(
new TaskConfig(tmpDir.toString(), null, null, 50000, null),
taskConfig,
tac,
newMockEmitter(),
dataSegmentPusher,
@ -491,7 +506,7 @@ public class TaskLifecycleTest
MapCache.create(0),
FireDepartmentTest.NO_CACHE_CONFIG
);
tr = new ThreadPoolTaskRunner(tb, null);
tr = new ThreadPoolTaskRunner(tb, taskConfig, emitter);
tq = new TaskQueue(tqc, ts, tr, tac, tl, emitter);
tq.start();
}
@ -821,7 +836,7 @@ public class TaskLifecycleTest
Assert.assertEquals("segments nuked", 0, mdc.getNuked().size());
}
@Test (timeout = 4000L)
@Test(timeout = 4000L)
public void testRealtimeIndexTask() throws Exception
{
monitorScheduler.addMonitor(EasyMock.anyObject(Monitor.class));
@ -870,7 +885,7 @@ public class TaskLifecycleTest
EasyMock.verify(monitorScheduler, queryRunnerFactoryConglomerate);
}
@Test (timeout = 4000L)
@Test(timeout = 4000L)
public void testRealtimeIndexTaskFailure() throws Exception
{
setUpAndStartTaskQueue(
@ -1012,7 +1027,8 @@ public class TaskLifecycleTest
return retVal;
}
private RealtimeIndexTask giveMeARealtimeIndexTask() {
private RealtimeIndexTask giveMeARealtimeIndexTask()
{
String taskId = String.format("rt_task_%s", System.currentTimeMillis());
DataSchema dataSchema = new DataSchema(
"test_ds",
@ -1023,7 +1039,8 @@ public class TaskLifecycleTest
);
RealtimeIOConfig realtimeIOConfig = new RealtimeIOConfig(
new MockFirehoseFactory(true),
null, // PlumberSchool - Realtime Index Task always uses RealtimePlumber which is hardcoded in RealtimeIndexTask class
null,
// PlumberSchool - Realtime Index Task always uses RealtimePlumber which is hardcoded in RealtimeIndexTask class
null
);
RealtimeTuningConfig realtimeTuningConfig = new RealtimeTuningConfig(
@ -1045,64 +1062,4 @@ public class TaskLifecycleTest
null
);
}
private static class MockIndexerMetadataStorageCoordinator extends IndexerSQLMetadataStorageCoordinator
{
final private Set<DataSegment> published = Sets.newHashSet();
final private Set<DataSegment> nuked = Sets.newHashSet();
private List<DataSegment> unusedSegments;
private MockIndexerMetadataStorageCoordinator()
{
super(null, null, null);
unusedSegments = Lists.newArrayList();
}
@Override
public List<DataSegment> getUsedSegmentsForInterval(String dataSource, Interval interval) throws IOException
{
return ImmutableList.of();
}
@Override
public List<DataSegment> getUnusedSegmentsForInterval(String dataSource, Interval interval)
{
return unusedSegments;
}
@Override
public Set<DataSegment> announceHistoricalSegments(Set<DataSegment> segments)
{
Set<DataSegment> added = Sets.newHashSet();
for (final DataSegment segment : segments) {
if (published.add(segment)) {
added.add(segment);
}
}
TaskLifecycleTest.publishCountDown.countDown();
return ImmutableSet.copyOf(added);
}
@Override
public void deleteSegments(Set<DataSegment> segments)
{
nuked.addAll(segments);
}
public Set<DataSegment> getPublished()
{
return ImmutableSet.copyOf(published);
}
public Set<DataSegment> getNuked()
{
return ImmutableSet.copyOf(nuked);
}
public void setUnusedSegments(List<DataSegment> unusedSegments)
{
this.unusedSegments = unusedSegments;
}
}
}

View File

@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.metamx.common.Pair;
import com.metamx.common.concurrent.ScheduledExecutorFactory;
import com.metamx.common.guava.CloseQuietly;
import com.metamx.emitter.EmittingLogger;
@ -294,6 +295,18 @@ public class OverlordResourceTest
this.runningTasks = new ArrayList<>();
}
@Override
public List<Pair<Task, ListenableFuture<TaskStatus>>> restore()
{
return ImmutableList.of();
}
@Override
public void stop()
{
// Do nothing
}
@Override
public synchronized ListenableFuture<TaskStatus> run(final Task task)
{

View File

@ -0,0 +1,66 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.test;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import io.druid.server.coordination.DataSegmentAnnouncer;
import io.druid.timeline.DataSegment;
import java.io.IOException;
import java.util.Set;
public class TestDataSegmentAnnouncer implements DataSegmentAnnouncer
{
public Set<DataSegment> announcedSegments = Sets.newConcurrentHashSet();
@Override
public void announceSegment(DataSegment segment) throws IOException
{
announcedSegments.add(segment);
}
@Override
public void unannounceSegment(DataSegment segment) throws IOException
{
announcedSegments.remove(segment);
}
@Override
public void announceSegments(Iterable<DataSegment> segments) throws IOException
{
for (DataSegment segment : segments) {
announcedSegments.add(segment);
}
}
@Override
public void unannounceSegments(Iterable<DataSegment> segments) throws IOException
{
for (DataSegment segment : segments) {
announcedSegments.remove(segment);
}
}
public Set<DataSegment> getAnnouncedSegments()
{
return ImmutableSet.copyOf(announcedSegments);
}
}

View File

@ -0,0 +1,44 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.test;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import io.druid.segment.loading.DataSegmentKiller;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.timeline.DataSegment;
import java.util.Set;
public class TestDataSegmentKiller implements DataSegmentKiller
{
private final Set<DataSegment> killedSegments = Sets.newConcurrentHashSet();
@Override
public void kill(DataSegment segment) throws SegmentLoadingException
{
killedSegments.add(segment);
}
public Set<DataSegment> getKilledSegments()
{
return ImmutableSet.copyOf(killedSegments);
}
}

View File

@ -0,0 +1,52 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.test;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.timeline.DataSegment;
import java.io.File;
import java.io.IOException;
import java.util.Set;
public class TestDataSegmentPusher implements DataSegmentPusher
{
private final Set<DataSegment> pushedSegments = Sets.newConcurrentHashSet();
@Override
public String getPathForHadoop(String dataSource)
{
throw new UnsupportedOperationException();
}
@Override
public DataSegment push(File file, DataSegment segment) throws IOException
{
pushedSegments.add(segment);
return segment;
}
public Set<DataSegment> getPushedSegments()
{
return ImmutableSet.copyOf(pushedSegments);
}
}

View File

@ -0,0 +1,113 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.test;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import io.druid.segment.realtime.appenderator.SegmentIdentifier;
import io.druid.timeline.DataSegment;
import org.joda.time.Interval;
import java.io.IOException;
import java.util.List;
import java.util.Set;
public class TestIndexerMetadataStorageCoordinator implements IndexerMetadataStorageCoordinator
{
final private Set<DataSegment> published = Sets.newConcurrentHashSet();
final private Set<DataSegment> nuked = Sets.newConcurrentHashSet();
final private List<DataSegment> unusedSegments;
public TestIndexerMetadataStorageCoordinator()
{
unusedSegments = Lists.newArrayList();
}
@Override
public List<DataSegment> getUsedSegmentsForInterval(String dataSource, Interval interval) throws IOException
{
return ImmutableList.of();
}
@Override
public List<DataSegment> getUnusedSegmentsForInterval(String dataSource, Interval interval)
{
synchronized (unusedSegments) {
return ImmutableList.copyOf(unusedSegments);
}
}
@Override
public Set<DataSegment> announceHistoricalSegments(Set<DataSegment> segments)
{
Set<DataSegment> added = Sets.newHashSet();
for (final DataSegment segment : segments) {
if (published.add(segment)) {
added.add(segment);
}
}
return ImmutableSet.copyOf(added);
}
@Override
public SegmentIdentifier allocatePendingSegment(
String dataSource,
String sequenceName,
String previousSegmentId,
Interval interval,
String maxVersion
) throws IOException
{
throw new UnsupportedOperationException();
}
@Override
public void deleteSegments(Set<DataSegment> segments)
{
nuked.addAll(segments);
}
@Override
public void updateSegmentMetadata(Set<DataSegment> segments) throws IOException
{
throw new UnsupportedOperationException();
}
public Set<DataSegment> getPublished()
{
return ImmutableSet.copyOf(published);
}
public Set<DataSegment> getNuked()
{
return ImmutableSet.copyOf(nuked);
}
public void setUnusedSegments(List<DataSegment> newUnusedSegments)
{
synchronized (unusedSegments) {
unusedSegments.clear();
unusedSegments.addAll(newUnusedSegments);
}
}
}

View File

@ -0,0 +1,116 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.test;
import com.google.api.client.util.Lists;
import com.google.common.base.Predicate;
import com.google.common.collect.Maps;
import com.metamx.common.Pair;
import io.druid.client.FilteredServerView;
import io.druid.client.ServerView;
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.timeline.DataSegment;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
public class TestServerView implements FilteredServerView, ServerView.SegmentCallback
{
final ConcurrentMap<ServerView.SegmentCallback, Pair<Predicate<DataSegment>, Executor>> callbacks = Maps.newConcurrentMap();
@Override
public void registerSegmentCallback(
final Executor exec,
final ServerView.SegmentCallback callback,
final Predicate<DataSegment> filter
)
{
callbacks.put(callback, Pair.of(filter, exec));
}
@Override
public ServerView.CallbackAction segmentAdded(
final DruidServerMetadata server,
final DataSegment segment
)
{
for (final Map.Entry<ServerView.SegmentCallback, Pair<Predicate<DataSegment>, Executor>> entry : callbacks.entrySet()) {
if (entry.getValue().lhs.apply(segment)) {
entry.getValue().rhs.execute(
new Runnable()
{
@Override
public void run()
{
entry.getKey().segmentAdded(server, segment);
}
}
);
}
}
return ServerView.CallbackAction.CONTINUE;
}
@Override
public ServerView.CallbackAction segmentRemoved(
final DruidServerMetadata server,
final DataSegment segment
)
{
for (final Map.Entry<ServerView.SegmentCallback, Pair<Predicate<DataSegment>, Executor>> entry : callbacks.entrySet()) {
if (entry.getValue().lhs.apply(segment)) {
entry.getValue().rhs.execute(
new Runnable()
{
@Override
public void run()
{
entry.getKey().segmentRemoved(server, segment);
}
}
);
}
}
return ServerView.CallbackAction.CONTINUE;
}
@Override
public ServerView.CallbackAction segmentViewInitialized()
{
for (final Map.Entry<ServerView.SegmentCallback, Pair<Predicate<DataSegment>, Executor>> entry : callbacks.entrySet()) {
entry.getValue().rhs.execute(
new Runnable()
{
@Override
public void run()
{
entry.getKey().segmentViewInitialized();
}
}
);
}
return ServerView.CallbackAction.CONTINUE;
}
}

View File

@ -41,6 +41,7 @@ import io.druid.segment.loading.SegmentLoaderLocalCacheManager;
import io.druid.segment.loading.StorageLocationConfig;
import io.druid.server.initialization.IndexerZkConfig;
import io.druid.server.initialization.ZkPathsConfig;
import io.druid.server.metrics.NoopServiceEmitter;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
@ -133,13 +134,14 @@ public class WorkerTaskMonitorTest
private WorkerTaskMonitor createTaskMonitor()
{
final TaskConfig taskConfig = new TaskConfig(Files.createTempDir().toString(), null, null, 0, null, null, null);
return new WorkerTaskMonitor(
jsonMapper,
cf,
workerCuratorCoordinator,
new ThreadPoolTaskRunner(
new TaskToolboxFactory(
new TaskConfig(Files.createTempDir().toString(), null, null, 0, null),
taskConfig,
null, null, null, null, null, null, null, null, null, null, null, new SegmentLoaderFactory(
new SegmentLoaderLocalCacheManager(
null,
@ -160,7 +162,8 @@ public class WorkerTaskMonitorTest
null,
null
),
null
taskConfig,
new NoopServiceEmitter()
),
new WorkerConfig().setCapacity(1)
);

View File

@ -177,17 +177,7 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<MapInputRow
}
try {
for (final InputRow row : rows) {
boolean added = false;
while (!closed && !added) {
added = buffer.offer(row, 500, TimeUnit.MILLISECONDS);
}
if (!added) {
throw new IllegalStateException("Cannot add events to closed firehose!");
}
}
addRows(rows);
return Response.ok(
objectMapper.writeValueAsString(ImmutableMap.of("eventCount", events.size())),
contentType
@ -207,8 +197,11 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<MapInputRow
{
synchronized (readLock) {
try {
while (!closed && nextRow == null) {
while (nextRow == null) {
nextRow = buffer.poll(500, TimeUnit.MILLISECONDS);
if (closed) {
break;
}
}
}
catch (InterruptedException e) {
@ -264,11 +257,29 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<MapInputRow
@Override
public void close() throws IOException
{
log.info("Firehose closing.");
closed = true;
eventReceiverFirehoseRegister.unregister(serviceName);
if (chatHandlerProvider.isPresent()) {
chatHandlerProvider.get().unregister(serviceName);
if (!closed) {
log.info("Firehose closing.");
closed = true;
eventReceiverFirehoseRegister.unregister(serviceName);
if (chatHandlerProvider.isPresent()) {
chatHandlerProvider.get().unregister(serviceName);
}
}
}
// public for tests
public void addRows(Iterable<InputRow> rows) throws InterruptedException
{
for (final InputRow row : rows) {
boolean added = false;
while (!closed && !added) {
added = buffer.offer(row, 500, TimeUnit.MILLISECONDS);
}
if (!added) {
throw new IllegalStateException("Cannot add events to closed firehose!");
}
}
}
}