From 86f0a36e838d095a4a373b1abc85b37ee68e842a Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Wed, 18 Nov 2015 13:10:32 -0600 Subject: [PATCH] support multiple intervals in SegmentListUsedAction --- .../common/actions/SegmentListUsedAction.java | 57 ++++++++++++-- .../common/task/ConvertSegmentTask.java | 5 +- .../indexing/common/task/MergeTaskBase.java | 2 +- .../IngestSegmentFirehoseFactory.java | 2 +- .../actions/SegmentListUsedActionTest.java | 76 +++++++++++++++++++ ...estSegmentFirehoseFactoryTimelineTest.java | 2 +- 6 files changed, 132 insertions(+), 12 deletions(-) create mode 100644 indexing-service/src/test/java/io/druid/indexing/common/actions/SegmentListUsedActionTest.java diff --git a/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentListUsedAction.java b/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentListUsedAction.java index 2d980f6d2de..6cb7e0c0a48 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentListUsedAction.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentListUsedAction.java @@ -21,6 +21,9 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.type.TypeReference; +import com.google.api.client.repackaged.com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import io.druid.common.utils.JodaUtils; import io.druid.indexing.common.task.Task; import io.druid.timeline.DataSegment; import org.joda.time.Interval; @@ -34,16 +37,29 @@ public class SegmentListUsedAction implements TaskAction> private final String dataSource; @JsonIgnore - private final Interval interval; + private final List intervals; @JsonCreator public SegmentListUsedAction( @JsonProperty("dataSource") String dataSource, - @JsonProperty("interval") Interval interval + @Deprecated @JsonProperty("interval") Interval interval, + @JsonProperty("intervals") List intervals ) { this.dataSource = dataSource; - this.interval = interval; + + Preconditions.checkArgument( + interval == null || intervals == null, + "please specify intervals only" + ); + + List theIntervals = null; + if (interval != null) { + theIntervals = ImmutableList.of(interval); + } else if (intervals != null && intervals.size() > 0) { + theIntervals = JodaUtils.condenseIntervals(intervals); + } + this.intervals = Preconditions.checkNotNull(theIntervals, "no intervals found"); } @JsonProperty @@ -53,9 +69,9 @@ public class SegmentListUsedAction implements TaskAction> } @JsonProperty - public Interval getInterval() + public List getIntervals() { - return interval; + return intervals; } public TypeReference> getReturnTypeReference() @@ -66,7 +82,7 @@ public class SegmentListUsedAction implements TaskAction> @Override public List perform(Task task, TaskActionToolbox toolbox) throws IOException { - return toolbox.getIndexerMetadataStorageCoordinator().getUsedSegmentsForInterval(dataSource, interval); + return toolbox.getIndexerMetadataStorageCoordinator().getUsedSegmentsForIntervals(dataSource, intervals); } @Override @@ -75,12 +91,39 @@ public class SegmentListUsedAction implements TaskAction> return false; } + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + SegmentListUsedAction that = (SegmentListUsedAction) o; + + if (!dataSource.equals(that.dataSource)) { + return false; + } + return intervals.equals(that.intervals); + + } + + @Override + public int hashCode() + { + int result = dataSource.hashCode(); + result = 31 * result + intervals.hashCode(); + return result; + } + @Override public String toString() { return "SegmentListUsedAction{" + "dataSource='" + dataSource + '\'' + - ", interval=" + interval + + ", intervals=" + intervals + '}'; } } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/ConvertSegmentTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/ConvertSegmentTask.java index 3858e0efd21..b2ea15342b6 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/ConvertSegmentTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/ConvertSegmentTask.java @@ -199,7 +199,8 @@ public class ConvertSegmentTask extends AbstractFixedIntervalTask final List segments = toolbox.getTaskActionClient().submit( new SegmentListUsedAction( getDataSource(), - getInterval() + getInterval(), + null ) ); segmentsToUpdate = FunctionalIterable @@ -364,7 +365,7 @@ public class ConvertSegmentTask extends AbstractFixedIntervalTask log.info("Converting segment[%s]", segment); final TaskActionClient actionClient = toolbox.getTaskActionClient(); final List currentSegments = actionClient.submit( - new SegmentListUsedAction(segment.getDataSource(), segment.getInterval()) + new SegmentListUsedAction(segment.getDataSource(), segment.getInterval(), null) ); for (DataSegment currentSegment : currentSegments) { diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java b/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java index 018e06c0664..e3c861f602a 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java @@ -204,7 +204,7 @@ public abstract class MergeTaskBase extends AbstractFixedIntervalTask final Set current = ImmutableSet.copyOf( Iterables.transform( - taskActionClient.submit(new SegmentListUsedAction(getDataSource(), getInterval())), + taskActionClient.submit(new SegmentListUsedAction(getDataSource(), getInterval(), null)), toIdentifier ) ); diff --git a/indexing-service/src/main/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactory.java b/indexing-service/src/main/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactory.java index 9855d7408a0..609020bcc1a 100644 --- a/indexing-service/src/main/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactory.java +++ b/indexing-service/src/main/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactory.java @@ -132,7 +132,7 @@ public class IngestSegmentFirehoseFactory implements FirehoseFactory usedSegments = toolbox .getTaskActionClient() - .submit(new SegmentListUsedAction(dataSource, interval)); + .submit(new SegmentListUsedAction(dataSource, interval, null)); final Map segmentFileMap = toolbox.fetchSegments(usedSegments); VersionedIntervalTimeline timeline = new VersionedIntervalTimeline<>( Ordering.natural().nullsFirst() diff --git a/indexing-service/src/test/java/io/druid/indexing/common/actions/SegmentListUsedActionTest.java b/indexing-service/src/test/java/io/druid/indexing/common/actions/SegmentListUsedActionTest.java new file mode 100644 index 00000000000..567245b9fa1 --- /dev/null +++ b/indexing-service/src/test/java/io/druid/indexing/common/actions/SegmentListUsedActionTest.java @@ -0,0 +1,76 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.indexing.common.actions; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import io.druid.TestUtil; +import org.joda.time.Interval; +import org.junit.Assert; +import org.junit.Test; + +import java.util.List; + +/** + */ +public class SegmentListUsedActionTest +{ + private static final ObjectMapper MAPPER = TestUtil.MAPPER; + + @Test + public void testSingleIntervalSerde() throws Exception + { + Interval interval = Interval.parse("2014/2015"); + + SegmentListUsedAction expected = new SegmentListUsedAction( + "dataSource", + interval, + null + ); + + SegmentListUsedAction actual = MAPPER.readValue(MAPPER.writeValueAsString(expected), SegmentListUsedAction.class); + Assert.assertEquals(ImmutableList.of(interval), actual.getIntervals()); + Assert.assertEquals(expected, actual); + } + + @Test + public void testMultiIntervalSerde() throws Exception + { + List intervals = ImmutableList.of(Interval.parse("2014/2015"), Interval.parse("2016/2017")); + SegmentListUsedAction expected = new SegmentListUsedAction( + "dataSource", + null, + intervals + ); + + SegmentListUsedAction actual = MAPPER.readValue(MAPPER.writeValueAsString(expected), SegmentListUsedAction.class); + Assert.assertEquals(intervals, actual.getIntervals()); + Assert.assertEquals(expected, actual); + } + + @Test + public void testOldJsonDeserialization() throws Exception + { + String jsonStr = "{\"type\": \"segmentListUsed\", \"dataSource\": \"test\", \"interval\": \"2014/2015\"}"; + SegmentListUsedAction actual = (SegmentListUsedAction) MAPPER.readValue(jsonStr, TaskAction.class); + + Assert.assertEquals(new SegmentListUsedAction("test", Interval.parse("2014/2015"), null), actual); + } +} diff --git a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java index c2ee4a84e02..1af3c34429f 100644 --- a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java @@ -283,7 +283,7 @@ public class IngestSegmentFirehoseFactoryTimelineTest if (taskAction instanceof SegmentListUsedAction) { // Expect the interval we asked for final SegmentListUsedAction action = (SegmentListUsedAction) taskAction; - if (action.getInterval().equals(testCase.interval)) { + if (action.getIntervals().equals(ImmutableList.of(testCase.interval))) { return (RetType) ImmutableList.copyOf(testCase.segments); } else { throw new IllegalArgumentException("WTF");