diff --git a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java index eb34f4e4c1b..44f3c600f55 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java @@ -20,10 +20,15 @@ package io.druid.indexing.common; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Function; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Maps; +import com.google.common.collect.Multimap; +import com.google.common.collect.Multimaps; import com.metamx.emitter.service.ServiceEmitter; import com.metamx.metrics.MonitorScheduler; import io.druid.client.ServerView; +import io.druid.indexing.common.actions.SegmentInsertAction; import io.druid.indexing.common.actions.TaskActionClient; import io.druid.indexing.common.actions.TaskActionClientFactory; import io.druid.indexing.common.config.TaskConfig; @@ -37,10 +42,14 @@ import io.druid.segment.loading.SegmentLoader; import io.druid.segment.loading.SegmentLoadingException; import io.druid.server.coordination.DataSegmentAnnouncer; import io.druid.timeline.DataSegment; +import org.joda.time.Interval; import java.io.File; +import java.io.IOException; +import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ExecutorService; /** @@ -167,7 +176,7 @@ public class TaskToolbox return objectMapper; } - public Map getSegments(List segments) + public Map fetchSegments(List segments) throws SegmentLoadingException { Map retVal = Maps.newLinkedHashMap(); @@ -178,6 +187,25 @@ public class TaskToolbox return retVal; } + public void pushSegments(Iterable segments) throws IOException { + // Request segment pushes for each set + final Multimap segmentMultimap = Multimaps.index( + segments, + new Function() + { + @Override + public Interval apply(DataSegment segment) + { + return segment.getInterval(); + } + } + ); + for (final Collection segmentCollection : segmentMultimap.asMap().values()) { + getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.copyOf(segmentCollection))); + } + + } + public File getTaskWorkDir() { return taskWorkDir; diff --git a/indexing-service/src/main/java/io/druid/indexing/common/actions/LocalTaskActionClient.java b/indexing-service/src/main/java/io/druid/indexing/common/actions/LocalTaskActionClient.java index 8bb23918b01..4dd445df80d 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/actions/LocalTaskActionClient.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/actions/LocalTaskActionClient.java @@ -19,6 +19,7 @@ package io.druid.indexing.common.actions; +import com.metamx.common.ISE; import com.metamx.emitter.EmittingLogger; import io.druid.indexing.common.task.Task; import io.druid.indexing.overlord.TaskStorage; @@ -45,21 +46,21 @@ public class LocalTaskActionClient implements TaskActionClient { log.info("Performing action for task[%s]: %s", task.getId(), taskAction); - final RetType ret = taskAction.perform(task, toolbox); - if (taskAction.isAudited()) { // Add audit log try { storage.addAuditLog(task, taskAction); } catch (Exception e) { + final String actionClass = taskAction.getClass().getName(); log.makeAlert(e, "Failed to record action in audit log") .addData("task", task.getId()) - .addData("actionClass", taskAction.getClass().getName()) + .addData("actionClass", actionClass) .emit(); + throw new ISE(e, "Failed to record action [%s] in audit log", actionClass); } } - return ret; + return taskAction.perform(task, toolbox); } } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentInsertAction.java b/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentInsertAction.java index aaad73b8a9f..5280e394f6f 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentInsertAction.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentInsertAction.java @@ -24,7 +24,6 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.type.TypeReference; import com.google.common.collect.ImmutableSet; -import com.metamx.common.ISE; import com.metamx.emitter.service.ServiceMetricEvent; import io.druid.indexing.common.task.Task; import io.druid.timeline.DataSegment; @@ -80,9 +79,7 @@ public class SegmentInsertAction implements TaskAction> @Override public Set perform(Task task, TaskActionToolbox toolbox) throws IOException { - if(!toolbox.taskLockCoversSegments(task, segments, allowOlderVersions)) { - throw new ISE("Segments not covered by locks for task[%s]: %s", task.getId(), segments); - } + toolbox.verifyTaskLocksAndSinglePartitionSettitude(task, segments, true); final Set retVal = toolbox.getIndexerDBCoordinator().announceHistoricalSegments(segments); diff --git a/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentMetadataUpdateAction.java b/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentMetadataUpdateAction.java index f996a2c6ab0..4356c80dc59 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentMetadataUpdateAction.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentMetadataUpdateAction.java @@ -5,7 +5,6 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.type.TypeReference; import com.google.common.collect.ImmutableSet; -import com.metamx.common.ISE; import com.metamx.emitter.service.ServiceMetricEvent; import io.druid.indexing.common.task.Task; import io.druid.timeline.DataSegment; @@ -42,10 +41,7 @@ public class SegmentMetadataUpdateAction implements TaskAction Task task, TaskActionToolbox toolbox ) throws IOException { - if(!toolbox.taskLockCoversSegments(task, segments, true)) { - throw new ISE("Segments not covered by locks for task: %s", task.getId()); - } - + toolbox.verifyTaskLocksAndSinglePartitionSettitude(task, segments, true); toolbox.getIndexerDBCoordinator().updateSegmentMetadata(segments); // Emit metrics diff --git a/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentNukeAction.java b/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentNukeAction.java index 6ac8dd1ccc4..54258df1c2d 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentNukeAction.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentNukeAction.java @@ -24,7 +24,6 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.type.TypeReference; import com.google.common.collect.ImmutableSet; -import com.metamx.common.ISE; import com.metamx.emitter.service.ServiceMetricEvent; import io.druid.indexing.common.task.Task; import io.druid.timeline.DataSegment; @@ -59,10 +58,7 @@ public class SegmentNukeAction implements TaskAction @Override public Void perform(Task task, TaskActionToolbox toolbox) throws IOException { - if(!toolbox.taskLockCoversSegments(task, segments, true)) { - throw new ISE("Segments not covered by locks for task: %s", task.getId()); - } - + toolbox.verifyTaskLocksAndSinglePartitionSettitude(task, segments, true); toolbox.getIndexerDBCoordinator().deleteSegments(segments); // Emit metrics diff --git a/indexing-service/src/main/java/io/druid/indexing/common/actions/TaskActionToolbox.java b/indexing-service/src/main/java/io/druid/indexing/common/actions/TaskActionToolbox.java index a0b41e58a63..10c4d627462 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/actions/TaskActionToolbox.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/actions/TaskActionToolbox.java @@ -19,9 +19,11 @@ package io.druid.indexing.common.actions; +import com.google.api.client.repackaged.com.google.common.base.Preconditions; import com.google.common.base.Predicate; import com.google.common.collect.Iterables; import com.google.inject.Inject; +import com.metamx.common.ISE; import com.metamx.emitter.service.ServiceEmitter; import io.druid.indexing.common.TaskLock; import io.druid.indexing.common.task.Task; @@ -65,6 +67,42 @@ public class TaskActionToolbox return emitter; } + public boolean segmentsAreFromSamePartitionSet( + final Set segments + ) + { + // Verify that these segments are all in the same partition set + + Preconditions.checkArgument(!segments.isEmpty(), "segments nonempty"); + final DataSegment firstSegment = segments.iterator().next(); + for (final DataSegment segment : segments) { + if (!segment.getDataSource().equals(firstSegment.getDataSource())) { + return false; + } + if (!segment.getInterval().equals(firstSegment.getInterval())) { + return false; + } + if (!segment.getVersion().equals(firstSegment.getVersion())) { + return false; + } + } + return true; + } + + public void verifyTaskLocksAndSinglePartitionSettitude( + final Task task, + final Set segments, + final boolean allowOlderVersions + ) + { + if (!taskLockCoversSegments(task, segments, allowOlderVersions)) { + throw new ISE("Segments not covered by locks for task: %s", task.getId()); + } + if (!segmentsAreFromSamePartitionSet(segments)) { + throw new ISE("Segments are not in the same partition set: %s", segments); + } + } + public boolean taskLockCoversSegments( final Task task, final Set segments, diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/DeleteTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/DeleteTask.java index 872ac3507bd..970818a6e9d 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/DeleteTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/DeleteTask.java @@ -22,6 +22,7 @@ package io.druid.indexing.common.task; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; @@ -103,7 +104,7 @@ public class DeleteTask extends AbstractFixedIntervalTask segment.getVersion() ); - toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.of(uploadedSegment))); + toolbox.pushSegments(ImmutableList.of(uploadedSegment)); return TaskStatus.success(getId()); } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java index 400b088a693..233714f5c71 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java @@ -24,10 +24,14 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.type.TypeReference; import com.google.api.client.util.Lists; +import com.google.common.base.Function; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; +import com.google.common.collect.Maps; +import com.google.common.collect.Multimap; +import com.google.common.collect.Multimaps; import com.metamx.common.logger.Logger; import io.druid.common.utils.JodaUtils; import io.druid.indexer.HadoopDruidIndexerConfig; @@ -47,12 +51,15 @@ import io.tesla.aether.internal.DefaultTeslaAether; import org.joda.time.DateTime; import org.joda.time.Interval; +import javax.annotation.Nullable; import java.io.File; import java.lang.reflect.Method; import java.net.URL; import java.net.URLClassLoader; import java.util.Arrays; +import java.util.Collection; import java.util.List; +import java.util.Map; public class HadoopIndexTask extends AbstractFixedIntervalTask { @@ -180,14 +187,10 @@ public class HadoopIndexTask extends AbstractFixedIntervalTask if (segments != null) { List publishedSegments = toolbox.getObjectMapper().readValue( - segments, new TypeReference>() - { - } + segments, + new TypeReference>() {} ); - // Request segment pushes - toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.copyOf(publishedSegments))); - - // Done + toolbox.pushSegments(publishedSegments); return TaskStatus.success(getId()); } else { return TaskStatus.failure(getId()); diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java index 19cb791b77e..6e09e46373c 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java @@ -156,7 +156,7 @@ public class IndexTask extends AbstractFixedIntervalTask segments.add(segment); } } - toolbox.getTaskActionClient().submit(new SegmentInsertAction(segments)); + toolbox.pushSegments(segments); return TaskStatus.success(getId()); } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/KillTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/KillTask.java index c77ddb21d96..b4858342981 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/KillTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/KillTask.java @@ -97,11 +97,9 @@ public class KillTask extends AbstractFixedIntervalTask // Kill segments for (DataSegment segment : unusedSegments) { toolbox.getDataSegmentKiller().kill(segment); + toolbox.getTaskActionClient().submit(new SegmentNukeAction(ImmutableSet.of(segment))); } - // Remove metadata for these segments - toolbox.getTaskActionClient().submit(new SegmentNukeAction(ImmutableSet.copyOf(unusedSegments))); - return TaskStatus.success(getId()); } } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java b/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java index d49d74b355b..40b07f72d71 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java @@ -27,6 +27,7 @@ import com.google.common.base.Joiner; import com.google.common.base.Objects; import com.google.common.base.Preconditions; import com.google.common.base.Predicate; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; @@ -142,7 +143,7 @@ public abstract class MergeTaskBase extends AbstractFixedIntervalTask ); // download segments to merge - final Map gettedSegments = toolbox.getSegments(segments); + final Map gettedSegments = toolbox.fetchSegments(segments); // merge files together final File fileToUpload = merge(gettedSegments, new File(taskDir, "merged")); @@ -165,7 +166,7 @@ public abstract class MergeTaskBase extends AbstractFixedIntervalTask emitter.emit(builder.build("merger/uploadTime", System.currentTimeMillis() - uploadStart)); emitter.emit(builder.build("merger/mergeSize", uploadedSegment.getSize())); - toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.of(uploadedSegment))); + toolbox.pushSegments(ImmutableList.of(uploadedSegment)); return TaskStatus.success(getId()); } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/MoveTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/MoveTask.java index 8e628b93188..da82ffa6608 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/MoveTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/MoveTask.java @@ -98,18 +98,12 @@ public class MoveTask extends AbstractFixedIntervalTask log.info("OK to move segment: %s", unusedSegment.getIdentifier()); } - List movedSegments = Lists.newLinkedList(); - // Move segments for (DataSegment segment : unusedSegments) { - movedSegments.add(toolbox.getDataSegmentMover().move(segment, targetLoadSpec)); + final DataSegment movedSegment = toolbox.getDataSegmentMover().move(segment, targetLoadSpec); + toolbox.getTaskActionClient().submit(new SegmentMetadataUpdateAction(ImmutableSet.of(movedSegment))); } - // Update metadata for moved segments - toolbox.getTaskActionClient().submit(new SegmentMetadataUpdateAction( - ImmutableSet.copyOf(movedSegments) - )); - return TaskStatus.success(getId()); } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java index f28d83c1caf..6ddc523140b 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java @@ -418,7 +418,7 @@ public class RealtimeIndexTask extends AbstractTask @Override public void publishSegment(DataSegment segment) throws IOException { - taskToolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.of(segment))); + taskToolbox.pushSegments(ImmutableList.of(segment)); } } } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/VersionConverterTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/VersionConverterTask.java index 52807861a12..75561f2408e 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/VersionConverterTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/VersionConverterTask.java @@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Function; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Sets; import com.metamx.common.guava.FunctionalIterable; import com.metamx.common.logger.Logger; @@ -248,7 +249,7 @@ public class VersionConverterTask extends AbstractFixedIntervalTask } } - final Map localSegments = toolbox.getSegments(Arrays.asList(segment)); + final Map localSegments = toolbox.fetchSegments(Arrays.asList(segment)); final File location = localSegments.get(segment); final File outLocation = new File(location, "v9_out");