From 19ac1c7c2c05a7fa35d4843a50c192c244bacbe4 Mon Sep 17 00:00:00 2001
From: kaijianding
Date: Tue, 7 Mar 2017 01:21:32 +0800
Subject: [PATCH] Add SameIntervalMergeTask for easier usage of MergeTask (#3981)

* Add SameIntervalMergeTask for easier usage of MergeTask

* fix a bug and add a unit test

* remove same_interval_merge_sub from Task.java and remove other unneeded code
---
 docs/content/ingestion/tasks.md               |  18 ++
 .../druid/indexing/common/task/MergeTask.java |  12 +
 .../indexing/common/task/MergeTaskBase.java   |   8 +-
 .../common/task/SameIntervalMergeTask.java    | 171 +++++++++++++
 .../io/druid/indexing/common/task/Task.java   |   3 +-
 .../task/SameIntervalMergeTaskTest.java       | 235 ++++++++++++++++++
 .../indexing/common/task/TaskSerdeTest.java   |  36 +++
 7 files changed, 480 insertions(+), 3 deletions(-)
 create mode 100644 indexing-service/src/main/java/io/druid/indexing/common/task/SameIntervalMergeTask.java
 create mode 100644 indexing-service/src/test/java/io/druid/indexing/common/task/SameIntervalMergeTaskTest.java

diff --git a/docs/content/ingestion/tasks.md b/docs/content/ingestion/tasks.md
index 507f589f459..6e01e82a386 100644
--- a/docs/content/ingestion/tasks.md
+++ b/docs/content/ingestion/tasks.md
@@ -186,6 +186,24 @@ The grammar is:
 }
 ```
 
+### Same Interval Merge Task
+
+The Same Interval Merge task is a shortcut for the merge task: all segments in the given interval are merged.
+
+The grammar is:
+
+```json
+{
+    "type": "same_interval_merge",
+    "id": <task_id>,
+    "dataSource": <task_datasource>,
+    "aggregations": <list of aggregators>,
+    "rollup": <whether or not to rollup data during a merge>,
+    "buildV9Directly": <true or false, default = true>,
+    "interval": <interval whose used segments will be merged>
+}
+```
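+
+For example, a spec that merges one day of a hypothetical `wikipedia` datasource (all values below are illustrative) might look like:
+
+```json
+{
+    "type": "same_interval_merge",
+    "dataSource": "wikipedia",
+    "aggregations": [
+        { "type": "count", "name": "count" }
+    ],
+    "rollup": true,
+    "buildV9Directly": true,
+    "interval": "2017-01-01/2017-01-02"
+}
+```
+
+The optional `id` is omitted here; the task generates one from the task type, datasource, and interval.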
+
 Segment Destroying Tasks
 ------------------------
 
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTask.java
index 7cc914f0df5..812dcc06e61 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTask.java
@@ -104,6 +104,18 @@ public class MergeTask extends MergeTaskBase
     return "merge";
   }
 
+  @JsonProperty
+  public Boolean getRollup()
+  {
+    return rollup;
+  }
+
+  @JsonProperty
+  public IndexSpec getIndexSpec()
+  {
+    return indexSpec;
+  }
+
   @JsonProperty("aggregations")
   public List<AggregatorFactory> getAggregators()
   {
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java b/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java
index 6123dc311cb..fdec7e63c1b 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java
@@ -99,6 +99,12 @@ public abstract class MergeTaskBase extends AbstractFixedIntervalTask
             )
         ) == 0, "segments in the wrong datasource"
     );
+    verifyInputSegments(segments);
+
+    this.segments = segments;
+  }
+
+  protected void verifyInputSegments(List<DataSegment> segments) {
     // Verify segments are all unsharded
     Preconditions.checkArgument(
         Iterables.size(
@@ -115,8 +121,6 @@ public abstract class MergeTaskBase extends AbstractFixedIntervalTask
             )
         ) == 0, "segments without NoneShardSpec"
     );
-
-    this.segments = segments;
   }
 
   @Override
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/SameIntervalMergeTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/SameIntervalMergeTask.java
new file mode 100644
index 00000000000..0af2b761282
--- /dev/null
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/SameIntervalMergeTask.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexing.common.task;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import io.druid.indexing.common.TaskStatus;
+import io.druid.indexing.common.TaskToolbox;
+import io.druid.indexing.common.actions.SegmentListUsedAction;
+import io.druid.query.aggregation.AggregatorFactory;
+import io.druid.segment.IndexSpec;
+import io.druid.timeline.DataSegment;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ */
+public class SameIntervalMergeTask extends AbstractFixedIntervalTask
+{
+  private static final Boolean defaultBuildV9Directly = Boolean.TRUE;
+  private static final String TYPE = "same_interval_merge";
+  @JsonIgnore
+  private final List<AggregatorFactory> aggregators;
+  private final Boolean rollup;
+  private final IndexSpec indexSpec;
+  private final Boolean buildV9Directly;
+
+  public SameIntervalMergeTask(
+      @JsonProperty("id") String id,
+      @JsonProperty("dataSource") String dataSource,
+      @JsonProperty("interval") Interval interval,
+      @JsonProperty("aggregations") List<AggregatorFactory> aggregators,
+      @JsonProperty("rollup") Boolean rollup,
+      @JsonProperty("indexSpec") IndexSpec indexSpec,
+      @JsonProperty("buildV9Directly") Boolean buildV9Directly,
+      @JsonProperty("context") Map<String, Object> context
+  )
+  {
+    super(
+        makeId(id, TYPE, dataSource, interval),
+        dataSource,
+        interval,
+        context
+    );
+    this.aggregators = Preconditions.checkNotNull(aggregators, "null aggregations");
+    this.rollup = rollup == null ? Boolean.TRUE : rollup;
+    this.indexSpec = indexSpec == null ? new IndexSpec() : indexSpec;
+    this.buildV9Directly = buildV9Directly == null ? defaultBuildV9Directly : buildV9Directly;
+  }
+
+  @JsonProperty("aggregations")
+  public List<AggregatorFactory> getAggregators()
+  {
+    return aggregators;
+  }
+
+  @JsonProperty
+  public Boolean getRollup()
+  {
+    return rollup;
+  }
+
+  @JsonProperty
+  public IndexSpec getIndexSpec()
+  {
+    return indexSpec;
+  }
+
+  @JsonProperty
+  public Boolean getBuildV9Directly()
+  {
+    return buildV9Directly;
+  }
+
+  public static String makeId(String id, final String typeName, String dataSource, Interval interval)
+  {
+    return id != null ? id : joinId(
+        typeName,
+        dataSource,
+        interval.getStart(),
+        interval.getEnd(),
+        new DateTime().toString()
+    );
+  }
+
+  @Override
+  public String getType()
+  {
+    return TYPE;
+  }
+
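+  // Looks up every segment currently marked used for this datasource and
+  // interval, then merges them by delegating to a MergeTask subclass that
+  // skips the base class's shard-spec verification.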
+  @Override
+  public TaskStatus run(TaskToolbox toolbox) throws Exception
+  {
+    final List<DataSegment> segments = toolbox.getTaskActionClient().submit(
+        new SegmentListUsedAction(
+            getDataSource(),
+            getInterval(),
+            null
+        )
+    );
+    SubTask mergeTask = new SubTask(
+        getId(),
+        getDataSource(),
+        segments,
+        aggregators,
+        rollup,
+        indexSpec,
+        buildV9Directly,
+        getContext()
+    );
+    final TaskStatus status = mergeTask.run(toolbox);
+    if (!status.isSuccess()) {
+      return TaskStatus.fromCode(getId(), status.getStatusCode());
+    }
+    return success();
+  }
+
+  public static class SubTask extends MergeTask
+  {
+    public SubTask(
+        String baseId,
+        String dataSource,
+        List<DataSegment> segments,
+        List<AggregatorFactory> aggregators,
+        Boolean rollup,
+        IndexSpec indexSpec,
+        Boolean buildV9Directly,
+        Map<String, Object> context
+    )
+    {
+      super(
+          "sub_" + baseId,
+          dataSource,
+          segments,
+          aggregators,
+          rollup,
+          indexSpec,
+          buildV9Directly,
+          context
+      );
+    }
+
+    @Override
+    protected void verifyInputSegments(List<DataSegment> segments)
+    {
+      // do nothing; segments fetched from the overlord may be sharded, which the base check would reject
+    }
+  }
+}
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java b/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java
index e8cf245e4e0..7f57c0e7662 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java
@@ -58,7 +58,8 @@ import java.util.Map;
     @JsonSubTypes.Type(name = "version_converter", value = ConvertSegmentBackwardsCompatibleTask.class), // Backwards compat - Deprecated
     @JsonSubTypes.Type(name = "version_converter_sub", value = ConvertSegmentBackwardsCompatibleTask.SubTask.class), // backwards compat - Deprecated
     @JsonSubTypes.Type(name = "convert_segment", value = ConvertSegmentTask.class),
-    @JsonSubTypes.Type(name = "convert_segment_sub", value = ConvertSegmentTask.SubTask.class)
+    @JsonSubTypes.Type(name = "convert_segment_sub", value = ConvertSegmentTask.SubTask.class),
+    @JsonSubTypes.Type(name = "same_interval_merge", value = SameIntervalMergeTask.class)
 })
 public interface Task
 {
diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/SameIntervalMergeTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/SameIntervalMergeTaskTest.java
new file mode 100644
index 00000000000..09d1973c224
--- /dev/null
+++ b/indexing-service/src/test/java/io/druid/indexing/common/task/SameIntervalMergeTaskTest.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexing.common.task;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import io.druid.indexing.common.TaskLock;
+import io.druid.indexing.common.TaskToolbox;
+import io.druid.indexing.common.TestUtils;
+import io.druid.indexing.common.actions.LockListAction;
+import io.druid.indexing.common.actions.LockTryAcquireAction;
+import io.druid.indexing.common.actions.SegmentInsertAction;
+import io.druid.indexing.common.actions.SegmentListUsedAction;
+import io.druid.indexing.common.actions.TaskAction;
+import io.druid.indexing.common.actions.TaskActionClient;
+import io.druid.query.aggregation.AggregatorFactory;
+import io.druid.query.aggregation.CountAggregatorFactory;
+import io.druid.segment.IndexIO;
+import io.druid.segment.IndexMerger;
+import io.druid.segment.IndexMergerV9;
+import io.druid.segment.IndexSpec;
+import io.druid.segment.Segment;
+import io.druid.segment.loading.DataSegmentPusher;
+import io.druid.segment.loading.SegmentLoader;
+import io.druid.segment.loading.SegmentLoadingException;
+import io.druid.server.metrics.NoopServiceEmitter;
+import io.druid.timeline.DataSegment;
+import io.druid.timeline.partition.LinearShardSpec;
+import io.druid.timeline.partition.NoneShardSpec;
+import org.easymock.EasyMock;
+import org.joda.time.Interval;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+public class SameIntervalMergeTaskTest
+{
+  @Rule
+  public TemporaryFolder temporaryFolder = new TemporaryFolder();
+  public TaskLock taskLock;
+  private final CountDownLatch isReadyCountDown = new CountDownLatch(1);
+  private final CountDownLatch publishCountDown = new CountDownLatch(1);
+  private final IndexSpec indexSpec;
+  private final ObjectMapper jsonMapper;
+  private IndexIO indexIO;
+
+  public SameIntervalMergeTaskTest()
+  {
+    indexSpec = new IndexSpec();
+    TestUtils testUtils = new TestUtils();
+    jsonMapper = testUtils.getTestObjectMapper();
+    indexIO = testUtils.getTestIndexIO();
+  }
+
+  @Test
+  public void testRun() throws Exception
+  {
+    final List<AggregatorFactory> aggregators = ImmutableList.<AggregatorFactory>of(new CountAggregatorFactory("cnt"));
+    final SameIntervalMergeTask task = new SameIntervalMergeTask(
+        null,
+        "foo",
+        new Interval("2010-01-01/P1D"),
+        aggregators,
+        true,
+        indexSpec,
+        true,
+        null
+    );
+
+    String newVersion = "newVersion";
+    final List<DataSegment> segments = runTask(task, newVersion);
+
+    // the lock is acquired
+    Assert.assertEquals(0, isReadyCountDown.getCount());
+    // the merged segment is published
+    Assert.assertEquals(0, publishCountDown.getCount());
+    // the merged segment is the only element
+    Assert.assertEquals(1, segments.size());
+
+    DataSegment mergeSegment = segments.get(0);
+    Assert.assertEquals("foo", mergeSegment.getDataSource());
+    Assert.assertEquals(newVersion, mergeSegment.getVersion());
+    // the merged segment's interval is within the requested interval
+    Assert.assertTrue(new Interval("2010-01-01/P1D").contains(mergeSegment.getInterval()));
+    // the merged segment should be NoneShardSpec
+    Assert.assertTrue(mergeSegment.getShardSpec() instanceof NoneShardSpec);
+  }
+
+  private List<DataSegment> runTask(final SameIntervalMergeTask mergeTask, final String version) throws Exception
+  {
+    boolean isReady = mergeTask.isReady(new TaskActionClient()
+    {
+      @Override
+      public <RetType> RetType submit(TaskAction<RetType> taskAction) throws IOException
+      {
+        if (taskAction instanceof LockTryAcquireAction) {
+          // the lock of this interval is required
+          Assert.assertEquals(mergeTask.getInterval(), ((LockTryAcquireAction) taskAction).getInterval());
+          isReadyCountDown.countDown();
+          taskLock = new TaskLock(
+              mergeTask.getGroupId(),
+              mergeTask.getDataSource(),
+              mergeTask.getInterval(),
+              version
+          );
+          return (RetType) taskLock;
+        }
+        return null;
+      }
+    });
+    // ensure LockTryAcquireAction is submitted
+    Assert.assertTrue(isReady);
+    final List<DataSegment> segments = Lists.newArrayList();
+
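+    // Run the task against a stubbed TaskToolbox: the action client answers
+    // lock/list/insert actions with canned values, the pusher records every
+    // pushed segment, and the loader returns dummy segment directories.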
+    mergeTask.run(
+        new TaskToolbox(
+            null, null, new TaskActionClient()
+            {
+              @Override
+              public <RetType> RetType submit(TaskAction<RetType> taskAction) throws IOException
+              {
+                if (taskAction instanceof LockListAction) {
+                  Assert.assertNotNull("taskLock should be acquired before list", taskLock);
+                  return (RetType) Arrays.asList(taskLock);
+                }
+                if (taskAction instanceof SegmentListUsedAction) {
+                  List<DataSegment> segments = ImmutableList.of(
+                      DataSegment.builder()
+                                 .dataSource(mergeTask.getDataSource())
+                                 .interval(new Interval("2010-01-01/PT1H"))
+                                 .version("oldVersion")
+                                 .shardSpec(new LinearShardSpec(0))
+                                 .build(),
+                      DataSegment.builder()
+                                 .dataSource(mergeTask.getDataSource())
+                                 .interval(new Interval("2010-01-01/PT1H"))
+                                 .version("oldVersion")
+                                 .shardSpec(new LinearShardSpec(0))
+                                 .build(),
+                      DataSegment.builder()
+                                 .dataSource(mergeTask.getDataSource())
+                                 .interval(new Interval("2010-01-01/PT2H"))
+                                 .version("oldVersion")
+                                 .shardSpec(new LinearShardSpec(0))
+                                 .build()
+                  );
+                  return (RetType) segments;
+                }
+                if (taskAction instanceof SegmentInsertAction) {
+                  publishCountDown.countDown();
+                  return null;
+                }
+
+                return null;
+              }
+            }, new NoopServiceEmitter(), new DataSegmentPusher()
+            {
+              @Deprecated
+              @Override
+              public String getPathForHadoop(String dataSource)
+              {
+                return getPathForHadoop();
+              }
+
+              @Override
+              public String getPathForHadoop()
+              {
+                return null;
+              }
+
+              @Override
+              public DataSegment push(File file, DataSegment segment) throws IOException
+              {
+                // the merged segment is pushed to storage
+                segments.add(segment);
+                return segment;
+              }
+            }, null, null, null, null, null, null, null, null, new SegmentLoader()
+            {
+              @Override
+              public boolean isSegmentLoaded(DataSegment segment) throws SegmentLoadingException
+              {
+                return false;
+              }
+
+              @Override
+              public Segment getSegment(DataSegment segment) throws SegmentLoadingException
+              {
+                return null;
+              }
+
+              @Override
+              public File getSegmentFiles(DataSegment segment) throws SegmentLoadingException
+              {
+                // dummy file to represent the downloaded segment's dir
+                return new File("" + segment.getShardSpec().getPartitionNum());
+              }
+
+              @Override
+              public void cleanup(DataSegment segment) throws SegmentLoadingException
+              {
+              }
+            }, jsonMapper, temporaryFolder.newFolder(),
+            EasyMock.createMock(IndexMerger.class), indexIO, null, null, EasyMock.createMock(IndexMergerV9.class)
+        )
+    );
+
+    return segments;
+  }
+}
diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java
index 3f26def76bc..cb47d4b8b8f 100644
--- a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java
@@ -336,6 +336,42 @@ public class TaskSerdeTest
     Assert.assertEquals(aggregators, task3.getAggregators());
   }
 
+  @Test
+  public void testSameIntervalMergeTaskSerde() throws Exception
+  {
+    final List<AggregatorFactory> aggregators = ImmutableList.<AggregatorFactory>of(new CountAggregatorFactory("cnt"));
+    final SameIntervalMergeTask task = new SameIntervalMergeTask(
+        null,
+        "foo",
+        new Interval("2010-01-01/P1D"),
+        aggregators,
+        true,
+        indexSpec,
+        true,
+        null
+    );
+
+    final String json = jsonMapper.writeValueAsString(task);
+
+    Thread.sleep(100); // Just want to run the clock a bit to make sure the task id doesn't change
+    final SameIntervalMergeTask task2 = (SameIntervalMergeTask) jsonMapper.readValue(json, Task.class);
+
+    Assert.assertEquals("foo", task.getDataSource());
+    Assert.assertEquals(new Interval("2010-01-01/P1D"), task.getInterval());
+
+    Assert.assertEquals(task.getId(), task2.getId());
+    Assert.assertEquals(task.getGroupId(), task2.getGroupId());
+    Assert.assertEquals(task.getDataSource(), task2.getDataSource());
+    Assert.assertEquals(task.getInterval(), task2.getInterval());
+    Assert.assertEquals(task.getRollup(), task2.getRollup());
+    Assert.assertEquals(task.getBuildV9Directly(), task2.getBuildV9Directly());
+    Assert.assertEquals(task.getIndexSpec(), task2.getIndexSpec());
+    Assert.assertEquals(
+        task.getAggregators().get(0).getName(),
+        task2.getAggregators().get(0).getName()
+    );
+  }
+
   @Test
   public void testKillTaskSerde() throws Exception
   {