From 726326abc3dbbfcbe20362398f6c5c406f4ac34b Mon Sep 17 00:00:00 2001 From: Nishant Date: Mon, 3 Aug 2015 23:29:34 +0530 Subject: [PATCH] Add Task Context and ability to override task specific properties override javaOpts fix compilation review comments Add Test for typecast review comments - remove unused method. --- .../task/AbstractFixedIntervalTask.java | 22 ++++++---- .../indexing/common/task/AbstractTask.java | 28 ++++++++++--- .../indexing/common/task/AppendTask.java | 5 ++- .../indexing/common/task/ArchiveTask.java | 7 +++- ...ConvertSegmentBackwardsCompatibleTask.java | 5 ++- .../common/task/ConvertSegmentTask.java | 37 +++++++++------- .../common/task/HadoopConverterTask.java | 18 +++++--- .../indexing/common/task/HadoopIndexTask.java | 7 +++- .../indexing/common/task/HadoopTask.java | 10 ++++- .../druid/indexing/common/task/IndexTask.java | 7 +++- .../druid/indexing/common/task/KillTask.java | 7 +++- .../druid/indexing/common/task/MergeTask.java | 6 ++- .../indexing/common/task/MergeTaskBase.java | 36 +++++++++------- .../druid/indexing/common/task/MoveTask.java | 6 ++- .../druid/indexing/common/task/NoopTask.java | 9 ++-- .../common/task/RealtimeIndexTask.java | 7 +++- .../indexing/common/task/RestoreTask.java | 7 +++- .../io/druid/indexing/common/task/Task.java | 6 +++ .../IngestSegmentFirehoseFactory.java | 2 +- .../indexing/overlord/ForkingTaskRunner.java | 26 ++++++++++++ .../druid/indexing/common/TestMergeTask.java | 2 +- .../indexing/common/TestRealtimeTask.java | 3 +- .../common/task/ConvertSegmentTaskTest.java | 5 ++- .../task/HadoopConverterTaskSerDeTest.java | 28 +++++++++---- .../indexing/common/task/IndexTaskTest.java | 9 ++-- .../common/task/MergeTaskBaseTest.java | 2 +- .../indexing/common/task/TaskSerdeTest.java | 42 ++++++++++++------- .../indexing/overlord/RealtimeishTask.java | 4 +- .../indexing/overlord/TaskLifecycleTest.java | 21 ++++++---- .../overlord/http/OverlordResourceTest.java | 4 +- ...lDistributionWorkerSelectStrategyTest.java | 2 +- ...yWithAffinityWorkerSelectStrategyTest.java | 6 +-- .../indexing/worker/TaskAnnouncementTest.java | 12 ++++-- 33 files changed, 275 insertions(+), 123 deletions(-) diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/AbstractFixedIntervalTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/AbstractFixedIntervalTask.java index 00310f71ae6..b36ff75cd4d 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/AbstractFixedIntervalTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/AbstractFixedIntervalTask.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import io.druid.indexing.common.actions.LockTryAcquireAction; import io.druid.indexing.common.actions.TaskActionClient; +import java.util.Map; import org.joda.time.Interval; public abstract class AbstractFixedIntervalTask extends AbstractTask @@ -32,17 +33,19 @@ public abstract class AbstractFixedIntervalTask extends AbstractTask protected AbstractFixedIntervalTask( String id, String dataSource, - Interval interval + Interval interval, + Map context ) { - this(id, id, new TaskResource(id, 1), dataSource, interval); + this(id, id, new TaskResource(id, 1), dataSource, interval, context); } protected AbstractFixedIntervalTask( String id, TaskResource taskResource, String dataSource, - Interval interval + Interval interval, + Map context ) { this( @@ -50,7 +53,8 @@ public abstract class AbstractFixedIntervalTask extends AbstractTask id, taskResource == null ? new TaskResource(id, 1) : taskResource, dataSource, - interval + interval, + context ); } @@ -58,10 +62,11 @@ public abstract class AbstractFixedIntervalTask extends AbstractTask String id, String groupId, String dataSource, - Interval interval + Interval interval, + Map context ) { - this(id, groupId, new TaskResource(id, 1), dataSource, interval); + this(id, groupId, new TaskResource(id, 1), dataSource, interval, context); } protected AbstractFixedIntervalTask( @@ -69,10 +74,11 @@ public abstract class AbstractFixedIntervalTask extends AbstractTask String groupId, TaskResource taskResource, String dataSource, - Interval interval + Interval interval, + Map context ) { - super(id, groupId, taskResource, dataSource); + super(id, groupId, taskResource, dataSource, context); this.interval = Preconditions.checkNotNull(interval, "interval"); Preconditions.checkArgument(interval.toDurationMillis() > 0, "interval empty"); } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/AbstractTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/AbstractTask.java index ada1265c88f..3d3df6b363b 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/AbstractTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/AbstractTask.java @@ -28,6 +28,7 @@ import io.druid.indexing.common.TaskToolbox; import io.druid.indexing.common.actions.LockListAction; import io.druid.query.Query; import io.druid.query.QueryRunner; +import java.util.Map; import org.joda.time.DateTime; import org.joda.time.Interval; @@ -49,22 +50,25 @@ public abstract class AbstractTask implements Task @JsonIgnore private final String dataSource; - protected AbstractTask(String id, String dataSource) + private final Map context; + + protected AbstractTask(String id, String dataSource, Map context) { - this(id, id, new TaskResource(id, 1), dataSource); + this(id, id, new TaskResource(id, 1), dataSource, context); } - protected AbstractTask(String id, String groupId, String dataSource) + protected AbstractTask(String id, String groupId, String dataSource, Map context) { - this(id, groupId, new TaskResource(id, 1), dataSource); + this(id, groupId, new TaskResource(id, 1), dataSource, context); } - protected AbstractTask(String id, String groupId, TaskResource taskResource, String dataSource) + protected AbstractTask(String id, String groupId, TaskResource taskResource, String dataSource, Map context) { this.id = Preconditions.checkNotNull(id, "id"); this.groupId = Preconditions.checkNotNull(groupId, "groupId"); this.taskResource = Preconditions.checkNotNull(taskResource, "resource"); this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource"); + this.context = context; } public static String makeId(String id, final String typeName, String dataSource, Interval interval) @@ -179,4 +183,18 @@ public abstract class AbstractTask implements Task { return toolbox.getTaskActionClient().submit(new LockListAction()); } + + @Override + @JsonProperty + public Map getContext() + { + return context; + } + + @Override + public Object getContextValue(String key) + { + return context == null ? null : context.get(key); + } + } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/AppendTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/AppendTask.java index 164b282f45b..086a71c5b8a 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/AppendTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/AppendTask.java @@ -55,10 +55,11 @@ public class AppendTask extends MergeTaskBase @JsonProperty("id") String id, @JsonProperty("dataSource") String dataSource, @JsonProperty("segments") List segments, - @JsonProperty("indexSpec") IndexSpec indexSpec + @JsonProperty("indexSpec") IndexSpec indexSpec, + @JsonProperty("context") Map context ) { - super(id, dataSource, segments); + super(id, dataSource, segments, context); this.indexSpec = indexSpec == null ? new IndexSpec() : indexSpec; } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/ArchiveTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/ArchiveTask.java index 9c42a87a4b0..fa0435c2b0f 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/ArchiveTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/ArchiveTask.java @@ -28,6 +28,7 @@ import io.druid.indexing.common.TaskToolbox; import io.druid.indexing.common.actions.SegmentListUnusedAction; import io.druid.indexing.common.actions.SegmentMetadataUpdateAction; import io.druid.timeline.DataSegment; +import java.util.Map; import org.joda.time.Interval; import java.util.List; @@ -39,13 +40,15 @@ public class ArchiveTask extends AbstractFixedIntervalTask public ArchiveTask( @JsonProperty("id") String id, @JsonProperty("dataSource") String dataSource, - @JsonProperty("interval") Interval interval + @JsonProperty("interval") Interval interval, + @JsonProperty("context") Map context ) { super( makeId(id, "archive", dataSource, interval), dataSource, - interval + interval, + context ); } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/ConvertSegmentBackwardsCompatibleTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/ConvertSegmentBackwardsCompatibleTask.java index 165e2f0fb46..9fada765b5c 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/ConvertSegmentBackwardsCompatibleTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/ConvertSegmentBackwardsCompatibleTask.java @@ -27,7 +27,8 @@ public class ConvertSegmentBackwardsCompatibleTask extends ConvertSegmentTask segment, indexSpec, force == null ? false : force, - validate ==null ? false : validate + validate ==null ? false : validate, + null ); } @@ -43,7 +44,7 @@ public class ConvertSegmentBackwardsCompatibleTask extends ConvertSegmentTask @JsonProperty("validate") Boolean validate ) { - super(groupId, segment, indexSpec, force, validate); + super(groupId, segment, indexSpec, force, validate, null); } } } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/ConvertSegmentTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/ConvertSegmentTask.java index 883a776f099..46cea2bd564 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/ConvertSegmentTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/ConvertSegmentTask.java @@ -81,11 +81,12 @@ public class ConvertSegmentTask extends AbstractFixedIntervalTask Interval interval, IndexSpec indexSpec, boolean force, - boolean validate + boolean validate, + Map context ) { final String id = makeId(dataSource, interval); - return new ConvertSegmentTask(id, dataSource, interval, null, indexSpec, force, validate); + return new ConvertSegmentTask(id, dataSource, interval, null, indexSpec, force, validate, context); } /** @@ -98,12 +99,13 @@ public class ConvertSegmentTask extends AbstractFixedIntervalTask * * @return A SegmentConverterTask for the segment with the indexSpec specified. */ - public static ConvertSegmentTask create(DataSegment segment, IndexSpec indexSpec, boolean force, boolean validate) + public static ConvertSegmentTask create(DataSegment segment, IndexSpec indexSpec, boolean force, boolean validate, Map context + ) { final Interval interval = segment.getInterval(); final String dataSource = segment.getDataSource(); final String id = makeId(dataSource, interval); - return new ConvertSegmentTask(id, dataSource, interval, segment, indexSpec, force, validate); + return new ConvertSegmentTask(id, dataSource, interval, segment, indexSpec, force, validate, context); } protected static String makeId(String dataSource, Interval interval) @@ -121,19 +123,20 @@ public class ConvertSegmentTask extends AbstractFixedIntervalTask @JsonProperty("segment") DataSegment segment, @JsonProperty("indexSpec") IndexSpec indexSpec, @JsonProperty("force") Boolean force, - @JsonProperty("validate") Boolean validate + @JsonProperty("validate") Boolean validate, + @JsonProperty("context") Map context ) { final boolean isForce = force == null ? false : force; final boolean isValidate = validate == null ? true : validate; if (id == null) { if (segment == null) { - return create(dataSource, interval, indexSpec, isForce, isValidate); + return create(dataSource, interval, indexSpec, isForce, isValidate, context); } else { - return create(segment, indexSpec, isForce, isValidate); + return create(segment, indexSpec, isForce, isValidate, context); } } - return new ConvertSegmentTask(id, dataSource, interval, segment, indexSpec, isForce, isValidate); + return new ConvertSegmentTask(id, dataSource, interval, segment, indexSpec, isForce, isValidate, context); } protected ConvertSegmentTask( @@ -143,10 +146,11 @@ public class ConvertSegmentTask extends AbstractFixedIntervalTask DataSegment segment, IndexSpec indexSpec, boolean force, - boolean validate + boolean validate, + Map context ) { - super(id, dataSource, interval); + super(id, dataSource, interval, context); this.segment = segment; this.indexSpec = indexSpec == null ? new IndexSpec() : indexSpec; this.force = force; @@ -224,7 +228,7 @@ public class ConvertSegmentTask extends AbstractFixedIntervalTask segmentsToUpdate = Collections.singleton(segment); } // Vestigial from a past time when this task spawned subtasks. - for (final Task subTask : generateSubTasks(getGroupId(), segmentsToUpdate, indexSpec, force, validate)) { + for (final Task subTask : generateSubTasks(getGroupId(), segmentsToUpdate, indexSpec, force, validate, getContext())) { final TaskStatus status = subTask.run(toolbox); if (!status.isSuccess()) { return TaskStatus.fromCode(getId(), status.getStatusCode()); @@ -238,7 +242,8 @@ public class ConvertSegmentTask extends AbstractFixedIntervalTask final Iterable segments, final IndexSpec indexSpec, final boolean force, - final boolean validate + final boolean validate, + final Map context ) { return Iterables.transform( @@ -248,7 +253,7 @@ public class ConvertSegmentTask extends AbstractFixedIntervalTask @Override public Task apply(DataSegment input) { - return new SubTask(groupId, input, indexSpec, force, validate); + return new SubTask(groupId, input, indexSpec, force, validate, context); } } ); @@ -287,7 +292,8 @@ public class ConvertSegmentTask extends AbstractFixedIntervalTask @JsonProperty("segment") DataSegment segment, @JsonProperty("indexSpec") IndexSpec indexSpec, @JsonProperty("force") Boolean force, - @JsonProperty("validate") Boolean validate + @JsonProperty("validate") Boolean validate, + @JsonProperty("context") Map context ) { super( @@ -300,7 +306,8 @@ public class ConvertSegmentTask extends AbstractFixedIntervalTask ), groupId, segment.getDataSource(), - segment.getInterval() + segment.getInterval(), + context ); this.segment = segment; this.indexSpec = indexSpec == null ? new IndexSpec() : indexSpec; diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopConverterTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopConverterTask.java index ec543361703..777a055fdf0 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopConverterTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopConverterTask.java @@ -63,7 +63,8 @@ public class HadoopConverterTask extends ConvertSegmentTask @JsonProperty("distributedSuccessCache") URI distributedSuccessCache, @JsonProperty("jobPriority") String jobPriority, @JsonProperty("segmentOutputPath") String segmentOutputPath, - @JsonProperty("classpathPrefix") String classpathPrefix + @JsonProperty("classpathPrefix") String classpathPrefix, + @JsonProperty("context") Map context ) { super( @@ -78,7 +79,8 @@ public class HadoopConverterTask extends ConvertSegmentTask null, // Always call subtask codepath indexSpec, force, - validate == null ? true : validate + validate == null ? true : validate, + context ); this.hadoopDependencyCoordinates = hadoopDependencyCoordinates; this.distributedSuccessCache = Preconditions.checkNotNull(distributedSuccessCache, "distributedSuccessCache"); @@ -130,13 +132,15 @@ public class HadoopConverterTask extends ConvertSegmentTask final Iterable segments, final IndexSpec indexSpec, final boolean force, - final boolean validate + final boolean validate, + Map context ) { return Collections.singleton( new ConverterSubTask( ImmutableList.copyOf(segments), - this + this, + context ) ); } @@ -164,7 +168,8 @@ public class HadoopConverterTask extends ConvertSegmentTask @JsonCreator public ConverterSubTask( @JsonProperty("segments") List segments, - @JsonProperty("parent") HadoopConverterTask parent + @JsonProperty("parent") HadoopConverterTask parent, + @JsonProperty("context") Map context ) { super( @@ -175,7 +180,8 @@ public class HadoopConverterTask extends ConvertSegmentTask parent.getInterval().getEnd() ), parent.getDataSource(), - parent.getHadoopDependencyCoordinates() + parent.getHadoopDependencyCoordinates(), + context ); this.segments = segments; this.parent = parent; diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java index e865842ce99..f4f8072e226 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java @@ -45,6 +45,7 @@ import io.druid.indexing.common.actions.SegmentListUsedAction; import io.druid.indexing.common.actions.TaskActionClient; import io.druid.indexing.hadoop.OverlordActionBasedUsedSegmentLister; import io.druid.timeline.DataSegment; +import java.util.Map; import org.joda.time.DateTime; import org.joda.time.Interval; @@ -88,7 +89,8 @@ public class HadoopIndexTask extends HadoopTask @JsonProperty("hadoopCoordinates") String hadoopCoordinates, @JsonProperty("hadoopDependencyCoordinates") List hadoopDependencyCoordinates, @JsonProperty("classpathPrefix") String classpathPrefix, - @JacksonInject ObjectMapper jsonMapper + @JacksonInject ObjectMapper jsonMapper, + @JsonProperty("context") Map context ) { super( @@ -96,7 +98,8 @@ public class HadoopIndexTask extends HadoopTask getTheDataSource(spec), hadoopDependencyCoordinates == null ? (hadoopCoordinates == null ? null : ImmutableList.of(hadoopCoordinates)) - : hadoopDependencyCoordinates + : hadoopDependencyCoordinates, + context ); diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopTask.java index 463d2fa61c5..82efbb06be4 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopTask.java @@ -38,6 +38,7 @@ import java.net.URL; import java.net.URLClassLoader; import java.util.Arrays; import java.util.List; +import java.util.Map; public abstract class HadoopTask extends AbstractTask @@ -53,9 +54,14 @@ public abstract class HadoopTask extends AbstractTask private final List hadoopDependencyCoordinates; - protected HadoopTask(String id, String dataSource, List hadoopDependencyCoordinates) + protected HadoopTask( + String id, + String dataSource, + List hadoopDependencyCoordinates, + Map context + ) { - super(id, dataSource); + super(id, dataSource, context); this.hadoopDependencyCoordinates = hadoopDependencyCoordinates; } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java index e353f93ada8..a04a9b8956e 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java @@ -58,6 +58,7 @@ import io.druid.timeline.DataSegment; import io.druid.timeline.partition.HashBasedNumberedShardSpec; import io.druid.timeline.partition.NoneShardSpec; import io.druid.timeline.partition.ShardSpec; +import java.util.Map; import org.joda.time.DateTime; import org.joda.time.Interval; @@ -146,7 +147,8 @@ public class IndexTask extends AbstractFixedIntervalTask @JsonProperty("id") String id, @JsonProperty("resource") TaskResource taskResource, @JsonProperty("spec") IndexIngestionSpec ingestionSchema, - @JacksonInject ObjectMapper jsonMapper + @JacksonInject ObjectMapper jsonMapper, + @JsonProperty("context") Map context ) { super( @@ -154,7 +156,8 @@ public class IndexTask extends AbstractFixedIntervalTask makeId(id, ingestionSchema), taskResource, makeDataSource(ingestionSchema), - makeInterval(ingestionSchema) + makeInterval(ingestionSchema), + context ); diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/KillTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/KillTask.java index 224648649d4..0a04e325f7d 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/KillTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/KillTask.java @@ -29,6 +29,7 @@ import io.druid.indexing.common.TaskToolbox; import io.druid.indexing.common.actions.SegmentListUnusedAction; import io.druid.indexing.common.actions.SegmentNukeAction; import io.druid.timeline.DataSegment; +import java.util.Map; import org.joda.time.Interval; import java.util.List; @@ -43,13 +44,15 @@ public class KillTask extends AbstractFixedIntervalTask public KillTask( @JsonProperty("id") String id, @JsonProperty("dataSource") String dataSource, - @JsonProperty("interval") Interval interval + @JsonProperty("interval") Interval interval, + @JsonProperty("context") Map context ) { super( makeId(id, "kill", dataSource, interval), dataSource, - interval + interval, + context ); } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTask.java index 6b2d264d292..543accff499 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTask.java @@ -50,10 +50,12 @@ public class MergeTask extends MergeTaskBase @JsonProperty("dataSource") String dataSource, @JsonProperty("segments") List segments, @JsonProperty("aggregations") List aggregators, - @JsonProperty("indexSpec") IndexSpec indexSpec + @JsonProperty("indexSpec") IndexSpec indexSpec, + @JsonProperty("context") Map context + ) { - super(id, dataSource, segments); + super(id, dataSource, segments, context); this.aggregators = aggregators; this.indexSpec = indexSpec == null ? new IndexSpec() : indexSpec; } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java b/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java index a554c48bd38..7338d2f0a5a 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java @@ -62,7 +62,12 @@ public abstract class MergeTaskBase extends AbstractFixedIntervalTask private static final EmittingLogger log = new EmittingLogger(MergeTaskBase.class); - protected MergeTaskBase(final String id, final String dataSource, final List segments) + protected MergeTaskBase( + final String id, + final String dataSource, + final List segments, + Map context + ) { super( // _not_ the version, just something uniqueish @@ -70,7 +75,8 @@ public abstract class MergeTaskBase extends AbstractFixedIntervalTask "merge_%s_%s", computeProcessingID(dataSource, segments), new DateTime().toString() ), dataSource, - computeMergedInterval(segments) + computeMergedInterval(segments), + context ); // Verify segment list is nonempty @@ -249,19 +255,19 @@ public abstract class MergeTaskBase extends AbstractFixedIntervalTask final String segmentIDs = Joiner.on("_").join( Iterables.transform( Ordering.natural().sortedCopy(segments), new Function() - { - @Override - public String apply(DataSegment x) - { - return String.format( - "%s_%s_%s_%s", - x.getInterval().getStart(), - x.getInterval().getEnd(), - x.getVersion(), - x.getShardSpec().getPartitionNum() - ); - } - } + { + @Override + public String apply(DataSegment x) + { + return String.format( + "%s_%s_%s_%s", + x.getInterval().getStart(), + x.getInterval().getEnd(), + x.getVersion(), + x.getShardSpec().getPartitionNum() + ); + } + } ) ); diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/MoveTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/MoveTask.java index dc1cb2e9654..5295ffbec85 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/MoveTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/MoveTask.java @@ -45,13 +45,15 @@ public class MoveTask extends AbstractFixedIntervalTask @JsonProperty("id") String id, @JsonProperty("dataSource") String dataSource, @JsonProperty("interval") Interval interval, - @JsonProperty("target") Map targetLoadSpec + @JsonProperty("target") Map targetLoadSpec, + @JsonProperty("context") Map context ) { super( makeId(id, "move", dataSource, interval), dataSource, - interval + interval, + context ); this.targetLoadSpec = targetLoadSpec; } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/NoopTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/NoopTask.java index 3b0debf2c13..a783a659b7f 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/NoopTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/NoopTask.java @@ -26,6 +26,7 @@ import io.druid.data.input.FirehoseFactory; import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.TaskToolbox; import io.druid.indexing.common.actions.TaskActionClient; +import java.util.Map; import org.joda.time.DateTime; import java.util.UUID; @@ -64,12 +65,14 @@ public class NoopTask extends AbstractTask @JsonProperty("runTime") long runTime, @JsonProperty("isReadyTime") long isReadyTime, @JsonProperty("isReadyResult") String isReadyResult, - @JsonProperty("firehose") FirehoseFactory firehoseFactory + @JsonProperty("firehose") FirehoseFactory firehoseFactory, + @JsonProperty("context") Map context ) { super( id == null ? String.format("noop_%s_%s", new DateTime(), UUID.randomUUID().toString()) : id, - "none" + "none", + context ); this.runTime = (runTime == 0) ? defaultRunTime : runTime; @@ -142,6 +145,6 @@ public class NoopTask extends AbstractTask public static NoopTask create() { - return new NoopTask(null, 0, 0, null, null); + return new NoopTask(null, 0, 0, null, null, null); } } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java index a56da1a700d..30474d1d332 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java @@ -52,6 +52,7 @@ import io.druid.segment.realtime.plumber.Sink; import io.druid.segment.realtime.plumber.VersioningPolicy; import io.druid.server.coordination.DataSegmentAnnouncer; import io.druid.timeline.DataSegment; +import java.util.Map; import org.joda.time.DateTime; import org.joda.time.Interval; import org.joda.time.Period; @@ -108,14 +109,16 @@ public class RealtimeIndexTask extends AbstractTask public RealtimeIndexTask( @JsonProperty("id") String id, @JsonProperty("resource") TaskResource taskResource, - @JsonProperty("spec") FireDepartment fireDepartment + @JsonProperty("spec") FireDepartment fireDepartment, + @JsonProperty("context") Map context ) { super( id == null ? makeTaskId(fireDepartment) : id, String.format("index_realtime_%s", makeDatasource(fireDepartment)), taskResource == null ? new TaskResource(makeTaskId(fireDepartment), 1) : taskResource, - makeDatasource(fireDepartment) + makeDatasource(fireDepartment), + context ); this.spec = fireDepartment; } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/RestoreTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/RestoreTask.java index 1d1ba7d4487..0d9b9c8e480 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/RestoreTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/RestoreTask.java @@ -29,6 +29,7 @@ import io.druid.indexing.common.TaskToolbox; import io.druid.indexing.common.actions.SegmentListUnusedAction; import io.druid.indexing.common.actions.SegmentMetadataUpdateAction; import io.druid.timeline.DataSegment; +import java.util.Map; import org.joda.time.Interval; import java.util.List; @@ -40,13 +41,15 @@ public class RestoreTask extends AbstractFixedIntervalTask public RestoreTask( @JsonProperty("id") String id, @JsonProperty("dataSource") String dataSource, - @JsonProperty("interval") Interval interval + @JsonProperty("interval") Interval interval, + @JsonProperty("context") Map context ) { super( makeId(id, "restore", dataSource, interval), dataSource, - interval + interval, + context ); } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java b/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java index cc140a90380..2f3e472b76f 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java @@ -24,6 +24,7 @@ import io.druid.indexing.common.TaskToolbox; import io.druid.indexing.common.actions.TaskActionClient; import io.druid.query.Query; import io.druid.query.QueryRunner; +import java.util.Map; /** * Represents a task that can run on a worker. The general contracts surrounding Tasks are: @@ -138,4 +139,9 @@ public interface Task * @throws Exception if this task failed */ public TaskStatus run(TaskToolbox toolbox) throws Exception; + + public Map getContext(); + + public Object getContextValue(String key); + } diff --git a/indexing-service/src/main/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactory.java b/indexing-service/src/main/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactory.java index 75e17299062..aef5c5c3475 100644 --- a/indexing-service/src/main/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactory.java +++ b/indexing-service/src/main/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactory.java @@ -123,7 +123,7 @@ public class IngestSegmentFirehoseFactory implements FirehoseFactory segments = runTask(indexTask); @@ -182,7 +183,8 @@ public class IndexTaskTest ), null ), - new DefaultObjectMapper() + new DefaultObjectMapper(), + null ); List segments = runTask(indexTask); @@ -290,7 +292,8 @@ public class IndexTaskTest ), null ), - new DefaultObjectMapper() + new DefaultObjectMapper(), + null ); final List segments = runTask(indexTask); diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/MergeTaskBaseTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/MergeTaskBaseTest.java index 093852456bb..183e59744ed 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/MergeTaskBaseTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/MergeTaskBaseTest.java @@ -41,7 +41,7 @@ public class MergeTaskBaseTest .add(segmentBuilder.interval(new Interval("2012-01-03/2012-01-05")).build()) .build(); - final MergeTaskBase testMergeTaskBase = new MergeTaskBase(null, "foo", segments) + final MergeTaskBase testMergeTaskBase = new MergeTaskBase(null, "foo", segments, null) { @Override protected File merge(Map segments, File outDir) throws Exception diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java index 3704e2a2795..9aadeecac2f 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java @@ -80,7 +80,8 @@ public class TaskSerdeTest new IndexTask.IndexIOConfig(new LocalFirehoseFactory(new File("lol"), "rofl", null)), new IndexTask.IndexTuningConfig(10000, 10, -1, indexSpec) ), - jsonMapper + jsonMapper, + null ); for (final Module jacksonModule : new FirehoseModule().getJacksonModules()) { @@ -124,7 +125,8 @@ public class TaskSerdeTest new IndexTask.IndexIOConfig(new LocalFirehoseFactory(new File("lol"), "rofl", null)), new IndexTask.IndexTuningConfig(10000, 10, -1, indexSpec) ), - jsonMapper + jsonMapper, + null ); for (final Module jacksonModule : new FirehoseModule().getJacksonModules()) { @@ -164,7 +166,8 @@ public class TaskSerdeTest ImmutableList.of( new CountAggregatorFactory("cnt") ), - indexSpec + indexSpec, + null ); final String json = jsonMapper.writeValueAsString(task); @@ -192,7 +195,8 @@ public class TaskSerdeTest final KillTask task = new KillTask( null, "foo", - new Interval("2010-01-01/P1D") + new Interval("2010-01-01/P1D"), + null ); final String json = jsonMapper.writeValueAsString(task); @@ -216,7 +220,8 @@ public class TaskSerdeTest DataSegment.builder().dataSource("foo").interval(new Interval("2010-01-01/P1D")).version("1234").build(), null, false, - true + true, + null ); final String json = jsonMapper.writeValueAsString(task); @@ -242,7 +247,8 @@ public class TaskSerdeTest DataSegment.builder().dataSource("foo").interval(new Interval("2010-01-01/P1D")).version("1234").build(), indexSpec, false, - true + true, + null ); final String json = jsonMapper.writeValueAsString(task); @@ -302,7 +308,8 @@ public class TaskSerdeTest null, 0.3F ) - ) + ), + null ); final String json = jsonMapper.writeValueAsString(task); @@ -351,7 +358,8 @@ public class TaskSerdeTest ImmutableList.of( DataSegment.builder().dataSource("foo").interval(new Interval("2010-01-01/P1D")).version("1234").build() ), - indexSpec + indexSpec, + null ); final String json = jsonMapper.writeValueAsString(task); @@ -375,7 +383,8 @@ public class TaskSerdeTest final ArchiveTask task = new ArchiveTask( null, "foo", - new Interval("2010-01-01/P1D") + new Interval("2010-01-01/P1D"), + null ); final String json = jsonMapper.writeValueAsString(task); @@ -398,7 +407,8 @@ public class TaskSerdeTest final RestoreTask task = new RestoreTask( null, "foo", - new Interval("2010-01-01/P1D") + new Interval("2010-01-01/P1D"), + null ); final String json = jsonMapper.writeValueAsString(task); @@ -432,7 +442,8 @@ public class TaskSerdeTest ), indexSpec, false, - true + true, + null ); final String json = jsonMapper.writeValueAsString(task); final ConvertSegmentTask taskFromJson = jsonMapper.readValue(json, ConvertSegmentTask.class); @@ -457,7 +468,8 @@ public class TaskSerdeTest segment, new IndexSpec(new RoaringBitmapSerdeFactory(), "lzf", "uncompressed"), false, - true + true, + null ); final String json = jsonMapper.writeValueAsString(convertSegmentTaskOriginal); final Task task = jsonMapper.readValue(json, Task.class); @@ -491,7 +503,8 @@ public class TaskSerdeTest null, "foo", new Interval("2010-01-01/P1D"), - ImmutableMap.of("bucket", "hey", "baseKey", "what") + ImmutableMap.of("bucket", "hey", "baseKey", "what"), + null ); final String json = jsonMapper.writeValueAsString(task); @@ -527,7 +540,8 @@ public class TaskSerdeTest null, null, "blah", - jsonMapper + jsonMapper, + null ); final String json = jsonMapper.writeValueAsString(task); diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/RealtimeishTask.java b/indexing-service/src/test/java/io/druid/indexing/overlord/RealtimeishTask.java index 3f8d8d87a4f..366cae132e3 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/RealtimeishTask.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/RealtimeishTask.java @@ -41,12 +41,12 @@ public class RealtimeishTask extends AbstractTask { public RealtimeishTask() { - super("rt1", "rt", new TaskResource("rt1", 1), "foo"); + super("rt1", "rt", new TaskResource("rt1", 1), "foo", null); } public RealtimeishTask(String id, String groupId, TaskResource taskResource, String dataSource) { - super(id, groupId, taskResource, dataSource); + super(id, groupId, taskResource, dataSource, null); } @Override diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java index 661e60600c0..5d0f13f3bff 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java @@ -484,7 +484,8 @@ public class TaskLifecycleTest new IndexTask.IndexIOConfig(new MockFirehoseFactory(false)), new IndexTask.IndexTuningConfig(10000, 10, -1, indexSpec) ), - TestUtils.MAPPER + TestUtils.MAPPER, + null ); final Optional preRunTaskStatus = tsqa.getStatus(indexTask.getId()); @@ -540,7 +541,8 @@ public class TaskLifecycleTest new IndexTask.IndexIOConfig(new MockExceptionalFirehoseFactory()), new IndexTask.IndexTuningConfig(10000, 10, -1, indexSpec) ), - TestUtils.MAPPER + TestUtils.MAPPER, + null ); final TaskStatus status = runTask(indexTask); @@ -610,7 +612,7 @@ public class TaskLifecycleTest segmentFiles.add(file); } - final Task killTask = new KillTask(null, "test_kill_task", new Interval("2011-04-01/P4D")); + final Task killTask = new KillTask(null, "test_kill_task", new Interval("2011-04-01/P4D"), null); final TaskStatus status = runTask(killTask); Assert.assertEquals("merged statusCode", TaskStatus.Status.SUCCESS, status.getStatusCode()); @@ -675,7 +677,8 @@ public class TaskLifecycleTest "id1", new TaskResource("id1", 1), "ds", - new Interval("2012-01-01/P1D") + new Interval("2012-01-01/P1D"), + null ) { @Override @@ -713,7 +716,7 @@ public class TaskLifecycleTest @Test public void testBadInterval() throws Exception { - final Task task = new AbstractFixedIntervalTask("id1", "id1", "ds", new Interval("2012-01-01/P1D")) + final Task task = new AbstractFixedIntervalTask("id1", "id1", "ds", new Interval("2012-01-01/P1D"), null) { @Override public String getType() @@ -747,7 +750,7 @@ public class TaskLifecycleTest @Test public void testBadVersion() throws Exception { - final Task task = new AbstractFixedIntervalTask("id1", "id1", "ds", new Interval("2012-01-01/P1D")) + final Task task = new AbstractFixedIntervalTask("id1", "id1", "ds", new Interval("2012-01-01/P1D"), null) { @Override public String getType() @@ -823,7 +826,8 @@ public class TaskLifecycleTest RealtimeIndexTask realtimeIndexTask = new RealtimeIndexTask( taskId, new TaskResource(taskId, 1), - fireDepartment + fireDepartment, + null ); tq.add(realtimeIndexTask); //wait for task to process events and publish segment @@ -864,7 +868,8 @@ public class TaskLifecycleTest new IndexTask.IndexIOConfig(new MockFirehoseFactory(false)), new IndexTask.IndexTuningConfig(10000, 10, -1, indexSpec) ), - TestUtils.MAPPER + TestUtils.MAPPER, + null ); final long startTime = System.currentTimeMillis(); diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordResourceTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordResourceTest.java index 5976d2f2967..f455bbe015f 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordResourceTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordResourceTest.java @@ -195,7 +195,7 @@ public class OverlordResourceTest Assert.assertEquals(druidNode.getHostAndPort(), response.getEntity()); final String taskId_0 = "0"; - NoopTask task_0 = new NoopTask(taskId_0, 0, 0, null, null); + NoopTask task_0 = new NoopTask(taskId_0, 0, 0, null, null, null); response = overlordResource.taskPost(task_0); Assert.assertEquals(200, response.getStatus()); Assert.assertEquals(ImmutableMap.of("task", taskId_0), response.getEntity()); @@ -227,7 +227,7 @@ public class OverlordResourceTest // Manually insert task in taskStorage final String taskId_1 = "1"; - NoopTask task_1 = new NoopTask(taskId_1, 0, 0, null, null); + NoopTask task_1 = new NoopTask(taskId_1, 0, 0, null, null, null); taskStorage.insert(task_1, TaskStatus.running(taskId_1)); // Wait for task runner to run task_1 runTaskCountDownLatches[Integer.parseInt(taskId_1)].await(); diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/setup/EqualDistributionWorkerSelectStrategyTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/setup/EqualDistributionWorkerSelectStrategyTest.java index 5f1808c15b4..cd9c0c3e9be 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/setup/EqualDistributionWorkerSelectStrategyTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/setup/EqualDistributionWorkerSelectStrategyTest.java @@ -49,7 +49,7 @@ public class EqualDistributionWorkerSelectStrategyTest Sets.newHashSet() ) ), - new NoopTask(null, 1, 0, null, null) + new NoopTask(null, 1, 0, null, null, null) { @Override public String getDataSource() diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/setup/FillCapacityWithAffinityWorkerSelectStrategyTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/setup/FillCapacityWithAffinityWorkerSelectStrategyTest.java index 462d876a77b..00289fb1ec6 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/setup/FillCapacityWithAffinityWorkerSelectStrategyTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/setup/FillCapacityWithAffinityWorkerSelectStrategyTest.java @@ -52,7 +52,7 @@ public class FillCapacityWithAffinityWorkerSelectStrategyTest Sets.newHashSet() ) ), - new NoopTask(null, 1, 0, null, null) + new NoopTask(null, 1, 0, null, null, null) { @Override public String getDataSource() @@ -86,7 +86,7 @@ public class FillCapacityWithAffinityWorkerSelectStrategyTest Sets.newHashSet() ) ), - new NoopTask(null, 1, 0, null, null) + new NoopTask(null, 1, 0, null, null, null) ); ImmutableZkWorker worker = optional.get(); Assert.assertEquals("lhost", worker.getWorker().getHost()); @@ -108,7 +108,7 @@ public class FillCapacityWithAffinityWorkerSelectStrategyTest Sets.newHashSet() ) ), - new NoopTask(null, 1, 0, null, null) + new NoopTask(null, 1, 0, null, null, null) ); Assert.assertFalse(optional.isPresent()); } diff --git a/indexing-service/src/test/java/io/druid/indexing/worker/TaskAnnouncementTest.java b/indexing-service/src/test/java/io/druid/indexing/worker/TaskAnnouncementTest.java index 0c5f203b196..5462d04499c 100644 --- a/indexing-service/src/test/java/io/druid/indexing/worker/TaskAnnouncementTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/worker/TaskAnnouncementTest.java @@ -47,7 +47,8 @@ public class TaskAnnouncementTest new TaskResource("rofl", 2), new FireDepartment( new DataSchema("foo", null, new AggregatorFactory[0], null), - new RealtimeIOConfig(new LocalFirehoseFactory(new File("lol"), "rofl", null), new PlumberSchool() + new RealtimeIOConfig( + new LocalFirehoseFactory(new File("lol"), "rofl", null), new PlumberSchool() { @Override public Plumber findPlumber( @@ -56,8 +57,13 @@ public class TaskAnnouncementTest { return null; } - }, null), null - ) + + }, + null + ), + null + ), + null ); final TaskStatus status = TaskStatus.running(task.getId()); final TaskAnnouncement announcement = TaskAnnouncement.create(task, status);