support multiple intervals in SegmentListUsedAction

This commit is contained in:
Himanshu Gupta 2015-11-18 13:10:32 -06:00
parent 221fb95d07
commit 86f0a36e83
6 changed files with 132 additions and 12 deletions

View File

@ -21,6 +21,9 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.api.client.repackaged.com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import io.druid.common.utils.JodaUtils;
import io.druid.indexing.common.task.Task;
import io.druid.timeline.DataSegment;
import org.joda.time.Interval;
@ -34,16 +37,29 @@ public class SegmentListUsedAction implements TaskAction<List<DataSegment>>
private final String dataSource;
@JsonIgnore
private final Interval interval;
private final List<Interval> intervals;
@JsonCreator
public SegmentListUsedAction(
@JsonProperty("dataSource") String dataSource,
@JsonProperty("interval") Interval interval
@Deprecated @JsonProperty("interval") Interval interval,
@JsonProperty("intervals") List<Interval> intervals
)
{
this.dataSource = dataSource;
this.interval = interval;
Preconditions.checkArgument(
interval == null || intervals == null,
"please specify intervals only"
);
List<Interval> theIntervals = null;
if (interval != null) {
theIntervals = ImmutableList.of(interval);
} else if (intervals != null && intervals.size() > 0) {
theIntervals = JodaUtils.condenseIntervals(intervals);
}
this.intervals = Preconditions.checkNotNull(theIntervals, "no intervals found");
}
@JsonProperty
@ -53,9 +69,9 @@ public class SegmentListUsedAction implements TaskAction<List<DataSegment>>
}
@JsonProperty
public Interval getInterval()
public List<Interval> getIntervals()
{
return interval;
return intervals;
}
public TypeReference<List<DataSegment>> getReturnTypeReference()
@ -66,7 +82,7 @@ public class SegmentListUsedAction implements TaskAction<List<DataSegment>>
@Override
public List<DataSegment> perform(Task task, TaskActionToolbox toolbox) throws IOException
{
return toolbox.getIndexerMetadataStorageCoordinator().getUsedSegmentsForInterval(dataSource, interval);
return toolbox.getIndexerMetadataStorageCoordinator().getUsedSegmentsForIntervals(dataSource, intervals);
}
@Override
@ -75,12 +91,39 @@ public class SegmentListUsedAction implements TaskAction<List<DataSegment>>
return false;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
SegmentListUsedAction that = (SegmentListUsedAction) o;
if (!dataSource.equals(that.dataSource)) {
return false;
}
return intervals.equals(that.intervals);
}
@Override
public int hashCode()
{
int result = dataSource.hashCode();
result = 31 * result + intervals.hashCode();
return result;
}
@Override
public String toString()
{
return "SegmentListUsedAction{" +
"dataSource='" + dataSource + '\'' +
", interval=" + interval +
", intervals=" + intervals +
'}';
}
}

View File

@ -199,7 +199,8 @@ public class ConvertSegmentTask extends AbstractFixedIntervalTask
final List<DataSegment> segments = toolbox.getTaskActionClient().submit(
new SegmentListUsedAction(
getDataSource(),
getInterval()
getInterval(),
null
)
);
segmentsToUpdate = FunctionalIterable
@ -364,7 +365,7 @@ public class ConvertSegmentTask extends AbstractFixedIntervalTask
log.info("Converting segment[%s]", segment);
final TaskActionClient actionClient = toolbox.getTaskActionClient();
final List<DataSegment> currentSegments = actionClient.submit(
new SegmentListUsedAction(segment.getDataSource(), segment.getInterval())
new SegmentListUsedAction(segment.getDataSource(), segment.getInterval(), null)
);
for (DataSegment currentSegment : currentSegments) {

View File

@ -204,7 +204,7 @@ public abstract class MergeTaskBase extends AbstractFixedIntervalTask
final Set<String> current = ImmutableSet.copyOf(
Iterables.transform(
taskActionClient.submit(new SegmentListUsedAction(getDataSource(), getInterval())),
taskActionClient.submit(new SegmentListUsedAction(getDataSource(), getInterval(), null)),
toIdentifier
)
);

View File

@ -132,7 +132,7 @@ public class IngestSegmentFirehoseFactory implements FirehoseFactory<InputRowPar
try {
final List<DataSegment> usedSegments = toolbox
.getTaskActionClient()
.submit(new SegmentListUsedAction(dataSource, interval));
.submit(new SegmentListUsedAction(dataSource, interval, null));
final Map<DataSegment, File> segmentFileMap = toolbox.fetchSegments(usedSegments);
VersionedIntervalTimeline<String, DataSegment> timeline = new VersionedIntervalTimeline<>(
Ordering.<String>natural().nullsFirst()

View File

@ -0,0 +1,76 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.common.actions;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import io.druid.TestUtil;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
import java.util.List;
/**
*/
public class SegmentListUsedActionTest
{
private static final ObjectMapper MAPPER = TestUtil.MAPPER;
@Test
public void testSingleIntervalSerde() throws Exception
{
Interval interval = Interval.parse("2014/2015");
SegmentListUsedAction expected = new SegmentListUsedAction(
"dataSource",
interval,
null
);
SegmentListUsedAction actual = MAPPER.readValue(MAPPER.writeValueAsString(expected), SegmentListUsedAction.class);
Assert.assertEquals(ImmutableList.of(interval), actual.getIntervals());
Assert.assertEquals(expected, actual);
}
@Test
public void testMultiIntervalSerde() throws Exception
{
List<Interval> intervals = ImmutableList.of(Interval.parse("2014/2015"), Interval.parse("2016/2017"));
SegmentListUsedAction expected = new SegmentListUsedAction(
"dataSource",
null,
intervals
);
SegmentListUsedAction actual = MAPPER.readValue(MAPPER.writeValueAsString(expected), SegmentListUsedAction.class);
Assert.assertEquals(intervals, actual.getIntervals());
Assert.assertEquals(expected, actual);
}
@Test
public void testOldJsonDeserialization() throws Exception
{
String jsonStr = "{\"type\": \"segmentListUsed\", \"dataSource\": \"test\", \"interval\": \"2014/2015\"}";
SegmentListUsedAction actual = (SegmentListUsedAction) MAPPER.readValue(jsonStr, TaskAction.class);
Assert.assertEquals(new SegmentListUsedAction("test", Interval.parse("2014/2015"), null), actual);
}
}

View File

@ -283,7 +283,7 @@ public class IngestSegmentFirehoseFactoryTimelineTest
if (taskAction instanceof SegmentListUsedAction) {
// Expect the interval we asked for
final SegmentListUsedAction action = (SegmentListUsedAction) taskAction;
if (action.getInterval().equals(testCase.interval)) {
if (action.getIntervals().equals(ImmutableList.of(testCase.interval))) {
return (RetType) ImmutableList.copyOf(testCase.segments);
} else {
throw new IllegalArgumentException("WTF");