Merge pull request #1654 from gianm/index-add-taskResource

Add ability to provide taskResource for IndexTask.
This commit is contained in:
Xavier Léauté 2015-08-25 10:25:35 -07:00
commit de8f806cc1
5 changed files with 73 additions and 0 deletions

View File

@ -38,6 +38,22 @@ public abstract class AbstractFixedIntervalTask extends AbstractTask
this(id, id, new TaskResource(id, 1), dataSource, interval);
}
protected AbstractFixedIntervalTask(
String id,
TaskResource taskResource,
String dataSource,
Interval interval
)
{
this(
id,
id,
taskResource == null ? new TaskResource(id, 1) : taskResource,
dataSource,
interval
);
}
protected AbstractFixedIntervalTask(
String id,
String groupId,

View File

@ -144,6 +144,7 @@ public class IndexTask extends AbstractFixedIntervalTask
@JsonCreator
public IndexTask(
@JsonProperty("id") String id,
@JsonProperty("resource") TaskResource taskResource,
@JsonProperty("spec") IndexIngestionSpec ingestionSchema,
@JacksonInject ObjectMapper jsonMapper
)
@ -151,6 +152,7 @@ public class IndexTask extends AbstractFixedIntervalTask
super(
// _not_ the version, just something uniqueish
makeId(id, ingestionSchema),
taskResource,
makeDataSource(ingestionSchema),
makeInterval(ingestionSchema)
);

View File

@ -78,6 +78,7 @@ public class IndexTaskTest
writer.close();
IndexTask indexTask = new IndexTask(
null,
null,
new IndexTask.IndexIngestionSpec(
new DataSchema(
@ -143,6 +144,7 @@ public class IndexTaskTest
writer.close();
IndexTask indexTask = new IndexTask(
null,
null,
new IndexTask.IndexIngestionSpec(
new DataSchema(
@ -249,6 +251,7 @@ public class IndexTaskTest
writer.close();
IndexTask indexTask = new IndexTask(
null,
null,
new IndexTask.IndexIngestionSpec(
new DataSchema(

View File

@ -62,6 +62,7 @@ public class TaskSerdeTest
public void testIndexTaskSerde() throws Exception
{
final IndexTask task = new IndexTask(
null,
null,
new IndexTask.IndexIngestionSpec(
new DataSchema(
@ -101,6 +102,54 @@ public class TaskSerdeTest
Assert.assertTrue(task2.getIngestionSchema().getIOConfig().getFirehoseFactory() instanceof LocalFirehoseFactory);
}
@Test
public void testIndexTaskwithResourceSerde() throws Exception
{
final IndexTask task = new IndexTask(
null,
new TaskResource("rofl", 2),
new IndexTask.IndexIngestionSpec(
new DataSchema(
"foo",
null,
new AggregatorFactory[]{new DoubleSumAggregatorFactory("met", "met")},
new UniformGranularitySpec(
Granularity.DAY,
null,
ImmutableList.of(new Interval("2010-01-01/P2D"))
)
),
new IndexTask.IndexIOConfig(new LocalFirehoseFactory(new File("lol"), "rofl", null)),
new IndexTask.IndexTuningConfig(10000, 10, -1, indexSpec)
),
jsonMapper
);
for (final Module jacksonModule : new FirehoseModule().getJacksonModules()) {
jsonMapper.registerModule(jacksonModule);
}
InjectableValues inject = new InjectableValues.Std()
.addValue(ObjectMapper.class, jsonMapper);
final String json = jsonMapper.writeValueAsString(task);
Thread.sleep(100); // Just want to run the clock a bit to make sure the task id doesn't change
final IndexTask task2 = jsonMapper.reader(Task.class).with(inject).readValue(json);
Assert.assertEquals("foo", task.getDataSource());
Assert.assertEquals(new Interval("2010-01-01/P2D"), task.getInterval());
Assert.assertEquals(task.getId(), task2.getId());
Assert.assertEquals(2, task.getTaskResource().getRequiredCapacity());
Assert.assertEquals("rofl", task.getTaskResource().getAvailabilityGroup());
Assert.assertEquals(task.getTaskResource().getRequiredCapacity(), task2.getTaskResource().getRequiredCapacity());
Assert.assertEquals(task.getTaskResource().getAvailabilityGroup(), task2.getTaskResource().getAvailabilityGroup());
Assert.assertEquals(task.getGroupId(), task2.getGroupId());
Assert.assertEquals(task.getDataSource(), task2.getDataSource());
Assert.assertEquals(task.getInterval(), task2.getInterval());
Assert.assertTrue(task.getIngestionSchema().getIOConfig().getFirehoseFactory() instanceof LocalFirehoseFactory);
Assert.assertTrue(task2.getIngestionSchema().getIOConfig().getFirehoseFactory() instanceof LocalFirehoseFactory);
}
@Test
public void testMergeTaskSerde() throws Exception
{

View File

@ -479,6 +479,7 @@ public class TaskLifecycleTest
public void testIndexTask() throws Exception
{
final Task indexTask = new IndexTask(
null,
null,
new IndexTask.IndexIngestionSpec(
new DataSchema(
@ -534,6 +535,7 @@ public class TaskLifecycleTest
public void testIndexTaskFailure() throws Exception
{
final Task indexTask = new IndexTask(
null,
null,
new IndexTask.IndexIngestionSpec(
new DataSchema(
@ -857,6 +859,7 @@ public class TaskLifecycleTest
public void testResumeTasks() throws Exception
{
final Task indexTask = new IndexTask(
null,
null,
new IndexTask.IndexIngestionSpec(
new DataSchema(