Merge pull request #1988 from himanshug/multi-interval-batch-delta

support multiple intervals in dataSource inputSpec
Gian Merlino 2015-12-04 09:07:52 -08:00
commit 20544d409b
22 changed files with 394 additions and 73 deletions


@ -177,13 +177,13 @@ Here is what goes inside "ingestionSpec"
|Field|Type|Description|Required|
|-----|----|-----------|--------|
|dataSource|String|Druid dataSource name from which you are loading the data.|yes|
|interval|String|A string representing ISO-8601 Intervals.|yes|
|interval|String|Deprecated. Use intervals instead.|no|
|intervals|List|A list of strings representing ISO-8601 Intervals.|yes|
|granularity|String|Defines the granularity of the query while loading data. Default value is "none". See [Granularities](../querying/granularities.html).|no|
|filter|Json|See [Filters](../querying/filters.html)|no|
|dimensions|Array of String|Names of the dimension columns to load. By default, the list is constructed from the parseSpec. If the parseSpec does not have an explicit list of dimensions, then all dimension columns present in the stored data will be read.|no|
|metrics|Array of String|Names of the metric columns to load. By default, the list is constructed from the "name" of all the configured aggregators.|no|
For example, an inputSpec of type "dataSource" whose ingestionSpec uses the new intervals field might look like the following (the datasource name and interval values are illustrative):
```
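{
  "type" : "dataSource",
  "ingestionSpec" : {
    "dataSource" : "wikipedia",
    "intervals" : ["2014-10-20T00:00:00Z/P2W", "2014-12-01T00:00:00Z/P1W"]
  }
}
```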


@ -31,6 +31,7 @@ import io.druid.timeline.DataSegment;
import io.druid.timeline.TimelineObjectHolder;
import io.druid.timeline.VersionedIntervalTimeline;
import io.druid.timeline.partition.PartitionChunk;
import org.joda.time.Interval;
import java.io.IOException;
import java.util.List;
@ -140,22 +141,26 @@ public class HadoopIngestionSpec extends IngestionSpec<HadoopIOConfig, HadoopTun
ingestionSpecMap,
DatasourceIngestionSpec.class
);
List<DataSegment> segmentsList = segmentLister.getUsedSegmentsForInterval(
List<DataSegment> segmentsList = segmentLister.getUsedSegmentsForIntervals(
ingestionSpecObj.getDataSource(),
ingestionSpecObj.getInterval()
ingestionSpecObj.getIntervals()
);
VersionedIntervalTimeline<String, DataSegment> timeline = new VersionedIntervalTimeline<>(Ordering.natural());
for (DataSegment segment : segmentsList) {
timeline.add(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(segment));
}
final List<TimelineObjectHolder<String, DataSegment>> timeLineSegments = timeline.lookup(ingestionSpecObj.getInterval());
final List<WindowedDataSegment> windowedSegments = Lists.newArrayList();
for (TimelineObjectHolder<String, DataSegment> holder : timeLineSegments) {
for (PartitionChunk<DataSegment> chunk : holder.getObject()) {
windowedSegments.add(new WindowedDataSegment(chunk.getObject(), holder.getInterval()));
for (Interval interval : ingestionSpecObj.getIntervals()) {
final List<TimelineObjectHolder<String, DataSegment>> timeLineSegments = timeline.lookup(interval);
for (TimelineObjectHolder<String, DataSegment> holder : timeLineSegments) {
for (PartitionChunk<DataSegment> chunk : holder.getObject()) {
windowedSegments.add(new WindowedDataSegment(chunk.getObject(), holder.getInterval()));
}
}
datasourcePathSpec.put(segments, windowedSegments);
}
datasourcePathSpec.put(segments, windowedSegments);
}
return spec;


@ -22,6 +22,8 @@ package io.druid.indexer.hadoop;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import io.druid.common.utils.JodaUtils;
import io.druid.granularity.QueryGranularity;
import io.druid.query.filter.DimFilter;
import org.joda.time.Interval;
@ -31,7 +33,7 @@ import java.util.List;
public class DatasourceIngestionSpec
{
private final String dataSource;
private final Interval interval;
private final List<Interval> intervals;
private final DimFilter filter;
private final QueryGranularity granularity;
private final List<String> dimensions;
@ -40,7 +42,8 @@ public class DatasourceIngestionSpec
@JsonCreator
public DatasourceIngestionSpec(
@JsonProperty("dataSource") String dataSource,
@JsonProperty("interval") Interval interval,
@Deprecated @JsonProperty("interval") Interval interval,
@JsonProperty("intervals") List<Interval> intervals,
@JsonProperty("filter") DimFilter filter,
@JsonProperty("granularity") QueryGranularity granularity,
@JsonProperty("dimensions") List<String> dimensions,
@ -48,7 +51,20 @@ public class DatasourceIngestionSpec
)
{
this.dataSource = Preconditions.checkNotNull(dataSource, "null dataSource");
this.interval = Preconditions.checkNotNull(interval, "null interval");
Preconditions.checkArgument(
interval == null || intervals == null,
"please specify intervals only"
);
List<Interval> theIntervals = null;
if (interval != null) {
theIntervals = ImmutableList.of(interval);
} else if (intervals != null && intervals.size() > 0) {
theIntervals = JodaUtils.condenseIntervals(intervals);
}
this.intervals = Preconditions.checkNotNull(theIntervals, "no intervals found");
this.filter = filter;
this.granularity = granularity == null ? QueryGranularity.NONE : granularity;
@ -63,9 +79,9 @@ public class DatasourceIngestionSpec
}
@JsonProperty
public Interval getInterval()
public List<Interval> getIntervals()
{
return interval;
return intervals;
}
@JsonProperty
@ -94,17 +110,17 @@ public class DatasourceIngestionSpec
public DatasourceIngestionSpec withDimensions(List<String> dimensions)
{
return new DatasourceIngestionSpec(dataSource, interval, filter, granularity, dimensions, metrics);
return new DatasourceIngestionSpec(dataSource, null, intervals, filter, granularity, dimensions, metrics);
}
public DatasourceIngestionSpec withMetrics(List<String> metrics)
{
return new DatasourceIngestionSpec(dataSource, interval, filter, granularity, dimensions, metrics);
return new DatasourceIngestionSpec(dataSource, null, intervals, filter, granularity, dimensions, metrics);
}
public DatasourceIngestionSpec withQueryGranularity(QueryGranularity granularity)
{
return new DatasourceIngestionSpec(dataSource, interval, filter, granularity, dimensions, metrics);
return new DatasourceIngestionSpec(dataSource, null, intervals, filter, granularity, dimensions, metrics);
}
@Override
@ -122,7 +138,7 @@ public class DatasourceIngestionSpec
if (!dataSource.equals(that.dataSource)) {
return false;
}
if (!interval.equals(that.interval)) {
if (!intervals.equals(that.intervals)) {
return false;
}
if (filter != null ? !filter.equals(that.filter) : that.filter != null) {
@ -142,7 +158,7 @@ public class DatasourceIngestionSpec
public int hashCode()
{
int result = dataSource.hashCode();
result = 31 * result + interval.hashCode();
result = 31 * result + intervals.hashCode();
result = 31 * result + (filter != null ? filter.hashCode() : 0);
result = 31 * result + granularity.hashCode();
result = 31 * result + (dimensions != null ? dimensions.hashCode() : 0);
@ -155,7 +171,7 @@ public class DatasourceIngestionSpec
{
return "DatasourceIngestionSpec{" +
"dataSource='" + dataSource + '\'' +
", interval=" + interval +
", intervals=" + intervals +
", filter=" + filter +
", granularity=" + granularity +
", dimensions=" + dimensions +


@ -99,7 +99,7 @@ public class DatasourcePathSpec implements PathSpec
"Found total [%d] segments for [%s] in interval [%s]",
segments.size(),
ingestionSpec.getDataSource(),
ingestionSpec.getInterval()
ingestionSpec.getIntervals()
);
DatasourceIngestionSpec updatedIngestionSpec = ingestionSpec;


@ -44,10 +44,10 @@ public class MetadataStoreBasedUsedSegmentLister implements UsedSegmentLister
}
@Override
public List<DataSegment> getUsedSegmentsForInterval(
String dataSource, Interval interval
public List<DataSegment> getUsedSegmentsForIntervals(
String dataSource, List<Interval> intervals
) throws IOException
{
return indexerMetadataStorageCoordinator.getUsedSegmentsForInterval(dataSource, interval);
return indexerMetadataStorageCoordinator.getUsedSegmentsForIntervals(dataSource, intervals);
}
}


@ -33,12 +33,12 @@ public interface UsedSegmentLister
* Get all segments which may include any data in the interval and are flagged as used.
*
* @param dataSource The datasource to query
* @param interval The interval for which all applicable and used datasources are requested. Start is inclusive, end is exclusive
* @param intervals The intervals for which used segments are to be returned
*
* @return The DataSegments which include data in the requested interval. These segments may contain data outside the requested interval.
* @return The DataSegments which include data in the requested intervals. These segments may contain data outside the requested intervals.
*
* @throws IOException
*/
public List<DataSegment> getUsedSegmentsForInterval(final String dataSource, final Interval interval)
public List<DataSegment> getUsedSegmentsForIntervals(final String dataSource, final List<Interval> intervals)
throws IOException;
}
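
For illustration, a hypothetical caller could fetch the used segments for several non-contiguous intervals in a single call. This is only a usage sketch: the UsedSegmentLister instance, the "wikipedia" datasource name, and the intervals are assumed for the example, and the sketch is written as if it lives alongside UsedSegmentLister.

```java
import com.google.common.collect.ImmutableList;
import io.druid.timeline.DataSegment;
import org.joda.time.Interval;

import java.io.IOException;
import java.util.List;

public class UsedSegmentListerUsageSketch
{
  // Hypothetical helper: the datasource name and intervals are illustrative values.
  public static List<DataSegment> findUsedSegments(UsedSegmentLister segmentLister) throws IOException
  {
    List<Interval> intervals = ImmutableList.of(
        Interval.parse("2014-10-01/2014-10-08"),
        Interval.parse("2014-12-01/2014-12-08")
    );
    return segmentLister.getUsedSegmentsForIntervals("wikipedia", intervals);
  }
}
```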


@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.metamx.common.Granularity;
import io.druid.indexer.hadoop.DatasourceIngestionSpec;
import io.druid.indexer.hadoop.WindowedDataSegment;
@ -90,7 +91,7 @@ public class HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest
PathSpec pathSpec = new DatasourcePathSpec(
jsonMapper,
null,
new DatasourceIngestionSpec(testDatasource, testDatasourceInterval, null, null, null, null),
new DatasourceIngestionSpec(testDatasource, testDatasourceInterval, null, null, null, null, null),
null
);
HadoopDruidIndexerConfig config = testRunUpdateSegmentListIfDatasourcePathSpecIsUsed(
@ -110,7 +111,7 @@ public class HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest
PathSpec pathSpec = new DatasourcePathSpec(
jsonMapper,
null,
new DatasourceIngestionSpec(testDatasource, testDatasourceIntervalPartial, null, null, null, null),
new DatasourceIngestionSpec(testDatasource, testDatasourceIntervalPartial, null, null, null, null, null),
null
);
HadoopDruidIndexerConfig config = testRunUpdateSegmentListIfDatasourcePathSpecIsUsed(
@ -132,7 +133,7 @@ public class HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest
new DatasourcePathSpec(
jsonMapper,
null,
new DatasourceIngestionSpec(testDatasource, testDatasourceInterval, null, null, null, null),
new DatasourceIngestionSpec(testDatasource, testDatasourceInterval, null, null, null, null, null),
null
)
)
@ -182,7 +183,7 @@ public class HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest
UsedSegmentLister segmentLister = EasyMock.createMock(UsedSegmentLister.class);
EasyMock.expect(
segmentLister.getUsedSegmentsForInterval(testDatasource, jobInterval)
segmentLister.getUsedSegmentsForIntervals(testDatasource, Lists.newArrayList(jobInterval))
).andReturn(ImmutableList.of(SEGMENT));
EasyMock.replay(segmentLister);


@ -20,33 +20,74 @@
package io.druid.indexer.hadoop;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import io.druid.granularity.QueryGranularity;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.filter.SelectorDimFilter;
import io.druid.segment.TestHelper;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
import java.util.List;
/**
*/
public class DatasourceIngestionSpecTest
{
private static final ObjectMapper MAPPER = TestHelper.getObjectMapper();
@Test
public void testSerde() throws Exception
public void testSingleIntervalSerde() throws Exception
{
Interval interval = Interval.parse("2014/2015");
DatasourceIngestionSpec expected = new DatasourceIngestionSpec(
"test",
Interval.parse("2014/2015"),
interval,
null,
new SelectorDimFilter("dim", "value"),
QueryGranularity.DAY,
Lists.newArrayList("d1", "d2"),
Lists.newArrayList("m1", "m2", "m3")
);
ObjectMapper jsonMapper = new DefaultObjectMapper();
DatasourceIngestionSpec actual = jsonMapper.readValue(jsonMapper.writeValueAsString(expected), DatasourceIngestionSpec.class);
DatasourceIngestionSpec actual = MAPPER.readValue(MAPPER.writeValueAsString(expected), DatasourceIngestionSpec.class);
Assert.assertEquals(ImmutableList.of(interval), actual.getIntervals());
Assert.assertEquals(expected, actual);
}
@Test
public void testMultiIntervalSerde() throws Exception
{
List<Interval> intervals = ImmutableList.of(Interval.parse("2014/2015"), Interval.parse("2016/2017"));
DatasourceIngestionSpec expected = new DatasourceIngestionSpec(
"test",
null,
intervals,
new SelectorDimFilter("dim", "value"),
QueryGranularity.DAY,
Lists.newArrayList("d1", "d2"),
Lists.newArrayList("m1", "m2", "m3")
);
DatasourceIngestionSpec actual = MAPPER.readValue(
MAPPER.writeValueAsString(expected),
DatasourceIngestionSpec.class
);
Assert.assertEquals(intervals, actual.getIntervals());
Assert.assertEquals(expected, actual);
}
@Test
public void testOldJsonDeserialization() throws Exception
{
String jsonStr = "{\"dataSource\": \"test\", \"interval\": \"2014/2015\"}";
DatasourceIngestionSpec actual = MAPPER.readValue(jsonStr, DatasourceIngestionSpec.class);
Assert.assertEquals(
new DatasourceIngestionSpec("test", Interval.parse("2014/2015"), null, null, null, null, null),
actual
);
}
}


@ -66,6 +66,7 @@ public class DatasourceRecordReaderTest
segment.getInterval(),
null,
null,
null,
segment.getDimensions(),
segment.getMetrics()
)


@ -77,6 +77,7 @@ public class DatasourcePathSpecTest
null,
null,
null,
null,
null
);


@ -21,6 +21,9 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import io.druid.common.utils.JodaUtils;
import io.druid.indexing.common.task.Task;
import io.druid.timeline.DataSegment;
import org.joda.time.Interval;
@ -34,16 +37,29 @@ public class SegmentListUsedAction implements TaskAction<List<DataSegment>>
private final String dataSource;
@JsonIgnore
private final Interval interval;
private final List<Interval> intervals;
@JsonCreator
public SegmentListUsedAction(
@JsonProperty("dataSource") String dataSource,
@JsonProperty("interval") Interval interval
@Deprecated @JsonProperty("interval") Interval interval,
@JsonProperty("intervals") List<Interval> intervals
)
{
this.dataSource = dataSource;
this.interval = interval;
Preconditions.checkArgument(
interval == null || intervals == null,
"please specify intervals only"
);
List<Interval> theIntervals = null;
if (interval != null) {
theIntervals = ImmutableList.of(interval);
} else if (intervals != null && intervals.size() > 0) {
theIntervals = JodaUtils.condenseIntervals(intervals);
}
this.intervals = Preconditions.checkNotNull(theIntervals, "no intervals found");
}
@JsonProperty
@ -53,9 +69,9 @@ public class SegmentListUsedAction implements TaskAction<List<DataSegment>>
}
@JsonProperty
public Interval getInterval()
public List<Interval> getIntervals()
{
return interval;
return intervals;
}
public TypeReference<List<DataSegment>> getReturnTypeReference()
@ -66,7 +82,7 @@ public class SegmentListUsedAction implements TaskAction<List<DataSegment>>
@Override
public List<DataSegment> perform(Task task, TaskActionToolbox toolbox) throws IOException
{
return toolbox.getIndexerMetadataStorageCoordinator().getUsedSegmentsForInterval(dataSource, interval);
return toolbox.getIndexerMetadataStorageCoordinator().getUsedSegmentsForIntervals(dataSource, intervals);
}
@Override
@ -75,12 +91,39 @@ public class SegmentListUsedAction implements TaskAction<List<DataSegment>>
return false;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
SegmentListUsedAction that = (SegmentListUsedAction) o;
if (!dataSource.equals(that.dataSource)) {
return false;
}
return intervals.equals(that.intervals);
}
@Override
public int hashCode()
{
int result = dataSource.hashCode();
result = 31 * result + intervals.hashCode();
return result;
}
@Override
public String toString()
{
return "SegmentListUsedAction{" +
"dataSource='" + dataSource + '\'' +
", interval=" + interval +
", intervals=" + intervals +
'}';
}
}


@ -199,7 +199,8 @@ public class ConvertSegmentTask extends AbstractFixedIntervalTask
final List<DataSegment> segments = toolbox.getTaskActionClient().submit(
new SegmentListUsedAction(
getDataSource(),
getInterval()
getInterval(),
null
)
);
segmentsToUpdate = FunctionalIterable
@ -364,7 +365,7 @@ public class ConvertSegmentTask extends AbstractFixedIntervalTask
log.info("Converting segment[%s]", segment);
final TaskActionClient actionClient = toolbox.getTaskActionClient();
final List<DataSegment> currentSegments = actionClient.submit(
new SegmentListUsedAction(segment.getDataSource(), segment.getInterval())
new SegmentListUsedAction(segment.getDataSource(), segment.getInterval(), null)
);
for (DataSegment currentSegment : currentSegments) {


@ -204,7 +204,7 @@ public abstract class MergeTaskBase extends AbstractFixedIntervalTask
final Set<String> current = ImmutableSet.copyOf(
Iterables.transform(
taskActionClient.submit(new SegmentListUsedAction(getDataSource(), getInterval())),
taskActionClient.submit(new SegmentListUsedAction(getDataSource(), getInterval(), null)),
toIdentifier
)
);


@ -132,7 +132,7 @@ public class IngestSegmentFirehoseFactory implements FirehoseFactory<InputRowPar
try {
final List<DataSegment> usedSegments = toolbox
.getTaskActionClient()
.submit(new SegmentListUsedAction(dataSource, interval));
.submit(new SegmentListUsedAction(dataSource, interval, null));
final Map<DataSegment, File> segmentFileMap = toolbox.fetchSegments(usedSegments);
VersionedIntervalTimeline<String, DataSegment> timeline = new VersionedIntervalTimeline<>(
Ordering.<String>natural().nullsFirst()


@ -43,12 +43,12 @@ public class OverlordActionBasedUsedSegmentLister implements UsedSegmentLister
}
@Override
public List<DataSegment> getUsedSegmentsForInterval(
String dataSource, Interval interval
public List<DataSegment> getUsedSegmentsForIntervals(
String dataSource, List<Interval> intervals
) throws IOException
{
return toolbox
.getTaskActionClient()
.submit(new SegmentListUsedAction(dataSource, interval));
.submit(new SegmentListUsedAction(dataSource, null, intervals));
}
}


@ -0,0 +1,76 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.common.actions;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import io.druid.TestUtil;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
import java.util.List;
/**
*/
public class SegmentListUsedActionTest
{
private static final ObjectMapper MAPPER = TestUtil.MAPPER;
@Test
public void testSingleIntervalSerde() throws Exception
{
Interval interval = Interval.parse("2014/2015");
SegmentListUsedAction expected = new SegmentListUsedAction(
"dataSource",
interval,
null
);
SegmentListUsedAction actual = MAPPER.readValue(MAPPER.writeValueAsString(expected), SegmentListUsedAction.class);
Assert.assertEquals(ImmutableList.of(interval), actual.getIntervals());
Assert.assertEquals(expected, actual);
}
@Test
public void testMultiIntervalSerde() throws Exception
{
List<Interval> intervals = ImmutableList.of(Interval.parse("2014/2015"), Interval.parse("2016/2017"));
SegmentListUsedAction expected = new SegmentListUsedAction(
"dataSource",
null,
intervals
);
SegmentListUsedAction actual = MAPPER.readValue(MAPPER.writeValueAsString(expected), SegmentListUsedAction.class);
Assert.assertEquals(intervals, actual.getIntervals());
Assert.assertEquals(expected, actual);
}
@Test
public void testOldJsonDeserialization() throws Exception
{
String jsonStr = "{\"type\": \"segmentListUsed\", \"dataSource\": \"test\", \"interval\": \"2014/2015\"}";
SegmentListUsedAction actual = (SegmentListUsedAction) MAPPER.readValue(jsonStr, TaskAction.class);
Assert.assertEquals(new SegmentListUsedAction("test", Interval.parse("2014/2015"), null), actual);
}
}


@ -161,6 +161,12 @@ public class IngestSegmentFirehoseFactoryTest
return ImmutableList.copyOf(segmentSet);
}
@Override
public List<DataSegment> getUsedSegmentsForIntervals(String dataSource, List<Interval> interval) throws IOException
{
return ImmutableList.copyOf(segmentSet);
}
@Override
public List<DataSegment> getUnusedSegmentsForInterval(String dataSource, Interval interval)
{


@ -283,7 +283,7 @@ public class IngestSegmentFirehoseFactoryTimelineTest
if (taskAction instanceof SegmentListUsedAction) {
// Expect the interval we asked for
final SegmentListUsedAction action = (SegmentListUsedAction) taskAction;
if (action.getInterval().equals(testCase.interval)) {
if (action.getIntervals().equals(ImmutableList.of(testCase.interval))) {
return (RetType) ImmutableList.copyOf(testCase.segments);
} else {
throw new IllegalArgumentException("WTF");


@ -49,6 +49,14 @@ public class TestIndexerMetadataStorageCoordinator implements IndexerMetadataSto
return ImmutableList.of();
}
@Override
public List<DataSegment> getUsedSegmentsForIntervals(
String dataSource, List<Interval> intervals
) throws IOException
{
return ImmutableList.of();
}
@Override
public List<DataSegment> getUnusedSegmentsForInterval(String dataSource, Interval interval)
{


@ -42,6 +42,19 @@ public interface IndexerMetadataStorageCoordinator
public List<DataSegment> getUsedSegmentsForInterval(final String dataSource, final Interval interval)
throws IOException;
/**
* Get all segments which may include any data in the given intervals and are flagged as used.
*
* @param dataSource The datasource to query
* @param intervals The intervals for which all applicable and used segments are requested.
*
* @return The DataSegments which include data in the requested intervals. These segments may contain data outside the requested intervals.
*
* @throws IOException
*/
public List<DataSegment> getUsedSegmentsForIntervals(final String dataSource, final List<Interval> intervals)
throws IOException;
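
One straightforward way to satisfy both methods is to have the older single-interval variant delegate to the new one. A sketch of such an implementation, mirroring what IndexerSQLMetadataStorageCoordinator does further down in this commit (ImmutableList is assumed to be imported):

```java
@Override
public List<DataSegment> getUsedSegmentsForInterval(final String dataSource, final Interval interval)
    throws IOException
{
  // Wrap the single interval in a list and reuse the multi-interval lookup.
  return getUsedSegmentsForIntervals(dataSource, ImmutableList.of(interval));
}
```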
/**
* Attempts to insert a set of segments to the metadata storage. Returns the set of segments actually added (segments
* with identifiers already in the metadata storage will not be added).


@ -21,12 +21,14 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import com.metamx.common.IAE;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.logger.Logger;
import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
@ -43,6 +45,7 @@ import org.joda.time.Interval;
import org.skife.jdbi.v2.FoldController;
import org.skife.jdbi.v2.Folder3;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.Query;
import org.skife.jdbi.v2.ResultIterator;
import org.skife.jdbi.v2.StatementContext;
import org.skife.jdbi.v2.TransactionCallback;
@ -53,7 +56,9 @@ import org.skife.jdbi.v2.util.StringMapper;
import java.io.IOException;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
@ -85,10 +90,19 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
connector.createSegmentTable();
}
@Override
public List<DataSegment> getUsedSegmentsForInterval(
final String dataSource,
final Interval interval
) throws IOException
{
return getUsedSegmentsForIntervals(dataSource, ImmutableList.of(interval));
}
@Override
public List<DataSegment> getUsedSegmentsForIntervals(
final String dataSource, final List<Interval> intervals
) throws IOException
{
return connector.retryWithHandle(
new HandleCallback<List<DataSegment>>()
@ -96,16 +110,28 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
@Override
public List<DataSegment> withHandle(Handle handle) throws Exception
{
final VersionedIntervalTimeline<String, DataSegment> timeline = getTimelineForIntervalWithHandle(
final VersionedIntervalTimeline<String, DataSegment> timeline = getTimelineForIntervalsWithHandle(
handle,
dataSource,
interval
intervals
);
return Lists.newArrayList(
Set<DataSegment> segments = Sets.newHashSet(
Iterables.concat(
Iterables.transform(
timeline.lookup(interval),
Iterables.concat(
Iterables.transform(
intervals,
new Function<Interval, Iterable<TimelineObjectHolder<String, DataSegment>>>()
{
@Override
public Iterable<TimelineObjectHolder<String, DataSegment>> apply(Interval interval)
{
return timeline.lookup(interval);
}
}
)
),
new Function<TimelineObjectHolder<String, DataSegment>, Iterable<DataSegment>>()
{
@Override
@ -118,6 +144,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
)
);
return new ArrayList<>(segments);
}
}
);
@ -158,29 +185,51 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
return identifiers;
}
private VersionedIntervalTimeline<String, DataSegment> getTimelineForIntervalWithHandle(
private VersionedIntervalTimeline<String, DataSegment> getTimelineForIntervalsWithHandle(
final Handle handle,
final String dataSource,
final Interval interval
final List<Interval> intervals
) throws IOException
{
if (intervals == null || intervals.isEmpty()) {
throw new IAE("null/empty intervals");
}
final StringBuilder sb = new StringBuilder();
sb.append("SELECT payload FROM %s WHERE used = true AND dataSource = ? AND (");
for (int i = 0; i < intervals.size(); i++) {
sb.append(
"(start <= ? AND \"end\" >= ?)"
);
if (i == intervals.size() - 1) {
sb.append(")");
} else {
sb.append(" OR ");
}
}
Query<Map<String, Object>> sql = handle.createQuery(
String.format(
sb.toString(),
dbTables.getSegmentsTable()
)
).bind(0, dataSource);
for (int i = 0; i < intervals.size(); i++) {
Interval interval = intervals.get(i);
sql = sql
.bind(2 * i + 1, interval.getEnd().toString())
.bind(2 * i + 2, interval.getStart().toString());
}
final ResultIterator<byte[]> dbSegments = sql
.map(ByteArrayMapper.FIRST)
.iterator();
final VersionedIntervalTimeline<String, DataSegment> timeline = new VersionedIntervalTimeline<>(
Ordering.natural()
);
final ResultIterator<byte[]> dbSegments =
handle.createQuery(
String.format(
"SELECT payload FROM %s WHERE used = true AND dataSource = :dataSource AND start <= :end and \"end\" >= :start AND used = true",
dbTables.getSegmentsTable()
)
)
.bind("dataSource", dataSource)
.bind("start", interval.getStart().toString())
.bind("end", interval.getEnd().toString())
.map(ByteArrayMapper.FIRST)
.iterator();
while (dbSegments.hasNext()) {
final byte[] payload = dbSegments.next();
@ -301,10 +350,10 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
final SegmentIdentifier newIdentifier;
final List<TimelineObjectHolder<String, DataSegment>> existingChunks = getTimelineForIntervalWithHandle(
final List<TimelineObjectHolder<String, DataSegment>> existingChunks = getTimelineForIntervalsWithHandle(
handle,
dataSource,
interval
ImmutableList.of(interval)
).lookup(interval);
if (existingChunks.size() > 1) {
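
To make the interval filtering concrete, the sketch below reproduces the WHERE-clause string that getTimelineForIntervalsWithHandle builds for two intervals and prints the resulting SQL. The table name and interval strings are placeholders, and only the string construction is reproduced here, not the JDBI parameter binding:

```java
import java.util.Arrays;
import java.util.List;

public class IntervalWhereClauseSketch
{
  public static void main(String[] args)
  {
    // Illustrative query intervals; the real code works with Joda Interval objects.
    List<String> intervals = Arrays.asList("2014/2015", "2016/2017");

    StringBuilder sb = new StringBuilder();
    sb.append("SELECT payload FROM %s WHERE used = true AND dataSource = ? AND (");
    for (int i = 0; i < intervals.size(); i++) {
      sb.append("(start <= ? AND \"end\" >= ?)");
      sb.append(i == intervals.size() - 1 ? ")" : " OR ");
    }

    // For two intervals this prints (wrapped here for readability):
    //   SELECT payload FROM druid_segments WHERE used = true AND dataSource = ?
    //     AND ((start <= ? AND "end" >= ?) OR (start <= ? AND "end" >= ?))
    // Each interval then binds its end to the first placeholder and its start to the
    // second, matching the bind(2 * i + 1, end) / bind(2 * i + 2, start) calls above.
    System.out.println(String.format(sb.toString(), "druid_segments"));
  }
}
```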


@ -26,6 +26,7 @@ import com.google.common.collect.ImmutableSet;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.LinearShardSpec;
import io.druid.timeline.partition.NoneShardSpec;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Before;
@ -65,6 +66,19 @@ public class IndexerSQLMetadataStorageCoordinatorTest
9,
100
);
private final DataSegment defaultSegment3 = new DataSegment(
"dataSource",
Interval.parse("2015-01-03T00Z/2015-01-04T00Z"),
"version",
ImmutableMap.<String, Object>of(),
ImmutableList.of("dim1"),
ImmutableList.of("m1"),
new NoneShardSpec(),
9,
100
);
private final Set<DataSegment> segments = ImmutableSet.of(defaultSegment, defaultSegment2);
IndexerSQLMetadataStorageCoordinator coordinator;
private TestDerbyConnector derbyConnector;
@ -136,6 +150,52 @@ public class IndexerSQLMetadataStorageCoordinatorTest
);
}
@Test
public void testMultiIntervalUsedList() throws IOException
{
coordinator.announceHistoricalSegments(segments);
coordinator.announceHistoricalSegments(ImmutableSet.of(defaultSegment3));
Assert.assertEquals(
segments,
ImmutableSet.copyOf(
coordinator.getUsedSegmentsForIntervals(
defaultSegment.getDataSource(),
ImmutableList.of(defaultSegment.getInterval())
)
)
);
Assert.assertEquals(
ImmutableSet.of(defaultSegment3),
ImmutableSet.copyOf(
coordinator.getUsedSegmentsForIntervals(
defaultSegment.getDataSource(),
ImmutableList.of(defaultSegment3.getInterval())
)
)
);
Assert.assertEquals(
ImmutableSet.of(defaultSegment, defaultSegment2, defaultSegment3),
ImmutableSet.copyOf(
coordinator.getUsedSegmentsForIntervals(
defaultSegment.getDataSource(),
ImmutableList.of(defaultSegment.getInterval(), defaultSegment3.getInterval())
)
)
);
// Check that no duplicates are returned when two query intervals overlap the same segment's interval.
Assert.assertEquals(
ImmutableList.of(defaultSegment3),
coordinator.getUsedSegmentsForIntervals(
defaultSegment.getDataSource(),
ImmutableList.of(Interval.parse("2015-01-03T00Z/2015-01-03T05Z"), Interval.parse("2015-01-03T09Z/2015-01-04T00Z"))
)
);
}
@Test
public void testSimpleUnUsedList() throws IOException
{