mirror of https://github.com/apache/druid.git
CliPeon: Fix local mode
This commit is contained in:
parent
370e2f855a
commit
70c153592f
|
@ -100,6 +100,8 @@ public interface Task
|
|||
* actions must be idempotent, since this method may be executed multiple times. This typically runs on the
|
||||
* coordinator. If this method throws an exception, the task should be considered a failure.
|
||||
*
|
||||
* This method must be idempotent, as it may be run multiple times per task.
|
||||
*
|
||||
* @param taskActionClient action client for this task (not the full toolbox)
|
||||
*
|
||||
* @return true if ready, false if not ready yet
|
||||
|
|
|
@ -20,16 +20,19 @@
|
|||
package io.druid.indexing.worker.executor;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.api.client.repackaged.com.google.common.base.Preconditions;
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.base.Throwables;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.google.inject.Inject;
|
||||
import com.metamx.common.ISE;
|
||||
import com.metamx.common.lifecycle.LifecycleStart;
|
||||
import com.metamx.common.lifecycle.LifecycleStop;
|
||||
import com.metamx.emitter.EmittingLogger;
|
||||
import io.druid.concurrent.Execs;
|
||||
import io.druid.indexing.common.TaskStatus;
|
||||
import io.druid.indexing.common.actions.TaskActionClientFactory;
|
||||
import io.druid.indexing.common.task.Task;
|
||||
import io.druid.indexing.overlord.TaskRunner;
|
||||
|
||||
|
@ -47,6 +50,7 @@ public class ExecutorLifecycle
|
|||
private static final EmittingLogger log = new EmittingLogger(ExecutorLifecycle.class);
|
||||
|
||||
private final ExecutorLifecycleConfig config;
|
||||
private final TaskActionClientFactory taskActionClientFactory;
|
||||
private final TaskRunner taskRunner;
|
||||
private final ObjectMapper jsonMapper;
|
||||
|
||||
|
@ -57,11 +61,13 @@ public class ExecutorLifecycle
|
|||
@Inject
|
||||
public ExecutorLifecycle(
|
||||
ExecutorLifecycleConfig config,
|
||||
TaskActionClientFactory taskActionClientFactory,
|
||||
TaskRunner taskRunner,
|
||||
ObjectMapper jsonMapper
|
||||
)
|
||||
{
|
||||
this.config = config;
|
||||
this.taskActionClientFactory = taskActionClientFactory;
|
||||
this.taskRunner = taskRunner;
|
||||
this.jsonMapper = jsonMapper;
|
||||
}
|
||||
|
@ -69,9 +75,9 @@ public class ExecutorLifecycle
|
|||
@LifecycleStart
|
||||
public void start()
|
||||
{
|
||||
final File taskFile = config.getTaskFile();
|
||||
final File statusFile = config.getStatusFile();
|
||||
final InputStream parentStream = config.getParentStream();
|
||||
final File taskFile = Preconditions.checkNotNull(config.getTaskFile(), "taskFile");
|
||||
final File statusFile = Preconditions.checkNotNull(config.getStatusFile(), "statusFile");
|
||||
final InputStream parentStream = Preconditions.checkNotNull(config.getParentStream(), "parentStream");
|
||||
|
||||
final Task task;
|
||||
|
||||
|
@ -111,28 +117,41 @@ public class ExecutorLifecycle
|
|||
}
|
||||
);
|
||||
|
||||
statusFuture = Futures.transform(
|
||||
taskRunner.run(task), new Function<TaskStatus, TaskStatus>()
|
||||
{
|
||||
@Override
|
||||
public TaskStatus apply(TaskStatus taskStatus)
|
||||
{
|
||||
try {
|
||||
log.info(
|
||||
"Task completed with status: %s",
|
||||
jsonMapper.writerWithDefaultPrettyPrinter().writeValueAsString(taskStatus)
|
||||
);
|
||||
|
||||
statusFile.getParentFile().mkdirs();
|
||||
jsonMapper.writeValue(statusFile, taskStatus);
|
||||
|
||||
return taskStatus;
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
// Won't hurt in remote mode, and is required for setting up locks in local mode:
|
||||
try {
|
||||
if (!task.isReady(taskActionClientFactory.create(task))) {
|
||||
throw new ISE("Task is not ready to run yet!", task.getId());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw new ISE(e, "Failed to run isReady", task.getId());
|
||||
}
|
||||
|
||||
statusFuture = Futures.transform(
|
||||
taskRunner.run(task),
|
||||
new Function<TaskStatus, TaskStatus>()
|
||||
{
|
||||
@Override
|
||||
public TaskStatus apply(TaskStatus taskStatus)
|
||||
{
|
||||
try {
|
||||
log.info(
|
||||
"Task completed with status: %s",
|
||||
jsonMapper.writerWithDefaultPrettyPrinter().writeValueAsString(taskStatus)
|
||||
);
|
||||
|
||||
final File statusFileParent = statusFile.getParentFile();
|
||||
if (statusFileParent != null) {
|
||||
statusFileParent.mkdirs();
|
||||
}
|
||||
jsonMapper.writeValue(statusFile, taskStatus);
|
||||
|
||||
return taskStatus;
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
|
|
|
@ -53,7 +53,6 @@ import io.druid.indexing.common.index.NoopChatHandlerProvider;
|
|||
import io.druid.indexing.common.index.ServiceAnnouncingChatHandlerProvider;
|
||||
import io.druid.indexing.overlord.HeapMemoryTaskStorage;
|
||||
import io.druid.indexing.overlord.IndexerDBCoordinator;
|
||||
import io.druid.indexing.overlord.TaskQueue;
|
||||
import io.druid.indexing.overlord.TaskRunner;
|
||||
import io.druid.indexing.overlord.TaskStorage;
|
||||
import io.druid.indexing.overlord.ThreadPoolTaskRunner;
|
||||
|
|
Loading…
Reference in New Issue