give user the option to specify the segments for dataSource inputSpec

Himanshu Gupta 2015-12-16 21:31:07 -06:00
parent 8ee81947cd
commit 09ffcae4ae
7 changed files with 157 additions and 4 deletions

View File

@@ -51,6 +51,7 @@ Here is what goes inside `ingestionSpec`:
|-----|----|-----------|--------|
|dataSource|String|Druid dataSource name from which you are loading the data.|yes|
|intervals|List|A list of strings representing ISO-8601 Intervals.|yes|
|segments|List|A list of segments from which to read the data. If not specified, it is determined automatically, but the resulting list may change across invocations; users may want to provide it explicitly to make the ingestion task idempotent.|no|
|granularity|String|Defines the granularity of the query while loading data. Default value is "none". See [Granularities](../querying/granularities.html).|no|
|filter|JSON|See [Filters](../querying/filters.html)|no|
|dimensions|Array of String|Name of dimension columns to load. By default, the list will be constructed from parseSpec. If parseSpec does not have an explicit list of dimensions then all the dimension columns present in stored data will be read.|no|
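A minimal sketch of an `ingestionSpec` that pins the `segments` list; the segment descriptor values below are illustrative, modeled on the unit test added in this commit, and a real list would be copied from the metadata store or from a previous run:

```json
{
  "dataSource": "test",
  "intervals": ["2014/2015"],
  "segments": [
    {
      "dataSource": "test",
      "interval": "2014-01-01T00:00:00.000Z/2015-01-01T00:00:00.000Z",
      "version": "v1",
      "loadSpec": null,
      "dimensions": "",
      "metrics": "",
      "shardSpec": {"type": "none"},
      "binaryVersion": 9,
      "size": 128,
      "identifier": "test_2014-01-01T00:00:00.000Z_2015-01-01T00:00:00.000Z_v1"
    }
  ],
  "granularity": "none"
}
```

With the list pinned, if the segments stored for the given intervals change between submission and execution, the task fails instead of silently reading different data.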

View File

@@ -37,8 +37,10 @@ import io.druid.timeline.partition.PartitionChunk;
import org.joda.time.Interval;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
*/
@@ -168,10 +170,32 @@ public class HadoopIngestionSpec extends IngestionSpec<HadoopIOConfig, HadoopTun
ingestionSpecMap,
DatasourceIngestionSpec.class
);
List<DataSegment> segmentsList = segmentLister.getUsedSegmentsForIntervals(
ingestionSpecObj.getDataSource(),
ingestionSpecObj.getIntervals()
);
if (ingestionSpecObj.getSegments() != null) {
//ensure that the user-supplied segment list matches the segment list obtained from the metadata store.
//this safety check lets users do test-and-set style batch delta ingestion, where the delta
//ingestion task only runs if the current state of the system is the same as when they
//submitted the task.
List<DataSegment> userSuppliedSegmentsList = ingestionSpecObj.getSegments();
if (segmentsList.size() == userSuppliedSegmentsList.size()) {
Set<DataSegment> segmentsSet = new HashSet<>(segmentsList);
for (DataSegment userSegment : userSuppliedSegmentsList) {
if (!segmentsSet.contains(userSegment)) {
throw new IOException("user supplied segments list did not match with segments list obtained from db");
}
}
} else {
throw new IOException("user supplied segments list did not match with segments list obtained from db");
}
}
VersionedIntervalTimeline<String, DataSegment> timeline = new VersionedIntervalTimeline<>(Ordering.natural());
for (DataSegment segment : segmentsList) {
timeline.add(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(segment));

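The check added above boils down to: the user-supplied list must have the same size as the list obtained from the metadata store, and every user-supplied segment must be present in that list; otherwise an IOException is thrown. A standalone sketch of that rule, using plain strings in place of `DataSegment` so it runs without any Druid dependencies:

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class SegmentListCheck
{
  // Mirrors the check in HadoopIngestionSpec: same size, and every user-supplied
  // entry must be present in the list obtained from the metadata store.
  static void verifySegmentListsMatch(List<String> fromDb, List<String> userSupplied) throws IOException
  {
    if (fromDb.size() != userSupplied.size()) {
      throw new IOException("user supplied segments list did not match with segments list obtained from db");
    }
    Set<String> dbSet = new HashSet<>(fromDb);
    for (String segment : userSupplied) {
      if (!dbSet.contains(segment)) {
        throw new IOException("user supplied segments list did not match with segments list obtained from db");
      }
    }
  }

  public static void main(String[] args) throws IOException
  {
    // Same segments in a different order: passes.
    verifySegmentListsMatch(Arrays.asList("seg_v1_0", "seg_v1_1"), Arrays.asList("seg_v1_1", "seg_v1_0"));
    // Stale user list (a segment was since replaced by a newer version): throws IOException.
    verifySegmentListsMatch(Arrays.asList("seg_v2_0"), Arrays.asList("seg_v1_0"));
  }
}
```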
View File

@@ -26,6 +26,7 @@ import com.google.common.collect.ImmutableList;
import io.druid.common.utils.JodaUtils;
import io.druid.granularity.QueryGranularity;
import io.druid.query.filter.DimFilter;
import io.druid.timeline.DataSegment;
import org.joda.time.Interval;
import java.util.List;
@@ -34,6 +35,7 @@ public class DatasourceIngestionSpec
{
private final String dataSource;
private final List<Interval> intervals;
private final List<DataSegment> segments;
private final DimFilter filter;
private final QueryGranularity granularity;
private final List<String> dimensions;
@@ -45,6 +47,7 @@ public class DatasourceIngestionSpec
@JsonProperty("dataSource") String dataSource,
@Deprecated @JsonProperty("interval") Interval interval,
@JsonProperty("intervals") List<Interval> intervals,
@JsonProperty("segments") List<DataSegment> segments,
@JsonProperty("filter") DimFilter filter,
@JsonProperty("granularity") QueryGranularity granularity,
@JsonProperty("dimensions") List<String> dimensions,
@@ -67,6 +70,11 @@ public class DatasourceIngestionSpec
}
this.intervals = Preconditions.checkNotNull(theIntervals, "no intervals found");
// note that it is important to have intervals even if the user explicitly specifies the list of
// segments, because the segment list's min/max boundaries might not align with the intended
// interval to read in all cases.
this.segments = segments;
this.filter = filter;
this.granularity = granularity == null ? QueryGranularity.NONE : granularity;
@@ -88,6 +96,12 @@ public class DatasourceIngestionSpec
return intervals;
}
@JsonProperty
public List<DataSegment> getSegments()
{
return segments;
}
@JsonProperty
public DimFilter getFilter()
{
@@ -124,6 +138,7 @@ public class DatasourceIngestionSpec
dataSource,
null,
intervals,
segments,
filter,
granularity,
dimensions,
@@ -138,6 +153,7 @@ public class DatasourceIngestionSpec
dataSource,
null,
intervals,
segments,
filter,
granularity,
dimensions,
@@ -152,6 +168,7 @@ public class DatasourceIngestionSpec
dataSource,
null,
intervals,
segments,
filter,
granularity,
dimensions,
@@ -166,6 +183,7 @@ public class DatasourceIngestionSpec
dataSource,
null,
intervals,
segments,
filter,
granularity,
dimensions,
@@ -195,6 +213,9 @@ public class DatasourceIngestionSpec
if (!intervals.equals(that.intervals)) {
return false;
}
if (segments != null ? !segments.equals(that.segments) : that.segments != null) {
return false;
}
if (filter != null ? !filter.equals(that.filter) : that.filter != null) {
return false;
}
@@ -213,6 +234,7 @@ public class DatasourceIngestionSpec
{
int result = dataSource.hashCode();
result = 31 * result + intervals.hashCode();
result = 31 * result + (segments != null ? segments.hashCode() : 0);
result = 31 * result + (filter != null ? filter.hashCode() : 0);
result = 31 * result + granularity.hashCode();
result = 31 * result + (dimensions != null ? dimensions.hashCode() : 0);
@@ -227,6 +249,7 @@ public class DatasourceIngestionSpec
return "DatasourceIngestionSpec{" +
"dataSource='" + dataSource + '\'' +
", intervals=" + intervals +
", segments=" + segments +
", filter=" + filter +
", granularity=" + granularity +
", dimensions=" + dimensions +

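The constructor comment above explains why `intervals` stays mandatory even when `segments` is supplied: segment boundaries may be wider than the window the task should actually read, so the requested intervals, not the segment boundaries, define what to read. A small Joda-Time sketch of that situation (the interval values are made up):

```java
import org.joda.time.Interval;

class IntervalClipping
{
  public static void main(String[] args)
  {
    // A pinned segment may cover more time than the task intends to re-read.
    Interval segmentInterval = Interval.parse("2014-01-01/2017-01-01");
    // The ingestionSpec intervals define the window that should actually be read.
    Interval requestedInterval = Interval.parse("2014-01-01/2015-01-01");
    // Only the overlapping portion falls inside the requested window.
    System.out.println(segmentInterval.overlap(requestedInterval)); // prints the 2014-to-2015 overlap
  }
}
```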
View File

@@ -43,6 +43,7 @@ import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.util.Map;
/**
@@ -91,7 +92,7 @@ public class HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest
PathSpec pathSpec = new DatasourcePathSpec(
jsonMapper,
null,
new DatasourceIngestionSpec(testDatasource, testDatasourceInterval, null, null, null, null, null, false),
new DatasourceIngestionSpec(testDatasource, testDatasourceInterval, null, null, null, null, null, null, false),
null
);
HadoopDruidIndexerConfig config = testRunUpdateSegmentListIfDatasourcePathSpecIsUsed(
@@ -104,6 +105,60 @@ public class HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest
);
}
@Test
public void testupdateSegmentListIfDatasourcePathSpecWithMatchingUserSegments() throws Exception
{
PathSpec pathSpec = new DatasourcePathSpec(
jsonMapper,
null,
new DatasourceIngestionSpec(
testDatasource,
testDatasourceInterval,
null,
ImmutableList.<DataSegment>of(SEGMENT),
null,
null,
null,
null,
false
),
null
);
HadoopDruidIndexerConfig config = testRunUpdateSegmentListIfDatasourcePathSpecIsUsed(
pathSpec,
testDatasourceInterval
);
Assert.assertEquals(
ImmutableList.of(WindowedDataSegment.of(SEGMENT)),
((DatasourcePathSpec) config.getPathSpec()).getSegments()
);
}
@Test(expected = IOException.class)
public void testupdateSegmentListThrowsExceptionWithUserSegmentsMismatch() throws Exception
{
PathSpec pathSpec = new DatasourcePathSpec(
jsonMapper,
null,
new DatasourceIngestionSpec(
testDatasource,
testDatasourceInterval,
null,
ImmutableList.<DataSegment>of(SEGMENT.withVersion("v2")),
null,
null,
null,
null,
false
),
null
);
testRunUpdateSegmentListIfDatasourcePathSpecIsUsed(
pathSpec,
testDatasourceInterval
);
}
@Test
public void testupdateSegmentListIfDatasourcePathSpecIsUsedWithJustDatasourcePathSpecAndPartialInterval()
throws Exception
@@ -111,7 +166,17 @@ public class HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest
PathSpec pathSpec = new DatasourcePathSpec(
jsonMapper,
null,
new DatasourceIngestionSpec(testDatasource, testDatasourceIntervalPartial, null, null, null, null, null, false),
new DatasourceIngestionSpec(
testDatasource,
testDatasourceIntervalPartial,
null,
null,
null,
null,
null,
null,
false
),
null
);
HadoopDruidIndexerConfig config = testRunUpdateSegmentListIfDatasourcePathSpecIsUsed(
@@ -133,7 +198,17 @@ public class HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest
new DatasourcePathSpec(
jsonMapper,
null,
new DatasourceIngestionSpec(testDatasource, testDatasourceInterval, null, null, null, null, null, false),
new DatasourceIngestionSpec(
testDatasource,
testDatasourceInterval,
null,
null,
null,
null,
null,
null,
false
),
null
)
)

View File

@@ -25,6 +25,7 @@ import com.google.common.collect.Lists;
import io.druid.granularity.QueryGranularity;
import io.druid.query.filter.SelectorDimFilter;
import io.druid.segment.TestHelper;
import io.druid.timeline.DataSegment;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
@@ -46,6 +47,7 @@ public class DatasourceIngestionSpecTest
"test",
interval,
null,
null,
new SelectorDimFilter("dim", "value"),
QueryGranularity.DAY,
Lists.newArrayList("d1", "d2"),
@@ -84,6 +86,7 @@ public class DatasourceIngestionSpecTest
null,
null,
null,
null,
false
);
@@ -93,6 +96,18 @@ public class DatasourceIngestionSpecTest
jsonStr = "{\n"
+ " \"dataSource\": \"test\",\n"
+ " \"intervals\": [\"2014/2015\", \"2016/2017\"],\n"
+ " \"segments\": [{\n"
+ " \"dataSource\":\"test\",\n"
+ " \"interval\":\"2014-01-01T00:00:00.000Z/2017-01-01T00:00:00.000Z\",\n"
+ " \"version\":\"v0\",\n"
+ " \"loadSpec\":null,\n"
+ " \"dimensions\":\"\",\n"
+ " \"metrics\":\"\",\n"
+ " \"shardSpec\":{\"type\":\"none\"},\n"
+ " \"binaryVersion\":9,\n"
+ " \"size\":128,\n"
+ " \"identifier\":\"test_2014-01-01T00:00:00.000Z_2017-01-01T00:00:00.000Z_v0\"\n"
+ " }],\n"
+ " \"filter\": { \"type\": \"selector\", \"dimension\": \"dim\", \"value\": \"value\"},\n"
+ " \"granularity\": \"day\",\n"
+ " \"dimensions\": [\"d1\", \"d2\"],\n"
@@ -104,6 +119,19 @@ public class DatasourceIngestionSpecTest
"test",
null,
intervals,
ImmutableList.of(
new DataSegment(
"test",
Interval.parse("2014/2017"),
"v0",
null,
null,
null,
null,
9,
128
)
),
new SelectorDimFilter("dim", "value"),
QueryGranularity.DAY,
Lists.newArrayList("d1", "d2"),
@@ -128,7 +156,7 @@ public class DatasourceIngestionSpecTest
DatasourceIngestionSpec actual = MAPPER.readValue(jsonStr, DatasourceIngestionSpec.class);
Assert.assertEquals(
new DatasourceIngestionSpec("test", Interval.parse("2014/2015"), null, null, null, null, null, false),
new DatasourceIngestionSpec("test", Interval.parse("2014/2015"), null, null, null, null, null, null, false),
actual
);
}

View File

@@ -67,6 +67,7 @@ public class DatasourceRecordReaderTest
null,
null,
null,
null,
segment.getDimensions(),
segment.getMetrics(),
false

View File

@@ -80,6 +80,7 @@ public class DatasourcePathSpecTest
null,
null,
null,
null,
false
);