Add SameIntervalMergeTask for easier usage of MergeTask (#3981)

* Add SameIntervalMergeTask for easier usage of MergeTask

* fix a bug and add ut

* remove same_interval_merge_sub from Task.java and remove other no needed code
This commit is contained in:
kaijianding 2017-03-07 01:21:32 +08:00 committed by Himanshu
parent bebf9f34c7
commit 19ac1c7c2c
7 changed files with 480 additions and 3 deletions

View File

@ -186,6 +186,24 @@ The grammar is:
}
```
### Same Interval Merge Task
Same Interval Merge task is a shortcut of merge task, all segments in the interval are going to be merged.
The grammar is:
```json
{
"type": "same_interval_merge",
"id": <task_id>,
"dataSource": <task_datasource>,
"aggregations": <list of aggregators>,
"rollup": <whether or not to rollup data during a merge>,
"buildV9Directly": <true or false, default true>,
"interval": <DataSegment objects in this interval are going to be merged>
}
```
Segment Destroying Tasks
------------------------

View File

@ -104,6 +104,18 @@ public class MergeTask extends MergeTaskBase
return "merge";
}
@JsonProperty
public Boolean getRollup()
{
return rollup;
}
@JsonProperty
public IndexSpec getIndexSpec()
{
return indexSpec;
}
@JsonProperty("aggregations")
public List<AggregatorFactory> getAggregators()
{

View File

@ -99,6 +99,12 @@ public abstract class MergeTaskBase extends AbstractFixedIntervalTask
)
) == 0, "segments in the wrong datasource"
);
verifyInputSegments(segments);
this.segments = segments;
}
protected void verifyInputSegments(List<DataSegment> segments) {
// Verify segments are all unsharded
Preconditions.checkArgument(
Iterables.size(
@ -115,8 +121,6 @@ public abstract class MergeTaskBase extends AbstractFixedIntervalTask
)
) == 0, "segments without NoneShardSpec"
);
this.segments = segments;
}
@Override

View File

@ -0,0 +1,171 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.common.task;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.SegmentListUsedAction;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.IndexSpec;
import io.druid.timeline.DataSegment;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import java.util.List;
import java.util.Map;
/**
*/
public class SameIntervalMergeTask extends AbstractFixedIntervalTask
{
private static final Boolean defaultBuildV9Directly = Boolean.TRUE;
private static final String TYPE = "same_interval_merge";
@JsonIgnore
private final List<AggregatorFactory> aggregators;
private final Boolean rollup;
private final IndexSpec indexSpec;
private final Boolean buildV9Directly;
public SameIntervalMergeTask(
@JsonProperty("id") String id,
@JsonProperty("dataSource") String dataSource,
@JsonProperty("interval") Interval interval,
@JsonProperty("aggregations") List<AggregatorFactory> aggregators,
@JsonProperty("rollup") Boolean rollup,
@JsonProperty("indexSpec") IndexSpec indexSpec,
@JsonProperty("buildV9Directly") Boolean buildV9Directly,
@JsonProperty("context") Map<String, Object> context
)
{
super(
makeId(id, TYPE, dataSource, interval),
dataSource,
interval,
context
);
this.aggregators = Preconditions.checkNotNull(aggregators, "null aggregations");
this.rollup = rollup == null ? Boolean.TRUE : rollup;
this.indexSpec = indexSpec == null ? new IndexSpec() : indexSpec;
this.buildV9Directly = buildV9Directly == null ? defaultBuildV9Directly : buildV9Directly;
}
@JsonProperty("aggregations")
public List<AggregatorFactory> getAggregators()
{
return aggregators;
}
@JsonProperty
public Boolean getRollup()
{
return rollup;
}
@JsonProperty
public IndexSpec getIndexSpec()
{
return indexSpec;
}
@JsonProperty
public Boolean getBuildV9Directly()
{
return buildV9Directly;
}
public static String makeId(String id, final String typeName, String dataSource, Interval interval)
{
return id != null ? id : joinId(
typeName,
dataSource,
interval.getStart(),
interval.getEnd(),
new DateTime().toString()
);
}
@Override
public String getType()
{
return TYPE;
}
@Override
public TaskStatus run(TaskToolbox toolbox) throws Exception
{
final List<DataSegment> segments = toolbox.getTaskActionClient().submit(
new SegmentListUsedAction(
getDataSource(),
getInterval(),
null
)
);
SubTask mergeTask = new SubTask(
getId(),
getDataSource(),
segments,
aggregators,
rollup,
indexSpec,
buildV9Directly,
getContext()
);
final TaskStatus status = mergeTask.run(toolbox);
if (!status.isSuccess()) {
return TaskStatus.fromCode(getId(), status.getStatusCode());
}
return success();
}
public static class SubTask extends MergeTask
{
public SubTask(
String baseId,
String dataSource,
List<DataSegment> segments,
List<AggregatorFactory> aggregators,
Boolean rollup,
IndexSpec indexSpec,
Boolean buildV9Directly,
Map<String, Object> context
)
{
super(
"sub_" + baseId,
dataSource,
segments,
aggregators,
rollup,
indexSpec,
buildV9Directly,
context
);
}
@Override
protected void verifyInputSegments(List<DataSegment> segments)
{
// do nothing
}
}
}

View File

@ -58,7 +58,8 @@ import java.util.Map;
@JsonSubTypes.Type(name = "version_converter", value = ConvertSegmentBackwardsCompatibleTask.class), // Backwards compat - Deprecated
@JsonSubTypes.Type(name = "version_converter_sub", value = ConvertSegmentBackwardsCompatibleTask.SubTask.class), // backwards compat - Deprecated
@JsonSubTypes.Type(name = "convert_segment", value = ConvertSegmentTask.class),
@JsonSubTypes.Type(name = "convert_segment_sub", value = ConvertSegmentTask.SubTask.class)
@JsonSubTypes.Type(name = "convert_segment_sub", value = ConvertSegmentTask.SubTask.class),
@JsonSubTypes.Type(name = "same_interval_merge", value = SameIntervalMergeTask.class)
})
public interface Task
{

View File

@ -0,0 +1,235 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.common.task;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.TestUtils;
import io.druid.indexing.common.actions.LockListAction;
import io.druid.indexing.common.actions.LockTryAcquireAction;
import io.druid.indexing.common.actions.SegmentInsertAction;
import io.druid.indexing.common.actions.SegmentListUsedAction;
import io.druid.indexing.common.actions.TaskAction;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMerger;
import io.druid.segment.IndexMergerV9;
import io.druid.segment.IndexSpec;
import io.druid.segment.Segment;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.loading.SegmentLoader;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.server.metrics.NoopServiceEmitter;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.LinearShardSpec;
import io.druid.timeline.partition.NoneShardSpec;
import org.easymock.EasyMock;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
public class SameIntervalMergeTaskTest
{
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
public TaskLock taskLock;
private final CountDownLatch isRedayCountDown = new CountDownLatch(1);
private final CountDownLatch publishCountDown = new CountDownLatch(1);
private final IndexSpec indexSpec;
private final ObjectMapper jsonMapper;
private IndexIO indexIO;
public SameIntervalMergeTaskTest()
{
indexSpec = new IndexSpec();
TestUtils testUtils = new TestUtils();
jsonMapper = testUtils.getTestObjectMapper();
indexIO = testUtils.getTestIndexIO();
}
@Test
public void testRun() throws Exception
{
final List<AggregatorFactory> aggregators = ImmutableList.<AggregatorFactory>of(new CountAggregatorFactory("cnt"));
final SameIntervalMergeTask task = new SameIntervalMergeTask(
null,
"foo",
new Interval("2010-01-01/P1D"),
aggregators,
true,
indexSpec,
true,
null
);
String newVersion = "newVersion";
final List<DataSegment> segments = runTask(task, newVersion);
// the lock is acquired
Assert.assertEquals(0, isRedayCountDown.getCount());
// the merged segment is published
Assert.assertEquals(0, publishCountDown.getCount());
// the merged segment is the only element
Assert.assertEquals(1, segments.size());
DataSegment mergeSegment = segments.get(0);
Assert.assertEquals("foo", mergeSegment.getDataSource());
Assert.assertEquals(newVersion, mergeSegment.getVersion());
// the merged segment's interval is within the requested interval
Assert.assertTrue(new Interval("2010-01-01/P1D").contains(mergeSegment.getInterval()));
// the merged segment should be NoneShardSpec
Assert.assertTrue(mergeSegment.getShardSpec() instanceof NoneShardSpec);
}
private List<DataSegment> runTask(final SameIntervalMergeTask mergeTask, final String version) throws Exception
{
boolean isReady = mergeTask.isReady(new TaskActionClient()
{
@Override
public <RetType> RetType submit(TaskAction<RetType> taskAction) throws IOException
{
if (taskAction instanceof LockTryAcquireAction) {
// the lock of this interval is required
Assert.assertEquals(mergeTask.getInterval(), ((LockTryAcquireAction) taskAction).getInterval());
isRedayCountDown.countDown();
taskLock = new TaskLock(
mergeTask.getGroupId(),
mergeTask.getDataSource(),
mergeTask.getInterval(),
version
);
return (RetType) taskLock;
}
return null;
}
});
// ensure LockTryAcquireAction is submitted
Assert.assertTrue(isReady);
final List<DataSegment> segments = Lists.newArrayList();
mergeTask.run(
new TaskToolbox(
null, null, new TaskActionClient()
{
@Override
public <RetType> RetType submit(TaskAction<RetType> taskAction) throws IOException
{
if (taskAction instanceof LockListAction) {
Assert.assertNotNull("taskLock should be acquired before list", taskLock);
return (RetType) Arrays.asList(taskLock);
}
if (taskAction instanceof SegmentListUsedAction) {
List<DataSegment> segments = ImmutableList.of(
DataSegment.builder()
.dataSource(mergeTask.getDataSource())
.interval(new Interval("2010-01-01/PT1H"))
.version("oldVersion")
.shardSpec(new LinearShardSpec(0))
.build(),
DataSegment.builder()
.dataSource(mergeTask.getDataSource())
.interval(new Interval("2010-01-01/PT1H"))
.version("oldVersion")
.shardSpec(new LinearShardSpec(0))
.build(),
DataSegment.builder()
.dataSource(mergeTask.getDataSource())
.interval(new Interval("2010-01-01/PT2H"))
.version("oldVersion")
.shardSpec(new LinearShardSpec(0))
.build()
);
return (RetType) segments;
}
if (taskAction instanceof SegmentInsertAction) {
publishCountDown.countDown();
return null;
}
return null;
}
}, new NoopServiceEmitter(), new DataSegmentPusher()
{
@Deprecated
@Override
public String getPathForHadoop(String dataSource)
{
return getPathForHadoop();
}
@Override
public String getPathForHadoop()
{
return null;
}
@Override
public DataSegment push(File file, DataSegment segment) throws IOException
{
// the merged segment is pushed to storage
segments.add(segment);
return segment;
}
}, null, null, null, null, null, null, null, null, new SegmentLoader()
{
@Override
public boolean isSegmentLoaded(DataSegment segment) throws SegmentLoadingException
{
return false;
}
@Override
public Segment getSegment(DataSegment segment) throws SegmentLoadingException
{
return null;
}
@Override
public File getSegmentFiles(DataSegment segment) throws SegmentLoadingException
{
// dummy file to represent the downloaded segment's dir
return new File("" + segment.getShardSpec().getPartitionNum());
}
@Override
public void cleanup(DataSegment segment) throws SegmentLoadingException
{
}
}, jsonMapper, temporaryFolder.newFolder(),
EasyMock.createMock(IndexMerger.class), indexIO, null, null, EasyMock.createMock(IndexMergerV9.class)
)
);
return segments;
}
}

View File

@ -336,6 +336,42 @@ public class TaskSerdeTest
Assert.assertEquals(aggregators, task3.getAggregators());
}
@Test
public void testSameIntervalMergeTaskSerde() throws Exception
{
final List<AggregatorFactory> aggregators = ImmutableList.<AggregatorFactory>of(new CountAggregatorFactory("cnt"));
final SameIntervalMergeTask task = new SameIntervalMergeTask(
null,
"foo",
new Interval("2010-01-01/P1D"),
aggregators,
true,
indexSpec,
true,
null
);
final String json = jsonMapper.writeValueAsString(task);
Thread.sleep(100); // Just want to run the clock a bit to make sure the task id doesn't change
final SameIntervalMergeTask task2 = (SameIntervalMergeTask) jsonMapper.readValue(json, Task.class);
Assert.assertEquals("foo", task.getDataSource());
Assert.assertEquals(new Interval("2010-01-01/P1D"), task.getInterval());
Assert.assertEquals(task.getId(), task2.getId());
Assert.assertEquals(task.getGroupId(), task2.getGroupId());
Assert.assertEquals(task.getDataSource(), task2.getDataSource());
Assert.assertEquals(task.getInterval(), task2.getInterval());
Assert.assertEquals(task.getRollup(), task2.getRollup());
Assert.assertEquals(task.getBuildV9Directly(), task2.getBuildV9Directly());
Assert.assertEquals(task.getIndexSpec(), task2.getIndexSpec());
Assert.assertEquals(
task.getAggregators().get(0).getName(),
task2.getAggregators().get(0).getName()
);
}
@Test
public void testKillTaskSerde() throws Exception
{