Add Task Context and ability to override task specific properties

override javaOpts

fix compilation

review comments

Add Test for typecast

review comments - remove unused method.
This commit is contained in:
Nishant 2015-08-03 23:29:34 +05:30
parent 4546652b3b
commit 726326abc3
33 changed files with 275 additions and 123 deletions

View File

@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import io.druid.indexing.common.actions.LockTryAcquireAction;
import io.druid.indexing.common.actions.TaskActionClient;
import java.util.Map;
import org.joda.time.Interval;
public abstract class AbstractFixedIntervalTask extends AbstractTask
@ -32,17 +33,19 @@ public abstract class AbstractFixedIntervalTask extends AbstractTask
protected AbstractFixedIntervalTask(
String id,
String dataSource,
Interval interval
Interval interval,
Map<String, Object> context
)
{
this(id, id, new TaskResource(id, 1), dataSource, interval);
this(id, id, new TaskResource(id, 1), dataSource, interval, context);
}
protected AbstractFixedIntervalTask(
String id,
TaskResource taskResource,
String dataSource,
Interval interval
Interval interval,
Map<String, Object> context
)
{
this(
@ -50,7 +53,8 @@ public abstract class AbstractFixedIntervalTask extends AbstractTask
id,
taskResource == null ? new TaskResource(id, 1) : taskResource,
dataSource,
interval
interval,
context
);
}
@ -58,10 +62,11 @@ public abstract class AbstractFixedIntervalTask extends AbstractTask
String id,
String groupId,
String dataSource,
Interval interval
Interval interval,
Map<String, Object> context
)
{
this(id, groupId, new TaskResource(id, 1), dataSource, interval);
this(id, groupId, new TaskResource(id, 1), dataSource, interval, context);
}
protected AbstractFixedIntervalTask(
@ -69,10 +74,11 @@ public abstract class AbstractFixedIntervalTask extends AbstractTask
String groupId,
TaskResource taskResource,
String dataSource,
Interval interval
Interval interval,
Map<String, Object> context
)
{
super(id, groupId, taskResource, dataSource);
super(id, groupId, taskResource, dataSource, context);
this.interval = Preconditions.checkNotNull(interval, "interval");
Preconditions.checkArgument(interval.toDurationMillis() > 0, "interval empty");
}

View File

@ -28,6 +28,7 @@ import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.LockListAction;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import java.util.Map;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@ -49,22 +50,25 @@ public abstract class AbstractTask implements Task
@JsonIgnore
private final String dataSource;
protected AbstractTask(String id, String dataSource)
private final Map<String, Object> context;
protected AbstractTask(String id, String dataSource, Map<String, Object> context)
{
this(id, id, new TaskResource(id, 1), dataSource);
this(id, id, new TaskResource(id, 1), dataSource, context);
}
protected AbstractTask(String id, String groupId, String dataSource)
protected AbstractTask(String id, String groupId, String dataSource, Map<String, Object> context)
{
this(id, groupId, new TaskResource(id, 1), dataSource);
this(id, groupId, new TaskResource(id, 1), dataSource, context);
}
protected AbstractTask(String id, String groupId, TaskResource taskResource, String dataSource)
protected AbstractTask(String id, String groupId, TaskResource taskResource, String dataSource, Map<String, Object> context)
{
this.id = Preconditions.checkNotNull(id, "id");
this.groupId = Preconditions.checkNotNull(groupId, "groupId");
this.taskResource = Preconditions.checkNotNull(taskResource, "resource");
this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource");
this.context = context;
}
public static String makeId(String id, final String typeName, String dataSource, Interval interval)
@ -179,4 +183,18 @@ public abstract class AbstractTask implements Task
{
return toolbox.getTaskActionClient().submit(new LockListAction());
}
@Override
@JsonProperty
public Map<String, Object> getContext()
{
return context;
}
@Override
public Object getContextValue(String key)
{
return context == null ? null : context.get(key);
}
}

View File

@ -55,10 +55,11 @@ public class AppendTask extends MergeTaskBase
@JsonProperty("id") String id,
@JsonProperty("dataSource") String dataSource,
@JsonProperty("segments") List<DataSegment> segments,
@JsonProperty("indexSpec") IndexSpec indexSpec
@JsonProperty("indexSpec") IndexSpec indexSpec,
@JsonProperty("context") Map<String, Object> context
)
{
super(id, dataSource, segments);
super(id, dataSource, segments, context);
this.indexSpec = indexSpec == null ? new IndexSpec() : indexSpec;
}

View File

@ -28,6 +28,7 @@ import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.SegmentListUnusedAction;
import io.druid.indexing.common.actions.SegmentMetadataUpdateAction;
import io.druid.timeline.DataSegment;
import java.util.Map;
import org.joda.time.Interval;
import java.util.List;
@ -39,13 +40,15 @@ public class ArchiveTask extends AbstractFixedIntervalTask
public ArchiveTask(
@JsonProperty("id") String id,
@JsonProperty("dataSource") String dataSource,
@JsonProperty("interval") Interval interval
@JsonProperty("interval") Interval interval,
@JsonProperty("context") Map<String, Object> context
)
{
super(
makeId(id, "archive", dataSource, interval),
dataSource,
interval
interval,
context
);
}

View File

@ -27,7 +27,8 @@ public class ConvertSegmentBackwardsCompatibleTask extends ConvertSegmentTask
segment,
indexSpec,
force == null ? false : force,
validate ==null ? false : validate
validate ==null ? false : validate,
null
);
}
@ -43,7 +44,7 @@ public class ConvertSegmentBackwardsCompatibleTask extends ConvertSegmentTask
@JsonProperty("validate") Boolean validate
)
{
super(groupId, segment, indexSpec, force, validate);
super(groupId, segment, indexSpec, force, validate, null);
}
}
}

View File

@ -81,11 +81,12 @@ public class ConvertSegmentTask extends AbstractFixedIntervalTask
Interval interval,
IndexSpec indexSpec,
boolean force,
boolean validate
boolean validate,
Map<String, Object> context
)
{
final String id = makeId(dataSource, interval);
return new ConvertSegmentTask(id, dataSource, interval, null, indexSpec, force, validate);
return new ConvertSegmentTask(id, dataSource, interval, null, indexSpec, force, validate, context);
}
/**
@ -98,12 +99,13 @@ public class ConvertSegmentTask extends AbstractFixedIntervalTask
*
* @return A SegmentConverterTask for the segment with the indexSpec specified.
*/
public static ConvertSegmentTask create(DataSegment segment, IndexSpec indexSpec, boolean force, boolean validate)
public static ConvertSegmentTask create(DataSegment segment, IndexSpec indexSpec, boolean force, boolean validate, Map<String, Object> context
)
{
final Interval interval = segment.getInterval();
final String dataSource = segment.getDataSource();
final String id = makeId(dataSource, interval);
return new ConvertSegmentTask(id, dataSource, interval, segment, indexSpec, force, validate);
return new ConvertSegmentTask(id, dataSource, interval, segment, indexSpec, force, validate, context);
}
protected static String makeId(String dataSource, Interval interval)
@ -121,19 +123,20 @@ public class ConvertSegmentTask extends AbstractFixedIntervalTask
@JsonProperty("segment") DataSegment segment,
@JsonProperty("indexSpec") IndexSpec indexSpec,
@JsonProperty("force") Boolean force,
@JsonProperty("validate") Boolean validate
@JsonProperty("validate") Boolean validate,
@JsonProperty("context") Map<String, Object> context
)
{
final boolean isForce = force == null ? false : force;
final boolean isValidate = validate == null ? true : validate;
if (id == null) {
if (segment == null) {
return create(dataSource, interval, indexSpec, isForce, isValidate);
return create(dataSource, interval, indexSpec, isForce, isValidate, context);
} else {
return create(segment, indexSpec, isForce, isValidate);
return create(segment, indexSpec, isForce, isValidate, context);
}
}
return new ConvertSegmentTask(id, dataSource, interval, segment, indexSpec, isForce, isValidate);
return new ConvertSegmentTask(id, dataSource, interval, segment, indexSpec, isForce, isValidate, context);
}
protected ConvertSegmentTask(
@ -143,10 +146,11 @@ public class ConvertSegmentTask extends AbstractFixedIntervalTask
DataSegment segment,
IndexSpec indexSpec,
boolean force,
boolean validate
boolean validate,
Map<String, Object> context
)
{
super(id, dataSource, interval);
super(id, dataSource, interval, context);
this.segment = segment;
this.indexSpec = indexSpec == null ? new IndexSpec() : indexSpec;
this.force = force;
@ -224,7 +228,7 @@ public class ConvertSegmentTask extends AbstractFixedIntervalTask
segmentsToUpdate = Collections.singleton(segment);
}
// Vestigial from a past time when this task spawned subtasks.
for (final Task subTask : generateSubTasks(getGroupId(), segmentsToUpdate, indexSpec, force, validate)) {
for (final Task subTask : generateSubTasks(getGroupId(), segmentsToUpdate, indexSpec, force, validate, getContext())) {
final TaskStatus status = subTask.run(toolbox);
if (!status.isSuccess()) {
return TaskStatus.fromCode(getId(), status.getStatusCode());
@ -238,7 +242,8 @@ public class ConvertSegmentTask extends AbstractFixedIntervalTask
final Iterable<DataSegment> segments,
final IndexSpec indexSpec,
final boolean force,
final boolean validate
final boolean validate,
final Map<String, Object> context
)
{
return Iterables.transform(
@ -248,7 +253,7 @@ public class ConvertSegmentTask extends AbstractFixedIntervalTask
@Override
public Task apply(DataSegment input)
{
return new SubTask(groupId, input, indexSpec, force, validate);
return new SubTask(groupId, input, indexSpec, force, validate, context);
}
}
);
@ -287,7 +292,8 @@ public class ConvertSegmentTask extends AbstractFixedIntervalTask
@JsonProperty("segment") DataSegment segment,
@JsonProperty("indexSpec") IndexSpec indexSpec,
@JsonProperty("force") Boolean force,
@JsonProperty("validate") Boolean validate
@JsonProperty("validate") Boolean validate,
@JsonProperty("context") Map<String, Object> context
)
{
super(
@ -300,7 +306,8 @@ public class ConvertSegmentTask extends AbstractFixedIntervalTask
),
groupId,
segment.getDataSource(),
segment.getInterval()
segment.getInterval(),
context
);
this.segment = segment;
this.indexSpec = indexSpec == null ? new IndexSpec() : indexSpec;

View File

@ -63,7 +63,8 @@ public class HadoopConverterTask extends ConvertSegmentTask
@JsonProperty("distributedSuccessCache") URI distributedSuccessCache,
@JsonProperty("jobPriority") String jobPriority,
@JsonProperty("segmentOutputPath") String segmentOutputPath,
@JsonProperty("classpathPrefix") String classpathPrefix
@JsonProperty("classpathPrefix") String classpathPrefix,
@JsonProperty("context") Map<String, Object> context
)
{
super(
@ -78,7 +79,8 @@ public class HadoopConverterTask extends ConvertSegmentTask
null, // Always call subtask codepath
indexSpec,
force,
validate == null ? true : validate
validate == null ? true : validate,
context
);
this.hadoopDependencyCoordinates = hadoopDependencyCoordinates;
this.distributedSuccessCache = Preconditions.checkNotNull(distributedSuccessCache, "distributedSuccessCache");
@ -130,13 +132,15 @@ public class HadoopConverterTask extends ConvertSegmentTask
final Iterable<DataSegment> segments,
final IndexSpec indexSpec,
final boolean force,
final boolean validate
final boolean validate,
Map<String, Object> context
)
{
return Collections.<Task>singleton(
new ConverterSubTask(
ImmutableList.copyOf(segments),
this
this,
context
)
);
}
@ -164,7 +168,8 @@ public class HadoopConverterTask extends ConvertSegmentTask
@JsonCreator
public ConverterSubTask(
@JsonProperty("segments") List<DataSegment> segments,
@JsonProperty("parent") HadoopConverterTask parent
@JsonProperty("parent") HadoopConverterTask parent,
@JsonProperty("context") Map<String, Object> context
)
{
super(
@ -175,7 +180,8 @@ public class HadoopConverterTask extends ConvertSegmentTask
parent.getInterval().getEnd()
),
parent.getDataSource(),
parent.getHadoopDependencyCoordinates()
parent.getHadoopDependencyCoordinates(),
context
);
this.segments = segments;
this.parent = parent;

View File

@ -45,6 +45,7 @@ import io.druid.indexing.common.actions.SegmentListUsedAction;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.indexing.hadoop.OverlordActionBasedUsedSegmentLister;
import io.druid.timeline.DataSegment;
import java.util.Map;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@ -88,7 +89,8 @@ public class HadoopIndexTask extends HadoopTask
@JsonProperty("hadoopCoordinates") String hadoopCoordinates,
@JsonProperty("hadoopDependencyCoordinates") List<String> hadoopDependencyCoordinates,
@JsonProperty("classpathPrefix") String classpathPrefix,
@JacksonInject ObjectMapper jsonMapper
@JacksonInject ObjectMapper jsonMapper,
@JsonProperty("context") Map<String, Object> context
)
{
super(
@ -96,7 +98,8 @@ public class HadoopIndexTask extends HadoopTask
getTheDataSource(spec),
hadoopDependencyCoordinates == null
? (hadoopCoordinates == null ? null : ImmutableList.of(hadoopCoordinates))
: hadoopDependencyCoordinates
: hadoopDependencyCoordinates,
context
);

View File

@ -38,6 +38,7 @@ import java.net.URL;
import java.net.URLClassLoader;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
public abstract class HadoopTask extends AbstractTask
@ -53,9 +54,14 @@ public abstract class HadoopTask extends AbstractTask
private final List<String> hadoopDependencyCoordinates;
protected HadoopTask(String id, String dataSource, List<String> hadoopDependencyCoordinates)
protected HadoopTask(
String id,
String dataSource,
List<String> hadoopDependencyCoordinates,
Map<String, Object> context
)
{
super(id, dataSource);
super(id, dataSource, context);
this.hadoopDependencyCoordinates = hadoopDependencyCoordinates;
}

View File

@ -58,6 +58,7 @@ import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.HashBasedNumberedShardSpec;
import io.druid.timeline.partition.NoneShardSpec;
import io.druid.timeline.partition.ShardSpec;
import java.util.Map;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@ -146,7 +147,8 @@ public class IndexTask extends AbstractFixedIntervalTask
@JsonProperty("id") String id,
@JsonProperty("resource") TaskResource taskResource,
@JsonProperty("spec") IndexIngestionSpec ingestionSchema,
@JacksonInject ObjectMapper jsonMapper
@JacksonInject ObjectMapper jsonMapper,
@JsonProperty("context") Map<String, Object> context
)
{
super(
@ -154,7 +156,8 @@ public class IndexTask extends AbstractFixedIntervalTask
makeId(id, ingestionSchema),
taskResource,
makeDataSource(ingestionSchema),
makeInterval(ingestionSchema)
makeInterval(ingestionSchema),
context
);

View File

@ -29,6 +29,7 @@ import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.SegmentListUnusedAction;
import io.druid.indexing.common.actions.SegmentNukeAction;
import io.druid.timeline.DataSegment;
import java.util.Map;
import org.joda.time.Interval;
import java.util.List;
@ -43,13 +44,15 @@ public class KillTask extends AbstractFixedIntervalTask
public KillTask(
@JsonProperty("id") String id,
@JsonProperty("dataSource") String dataSource,
@JsonProperty("interval") Interval interval
@JsonProperty("interval") Interval interval,
@JsonProperty("context") Map<String, Object> context
)
{
super(
makeId(id, "kill", dataSource, interval),
dataSource,
interval
interval,
context
);
}

View File

@ -50,10 +50,12 @@ public class MergeTask extends MergeTaskBase
@JsonProperty("dataSource") String dataSource,
@JsonProperty("segments") List<DataSegment> segments,
@JsonProperty("aggregations") List<AggregatorFactory> aggregators,
@JsonProperty("indexSpec") IndexSpec indexSpec
@JsonProperty("indexSpec") IndexSpec indexSpec,
@JsonProperty("context") Map<String, Object> context
)
{
super(id, dataSource, segments);
super(id, dataSource, segments, context);
this.aggregators = aggregators;
this.indexSpec = indexSpec == null ? new IndexSpec() : indexSpec;
}

View File

@ -62,7 +62,12 @@ public abstract class MergeTaskBase extends AbstractFixedIntervalTask
private static final EmittingLogger log = new EmittingLogger(MergeTaskBase.class);
protected MergeTaskBase(final String id, final String dataSource, final List<DataSegment> segments)
protected MergeTaskBase(
final String id,
final String dataSource,
final List<DataSegment> segments,
Map<String, Object> context
)
{
super(
// _not_ the version, just something uniqueish
@ -70,7 +75,8 @@ public abstract class MergeTaskBase extends AbstractFixedIntervalTask
"merge_%s_%s", computeProcessingID(dataSource, segments), new DateTime().toString()
),
dataSource,
computeMergedInterval(segments)
computeMergedInterval(segments),
context
);
// Verify segment list is nonempty
@ -249,19 +255,19 @@ public abstract class MergeTaskBase extends AbstractFixedIntervalTask
final String segmentIDs = Joiner.on("_").join(
Iterables.transform(
Ordering.natural().sortedCopy(segments), new Function<DataSegment, String>()
{
@Override
public String apply(DataSegment x)
{
return String.format(
"%s_%s_%s_%s",
x.getInterval().getStart(),
x.getInterval().getEnd(),
x.getVersion(),
x.getShardSpec().getPartitionNum()
);
}
}
{
@Override
public String apply(DataSegment x)
{
return String.format(
"%s_%s_%s_%s",
x.getInterval().getStart(),
x.getInterval().getEnd(),
x.getVersion(),
x.getShardSpec().getPartitionNum()
);
}
}
)
);

View File

@ -45,13 +45,15 @@ public class MoveTask extends AbstractFixedIntervalTask
@JsonProperty("id") String id,
@JsonProperty("dataSource") String dataSource,
@JsonProperty("interval") Interval interval,
@JsonProperty("target") Map<String, Object> targetLoadSpec
@JsonProperty("target") Map<String, Object> targetLoadSpec,
@JsonProperty("context") Map<String, Object> context
)
{
super(
makeId(id, "move", dataSource, interval),
dataSource,
interval
interval,
context
);
this.targetLoadSpec = targetLoadSpec;
}

View File

@ -26,6 +26,7 @@ import io.druid.data.input.FirehoseFactory;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.TaskActionClient;
import java.util.Map;
import org.joda.time.DateTime;
import java.util.UUID;
@ -64,12 +65,14 @@ public class NoopTask extends AbstractTask
@JsonProperty("runTime") long runTime,
@JsonProperty("isReadyTime") long isReadyTime,
@JsonProperty("isReadyResult") String isReadyResult,
@JsonProperty("firehose") FirehoseFactory firehoseFactory
@JsonProperty("firehose") FirehoseFactory firehoseFactory,
@JsonProperty("context") Map<String, Object> context
)
{
super(
id == null ? String.format("noop_%s_%s", new DateTime(), UUID.randomUUID().toString()) : id,
"none"
"none",
context
);
this.runTime = (runTime == 0) ? defaultRunTime : runTime;
@ -142,6 +145,6 @@ public class NoopTask extends AbstractTask
public static NoopTask create()
{
return new NoopTask(null, 0, 0, null, null);
return new NoopTask(null, 0, 0, null, null, null);
}
}

View File

@ -52,6 +52,7 @@ import io.druid.segment.realtime.plumber.Sink;
import io.druid.segment.realtime.plumber.VersioningPolicy;
import io.druid.server.coordination.DataSegmentAnnouncer;
import io.druid.timeline.DataSegment;
import java.util.Map;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.Period;
@ -108,14 +109,16 @@ public class RealtimeIndexTask extends AbstractTask
public RealtimeIndexTask(
@JsonProperty("id") String id,
@JsonProperty("resource") TaskResource taskResource,
@JsonProperty("spec") FireDepartment fireDepartment
@JsonProperty("spec") FireDepartment fireDepartment,
@JsonProperty("context") Map<String, Object> context
)
{
super(
id == null ? makeTaskId(fireDepartment) : id,
String.format("index_realtime_%s", makeDatasource(fireDepartment)),
taskResource == null ? new TaskResource(makeTaskId(fireDepartment), 1) : taskResource,
makeDatasource(fireDepartment)
makeDatasource(fireDepartment),
context
);
this.spec = fireDepartment;
}

View File

@ -29,6 +29,7 @@ import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.SegmentListUnusedAction;
import io.druid.indexing.common.actions.SegmentMetadataUpdateAction;
import io.druid.timeline.DataSegment;
import java.util.Map;
import org.joda.time.Interval;
import java.util.List;
@ -40,13 +41,15 @@ public class RestoreTask extends AbstractFixedIntervalTask
public RestoreTask(
@JsonProperty("id") String id,
@JsonProperty("dataSource") String dataSource,
@JsonProperty("interval") Interval interval
@JsonProperty("interval") Interval interval,
@JsonProperty("context") Map<String, Object> context
)
{
super(
makeId(id, "restore", dataSource, interval),
dataSource,
interval
interval,
context
);
}

View File

@ -24,6 +24,7 @@ import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import java.util.Map;
/**
* Represents a task that can run on a worker. The general contracts surrounding Tasks are:
@ -138,4 +139,9 @@ public interface Task
* @throws Exception if this task failed
*/
public TaskStatus run(TaskToolbox toolbox) throws Exception;
public Map<String, Object> getContext();
public Object getContextValue(String key);
}

View File

@ -123,7 +123,7 @@ public class IngestSegmentFirehoseFactory implements FirehoseFactory<InputRowPar
// better way to achieve this is to pass toolbox to Firehose, The instance is initialized Lazily on connect method.
// Noop Task is just used to create the toolbox and list segments.
final TaskToolbox toolbox = injector.getInstance(TaskToolboxFactory.class).build(
new NoopTask("reingest", 0, 0, null, null)
new NoopTask("reingest", 0, 0, null, null, null)
);
try {

View File

@ -174,6 +174,19 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
Iterables.addAll(command, whiteSpaceSplitter.split(config.getJavaOpts()));
// Override task specific javaOpts
Object taskJavaOpts = task.getContextValue(
"druid.indexer.runner.javaOpts"
);
if(taskJavaOpts != null) {
Iterables.addAll(
command,
whiteSpaceSplitter.split(
(String) taskJavaOpts
)
);
}
for (String propName : props.stringPropertyNames()) {
for (String allowedPrefix : config.getAllowedPrefixes()) {
if (propName.startsWith(allowedPrefix)) {
@ -201,6 +214,19 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
}
}
// Override task specific properties
for (String propName : task.getContext().keySet()) {
if (propName.startsWith(CHILD_PROPERTY_PREFIX)) {
command.add(
String.format(
"-D%s=%s",
propName.substring(CHILD_PROPERTY_PREFIX.length()),
task.getContextValue(propName)
)
);
}
}
command.add(String.format("-Ddruid.host=%s", childHost));
command.add(String.format("-Ddruid.port=%d", childPort));

View File

@ -69,7 +69,7 @@ public class TestMergeTask extends MergeTask
@JsonProperty("indexSpec") IndexSpec indexSpec
)
{
super(id, dataSource, segments, aggregators, indexSpec);
super(id, dataSource, segments, aggregators, indexSpec, null);
this.id = id;
}

View File

@ -65,7 +65,8 @@ public class TestRealtimeTask extends RealtimeIndexTask
}
}, null
), null
)
),
null
);
this.status = status;
}

View File

@ -42,7 +42,8 @@ public class ConvertSegmentTaskTest
final Interval interval = new Interval(new DateTime().minus(1000), new DateTime());
ConvertSegmentTask task = ConvertSegmentTask.create(dataSource, interval, null, false, true);
ConvertSegmentTask task = ConvertSegmentTask.create(dataSource, interval, null, false, true, null);
Task task2 = jsonMapper.readValue(jsonMapper.writerWithDefaultPrettyPrinter().writeValueAsString(task), Task.class);
Assert.assertEquals(task, task2);
@ -58,7 +59,7 @@ public class ConvertSegmentTaskTest
102937
);
task = ConvertSegmentTask.create(segment, null, false, true);
task = ConvertSegmentTask.create(segment, null, false, true, null);
task2 = jsonMapper.readValue(jsonMapper.writerWithDefaultPrettyPrinter().writeValueAsString(task), Task.class);
Assert.assertEquals(task, task2);

View File

@ -82,7 +82,8 @@ public class HadoopConverterTaskSerDeTest
DISTRIBUTED_CACHE,
PRIORITY,
OUTPUT_PATH,
CLASSPATH_PREFIX
CLASSPATH_PREFIX,
null
);
final String strOrig = objectMapper.writeValueAsString(orig);
HadoopConverterTask other = objectMapper.readValue(strOrig, HadoopConverterTask.class);
@ -106,12 +107,15 @@ public class HadoopConverterTaskSerDeTest
DISTRIBUTED_CACHE,
PRIORITY,
OUTPUT_PATH,
CLASSPATH_PREFIX
CLASSPATH_PREFIX,
null
);
HadoopConverterTask.ConverterSubTask subTask = new HadoopConverterTask.ConverterSubTask(
ImmutableList.of(
DATA_SEGMENT
), parent
),
parent,
null
);
final String origString = objectMapper.writeValueAsString(subTask);
final HadoopConverterTask.ConverterSubTask otherSub = objectMapper.readValue(
@ -156,12 +160,15 @@ public class HadoopConverterTaskSerDeTest
DISTRIBUTED_CACHE,
PRIORITY,
OUTPUT_PATH,
CLASSPATH_PREFIX
CLASSPATH_PREFIX,
null
);
HadoopConverterTask.ConverterSubTask subTask = new HadoopConverterTask.ConverterSubTask(
ImmutableList.of(
DATA_SEGMENT
), parent
),
parent,
null
);
Assert.assertEquals(parent.getType(), "hadoop_convert_segment");
Assert.assertEquals(parent.getType() + "_sub", subTask.getType());
@ -181,7 +188,8 @@ public class HadoopConverterTaskSerDeTest
DISTRIBUTED_CACHE,
PRIORITY,
OUTPUT_PATH,
CLASSPATH_PREFIX
CLASSPATH_PREFIX,
null
);
Assert.assertTrue(orig.isValidate());
}
@ -200,6 +208,7 @@ public class HadoopConverterTaskSerDeTest
DISTRIBUTED_CACHE,
null,
OUTPUT_PATH,
null,
null
);
Assert.assertEquals(DATA_SOURCE, parent.getDataSource());
@ -224,7 +233,8 @@ public class HadoopConverterTaskSerDeTest
DISTRIBUTED_CACHE,
PRIORITY,
OUTPUT_PATH,
CLASSPATH_PREFIX
CLASSPATH_PREFIX,
null
);
orig.getSegment();
}
@ -243,6 +253,7 @@ public class HadoopConverterTaskSerDeTest
DISTRIBUTED_CACHE,
null,
OUTPUT_PATH,
null,
null
);
}
@ -261,6 +272,7 @@ public class HadoopConverterTaskSerDeTest
DISTRIBUTED_CACHE,
null,
OUTPUT_PATH,
null,
null
);
}
@ -279,6 +291,7 @@ public class HadoopConverterTaskSerDeTest
null,
null,
OUTPUT_PATH,
null,
null
);
}
@ -297,6 +310,7 @@ public class HadoopConverterTaskSerDeTest
DISTRIBUTED_CACHE,
null,
null,
null,
null
);
}

View File

@ -122,7 +122,8 @@ public class IndexTaskTest
indexSpec
)
),
new DefaultObjectMapper()
new DefaultObjectMapper(),
null
);
final List<DataSegment> segments = runTask(indexTask);
@ -182,7 +183,8 @@ public class IndexTaskTest
),
null
),
new DefaultObjectMapper()
new DefaultObjectMapper(),
null
);
List<DataSegment> segments = runTask(indexTask);
@ -290,7 +292,8 @@ public class IndexTaskTest
),
null
),
new DefaultObjectMapper()
new DefaultObjectMapper(),
null
);
final List<DataSegment> segments = runTask(indexTask);

View File

@ -41,7 +41,7 @@ public class MergeTaskBaseTest
.add(segmentBuilder.interval(new Interval("2012-01-03/2012-01-05")).build())
.build();
final MergeTaskBase testMergeTaskBase = new MergeTaskBase(null, "foo", segments)
final MergeTaskBase testMergeTaskBase = new MergeTaskBase(null, "foo", segments, null)
{
@Override
protected File merge(Map<DataSegment, File> segments, File outDir) throws Exception

View File

@ -80,7 +80,8 @@ public class TaskSerdeTest
new IndexTask.IndexIOConfig(new LocalFirehoseFactory(new File("lol"), "rofl", null)),
new IndexTask.IndexTuningConfig(10000, 10, -1, indexSpec)
),
jsonMapper
jsonMapper,
null
);
for (final Module jacksonModule : new FirehoseModule().getJacksonModules()) {
@ -124,7 +125,8 @@ public class TaskSerdeTest
new IndexTask.IndexIOConfig(new LocalFirehoseFactory(new File("lol"), "rofl", null)),
new IndexTask.IndexTuningConfig(10000, 10, -1, indexSpec)
),
jsonMapper
jsonMapper,
null
);
for (final Module jacksonModule : new FirehoseModule().getJacksonModules()) {
@ -164,7 +166,8 @@ public class TaskSerdeTest
ImmutableList.<AggregatorFactory>of(
new CountAggregatorFactory("cnt")
),
indexSpec
indexSpec,
null
);
final String json = jsonMapper.writeValueAsString(task);
@ -192,7 +195,8 @@ public class TaskSerdeTest
final KillTask task = new KillTask(
null,
"foo",
new Interval("2010-01-01/P1D")
new Interval("2010-01-01/P1D"),
null
);
final String json = jsonMapper.writeValueAsString(task);
@ -216,7 +220,8 @@ public class TaskSerdeTest
DataSegment.builder().dataSource("foo").interval(new Interval("2010-01-01/P1D")).version("1234").build(),
null,
false,
true
true,
null
);
final String json = jsonMapper.writeValueAsString(task);
@ -242,7 +247,8 @@ public class TaskSerdeTest
DataSegment.builder().dataSource("foo").interval(new Interval("2010-01-01/P1D")).version("1234").build(),
indexSpec,
false,
true
true,
null
);
final String json = jsonMapper.writeValueAsString(task);
@ -302,7 +308,8 @@ public class TaskSerdeTest
null,
0.3F
)
)
),
null
);
final String json = jsonMapper.writeValueAsString(task);
@ -351,7 +358,8 @@ public class TaskSerdeTest
ImmutableList.of(
DataSegment.builder().dataSource("foo").interval(new Interval("2010-01-01/P1D")).version("1234").build()
),
indexSpec
indexSpec,
null
);
final String json = jsonMapper.writeValueAsString(task);
@ -375,7 +383,8 @@ public class TaskSerdeTest
final ArchiveTask task = new ArchiveTask(
null,
"foo",
new Interval("2010-01-01/P1D")
new Interval("2010-01-01/P1D"),
null
);
final String json = jsonMapper.writeValueAsString(task);
@ -398,7 +407,8 @@ public class TaskSerdeTest
final RestoreTask task = new RestoreTask(
null,
"foo",
new Interval("2010-01-01/P1D")
new Interval("2010-01-01/P1D"),
null
);
final String json = jsonMapper.writeValueAsString(task);
@ -432,7 +442,8 @@ public class TaskSerdeTest
),
indexSpec,
false,
true
true,
null
);
final String json = jsonMapper.writeValueAsString(task);
final ConvertSegmentTask taskFromJson = jsonMapper.readValue(json, ConvertSegmentTask.class);
@ -457,7 +468,8 @@ public class TaskSerdeTest
segment,
new IndexSpec(new RoaringBitmapSerdeFactory(), "lzf", "uncompressed"),
false,
true
true,
null
);
final String json = jsonMapper.writeValueAsString(convertSegmentTaskOriginal);
final Task task = jsonMapper.readValue(json, Task.class);
@ -491,7 +503,8 @@ public class TaskSerdeTest
null,
"foo",
new Interval("2010-01-01/P1D"),
ImmutableMap.<String, Object>of("bucket", "hey", "baseKey", "what")
ImmutableMap.<String, Object>of("bucket", "hey", "baseKey", "what"),
null
);
final String json = jsonMapper.writeValueAsString(task);
@ -527,7 +540,8 @@ public class TaskSerdeTest
null,
null,
"blah",
jsonMapper
jsonMapper,
null
);
final String json = jsonMapper.writeValueAsString(task);

View File

@ -41,12 +41,12 @@ public class RealtimeishTask extends AbstractTask
{
public RealtimeishTask()
{
super("rt1", "rt", new TaskResource("rt1", 1), "foo");
super("rt1", "rt", new TaskResource("rt1", 1), "foo", null);
}
public RealtimeishTask(String id, String groupId, TaskResource taskResource, String dataSource)
{
super(id, groupId, taskResource, dataSource);
super(id, groupId, taskResource, dataSource, null);
}
@Override

View File

@ -484,7 +484,8 @@ public class TaskLifecycleTest
new IndexTask.IndexIOConfig(new MockFirehoseFactory(false)),
new IndexTask.IndexTuningConfig(10000, 10, -1, indexSpec)
),
TestUtils.MAPPER
TestUtils.MAPPER,
null
);
final Optional<TaskStatus> preRunTaskStatus = tsqa.getStatus(indexTask.getId());
@ -540,7 +541,8 @@ public class TaskLifecycleTest
new IndexTask.IndexIOConfig(new MockExceptionalFirehoseFactory()),
new IndexTask.IndexTuningConfig(10000, 10, -1, indexSpec)
),
TestUtils.MAPPER
TestUtils.MAPPER,
null
);
final TaskStatus status = runTask(indexTask);
@ -610,7 +612,7 @@ public class TaskLifecycleTest
segmentFiles.add(file);
}
final Task killTask = new KillTask(null, "test_kill_task", new Interval("2011-04-01/P4D"));
final Task killTask = new KillTask(null, "test_kill_task", new Interval("2011-04-01/P4D"), null);
final TaskStatus status = runTask(killTask);
Assert.assertEquals("merged statusCode", TaskStatus.Status.SUCCESS, status.getStatusCode());
@ -675,7 +677,8 @@ public class TaskLifecycleTest
"id1",
new TaskResource("id1", 1),
"ds",
new Interval("2012-01-01/P1D")
new Interval("2012-01-01/P1D"),
null
)
{
@Override
@ -713,7 +716,7 @@ public class TaskLifecycleTest
@Test
public void testBadInterval() throws Exception
{
final Task task = new AbstractFixedIntervalTask("id1", "id1", "ds", new Interval("2012-01-01/P1D"))
final Task task = new AbstractFixedIntervalTask("id1", "id1", "ds", new Interval("2012-01-01/P1D"), null)
{
@Override
public String getType()
@ -747,7 +750,7 @@ public class TaskLifecycleTest
@Test
public void testBadVersion() throws Exception
{
final Task task = new AbstractFixedIntervalTask("id1", "id1", "ds", new Interval("2012-01-01/P1D"))
final Task task = new AbstractFixedIntervalTask("id1", "id1", "ds", new Interval("2012-01-01/P1D"), null)
{
@Override
public String getType()
@ -823,7 +826,8 @@ public class TaskLifecycleTest
RealtimeIndexTask realtimeIndexTask = new RealtimeIndexTask(
taskId,
new TaskResource(taskId, 1),
fireDepartment
fireDepartment,
null
);
tq.add(realtimeIndexTask);
//wait for task to process events and publish segment
@ -864,7 +868,8 @@ public class TaskLifecycleTest
new IndexTask.IndexIOConfig(new MockFirehoseFactory(false)),
new IndexTask.IndexTuningConfig(10000, 10, -1, indexSpec)
),
TestUtils.MAPPER
TestUtils.MAPPER,
null
);
final long startTime = System.currentTimeMillis();

View File

@ -195,7 +195,7 @@ public class OverlordResourceTest
Assert.assertEquals(druidNode.getHostAndPort(), response.getEntity());
final String taskId_0 = "0";
NoopTask task_0 = new NoopTask(taskId_0, 0, 0, null, null);
NoopTask task_0 = new NoopTask(taskId_0, 0, 0, null, null, null);
response = overlordResource.taskPost(task_0);
Assert.assertEquals(200, response.getStatus());
Assert.assertEquals(ImmutableMap.of("task", taskId_0), response.getEntity());
@ -227,7 +227,7 @@ public class OverlordResourceTest
// Manually insert task in taskStorage
final String taskId_1 = "1";
NoopTask task_1 = new NoopTask(taskId_1, 0, 0, null, null);
NoopTask task_1 = new NoopTask(taskId_1, 0, 0, null, null, null);
taskStorage.insert(task_1, TaskStatus.running(taskId_1));
// Wait for task runner to run task_1
runTaskCountDownLatches[Integer.parseInt(taskId_1)].await();

View File

@ -49,7 +49,7 @@ public class EqualDistributionWorkerSelectStrategyTest
Sets.<String>newHashSet()
)
),
new NoopTask(null, 1, 0, null, null)
new NoopTask(null, 1, 0, null, null, null)
{
@Override
public String getDataSource()

View File

@ -52,7 +52,7 @@ public class FillCapacityWithAffinityWorkerSelectStrategyTest
Sets.<String>newHashSet()
)
),
new NoopTask(null, 1, 0, null, null)
new NoopTask(null, 1, 0, null, null, null)
{
@Override
public String getDataSource()
@ -86,7 +86,7 @@ public class FillCapacityWithAffinityWorkerSelectStrategyTest
Sets.<String>newHashSet()
)
),
new NoopTask(null, 1, 0, null, null)
new NoopTask(null, 1, 0, null, null, null)
);
ImmutableZkWorker worker = optional.get();
Assert.assertEquals("lhost", worker.getWorker().getHost());
@ -108,7 +108,7 @@ public class FillCapacityWithAffinityWorkerSelectStrategyTest
Sets.<String>newHashSet()
)
),
new NoopTask(null, 1, 0, null, null)
new NoopTask(null, 1, 0, null, null, null)
);
Assert.assertFalse(optional.isPresent());
}

View File

@ -47,7 +47,8 @@ public class TaskAnnouncementTest
new TaskResource("rofl", 2),
new FireDepartment(
new DataSchema("foo", null, new AggregatorFactory[0], null),
new RealtimeIOConfig(new LocalFirehoseFactory(new File("lol"), "rofl", null), new PlumberSchool()
new RealtimeIOConfig(
new LocalFirehoseFactory(new File("lol"), "rofl", null), new PlumberSchool()
{
@Override
public Plumber findPlumber(
@ -56,8 +57,13 @@ public class TaskAnnouncementTest
{
return null;
}
}, null), null
)
},
null
),
null
),
null
);
final TaskStatus status = TaskStatus.running(task.getId());
final TaskAnnouncement announcement = TaskAnnouncement.create(task, status);