Index service: Fix kill task

This commit is contained in:
Gian Merlino 2013-01-25 13:15:49 -08:00
parent 553738e1d8
commit 82d77a8b72
3 changed files with 36 additions and 24 deletions

View File

@ -56,6 +56,6 @@ public class KillTask extends AbstractTask
.kill(getDataSource(), getInterval()) .kill(getDataSource(), getInterval())
); );
return TaskStatus.success(getId(), segmentsToKill); return TaskStatus.success(getId()).withSegmentsNuked(segmentsToKill);
} }
} }

View File

@ -39,7 +39,7 @@ public class LocalTaskRunner implements TaskRunner
private final TaskToolbox toolbox; private final TaskToolbox toolbox;
private final ExecutorService exec; private final ExecutorService exec;
private static final Logger log = new Logger(TaskQueue.class); private static final Logger log = new Logger(LocalTaskRunner.class);
public LocalTaskRunner( public LocalTaskRunner(
TaskToolbox toolbox, TaskToolbox toolbox,
@ -66,11 +66,11 @@ public class LocalTaskRunner implements TaskRunner
public void run() public void run()
{ {
final long startTime = System.currentTimeMillis(); final long startTime = System.currentTimeMillis();
final File taskDir = toolbox.getConfig().getTaskDir(task);
TaskStatus status; TaskStatus status;
try { try {
log.info("Running task: %s", task.getId());
status = task.run(context, toolbox, callback); status = task.run(context, toolbox, callback);
} }
catch (InterruptedException e) { catch (InterruptedException e) {
@ -87,13 +87,15 @@ public class LocalTaskRunner implements TaskRunner
} }
try { try {
final File taskDir = toolbox.getConfig().getTaskDir(task);
if (taskDir.exists()) { if (taskDir.exists()) {
log.info("Removing task directory: %s", taskDir); log.info("Removing task directory: %s", taskDir);
FileUtils.deleteDirectory(taskDir); FileUtils.deleteDirectory(taskDir);
} }
} }
catch (Exception e) { catch (Exception e) {
log.error(e, "Failed to delete task directory[%s]", taskDir.toString()); log.error(e, "Failed to delete task directory: %s", task.getId());
} }
try { try {

View File

@ -21,14 +21,13 @@ package com.metamx.druid.merger.coordinator.exec;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet;
import com.metamx.common.ISE;
import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.druid.client.DataSegment; import com.metamx.druid.client.DataSegment;
import com.metamx.druid.merger.common.TaskCallback;
import com.metamx.druid.merger.common.TaskStatus; import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.task.Task; import com.metamx.druid.merger.common.task.Task;
import com.metamx.druid.merger.coordinator.MergerDBCoordinator; import com.metamx.druid.merger.coordinator.MergerDBCoordinator;
import com.metamx.druid.merger.common.TaskCallback;
import com.metamx.druid.merger.coordinator.TaskContext; import com.metamx.druid.merger.coordinator.TaskContext;
import com.metamx.druid.merger.coordinator.TaskQueue; import com.metamx.druid.merger.coordinator.TaskQueue;
import com.metamx.druid.merger.coordinator.TaskRunner; import com.metamx.druid.merger.coordinator.TaskRunner;
@ -37,7 +36,6 @@ import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent; import com.metamx.emitter.service.ServiceMetricEvent;
import java.util.List;
import java.util.Set; import java.util.Set;
public class TaskConsumer implements Runnable public class TaskConsumer implements Runnable
@ -96,7 +94,7 @@ public class TaskConsumer implements Runnable
} }
catch (InterruptedException e) { catch (InterruptedException e) {
log.info(e, "Interrupted while waiting for new work"); log.info(e, "Interrupted while waiting for new work");
throw Throwables.propagate(e); throw e;
} }
try { try {
@ -117,10 +115,10 @@ public class TaskConsumer implements Runnable
} }
} }
} }
catch (Throwable t) { catch (Exception e) {
// exit thread // exit thread
log.error(t, "Uncaught Throwable while consuming tasks"); log.error(e, "Uncaught exception while consuming tasks");
throw Throwables.propagate(t); throw Throwables.propagate(e);
} }
} }
@ -242,7 +240,18 @@ public class TaskConsumer implements Runnable
private void deleteSegments(Task task, TaskContext context, Set<DataSegment> segments) throws Exception private void deleteSegments(Task task, TaskContext context, Set<DataSegment> segments) throws Exception
{ {
for (DataSegment segment : segments) { for (DataSegment segment : segments) {
verifySegment(task, context, segment); verifyDataSourceAndInterval(task, context, segment);
// Verify version (must be less than our context version)
if (segment.getVersion().compareTo(context.getVersion()) >= 0) {
throw new IllegalStateException(
String.format(
"Segment-to-nuke for task[%s] has invalid version: %s",
task.getId(),
segment.getIdentifier()
)
);
}
log.info("Deleting segment[%s] for task[%s]", segment.getIdentifier(), task.getId()); log.info("Deleting segment[%s] for task[%s]", segment.getIdentifier(), task.getId());
mergerDBCoordinator.deleteSegment(segment); mergerDBCoordinator.deleteSegment(segment);
@ -252,14 +261,25 @@ public class TaskConsumer implements Runnable
private void publishSegments(Task task, TaskContext context, Set<DataSegment> segments) throws Exception private void publishSegments(Task task, TaskContext context, Set<DataSegment> segments) throws Exception
{ {
for (DataSegment segment : segments) { for (DataSegment segment : segments) {
verifySegment(task, context, segment); verifyDataSourceAndInterval(task, context, segment);
// Verify version (must be equal to our context version)
if (!context.getVersion().equals(segment.getVersion())) {
throw new IllegalStateException(
String.format(
"Segment for task[%s] has invalid version: %s",
task.getId(),
segment.getIdentifier()
)
);
}
log.info("Publishing segment[%s] for task[%s]", segment.getIdentifier(), task.getId()); log.info("Publishing segment[%s] for task[%s]", segment.getIdentifier(), task.getId());
mergerDBCoordinator.announceHistoricalSegment(segment); mergerDBCoordinator.announceHistoricalSegment(segment);
} }
} }
private void verifySegment(Task task, TaskContext context, DataSegment segment) private void verifyDataSourceAndInterval(Task task, TaskContext context, DataSegment segment)
{ {
if (!task.getDataSource().equals(segment.getDataSource())) { if (!task.getDataSource().equals(segment.getDataSource())) {
throw new IllegalStateException( throw new IllegalStateException(
@ -280,15 +300,5 @@ public class TaskConsumer implements Runnable
) )
); );
} }
if (!context.getVersion().equals(segment.getVersion())) {
throw new IllegalStateException(
String.format(
"Segment for task[%s] has invalid version: %s",
task.getId(),
segment.getIdentifier()
)
);
}
} }
} }