Merge pull request #2109 from himanshug/segments_in_delta_ingestion

idempotent batch delta ingestion
Fangjin Yang 2016-02-22 14:00:45 -08:00
commit 0c984f9e32
9 changed files with 238 additions and 10 deletions

View File

@ -105,6 +105,14 @@ Returns a list of all segments for a datasource as stored in the metadata store.
Returns a list of all segments for a datasource with the full segment metadata as stored in the metadata store.
* POST `/druid/coordinator/v1/metadata/datasources/{dataSourceName}/segments`
Returns a list of all segments for a datasource that overlap any of the given intervals, as stored in the metadata store. The request body is an array of interval strings, e.g. ["2012-01-01T00:00:00.000/2012-01-03T00:00:00.000", "2012-01-05T00:00:00.000/2012-01-07T00:00:00.000"]
* POST `/druid/coordinator/v1/metadata/datasources/{dataSourceName}/segments?full`
Returns a list of all segments for a datasource that overlap any of the given intervals, with the full segment metadata, as stored in the metadata store. The request body is an array of interval strings, e.g. ["2012-01-01T00:00:00.000/2012-01-03T00:00:00.000", "2012-01-05T00:00:00.000/2012-01-07T00:00:00.000"]
* `/druid/coordinator/v1/metadata/datasources/{dataSourceName}/segments/{segmentId}`
Returns full segment metadata for a specific segment as stored in the metadata store.

View File

@ -51,6 +51,7 @@ Here is what goes inside `ingestionSpec`:
|-----|----|-----------|--------|
|dataSource|String|Druid dataSource name from which you are loading the data.|yes|
|intervals|List|A list of strings representing ISO-8601 Intervals.|yes|
|segments|List|List of segments from which to read the data. By default, it is obtained automatically. You can obtain the list of segments to put here by making a POST query to the coordinator at /druid/coordinator/v1/metadata/datasources/{dataSourceName}/segments?full with the list of intervals in the request payload, e.g. ["2012-01-01T00:00:00.000/2012-01-03T00:00:00.000", "2012-01-05T00:00:00.000/2012-01-07T00:00:00.000"]. You may want to provide this list manually to ensure that the segments read are exactly the same as they were at the time of task submission; the task will fail if the list provided does not match the state of the database when the task actually runs.|no|
|granularity|String|Defines the granularity of the query while loading data. Default value is "none". See [Granularities](../querying/granularities.html).|no|
|filter|JSON|See [Filters](../querying/filters.html)|no|
|dimensions|Array of String|Name of dimension columns to load. By default, the list will be constructed from parseSpec. If parseSpec does not have an explicit list of dimensions then all the dimension columns present in stored data will be read.|no|
@ -75,8 +76,7 @@ For example
#### `multi`
This is a composing inputSpec to combine other inputSpecs. This inputSpec is used for delta ingestion. Please note that you can have only one `dataSource` as a child of a `multi` inputSpec.
Please note that delta ingestion is not an idempotent operation. We may change things in the future to make it idempotent.
|Field|Type|Description|Required|
|-----|----|-----------|--------|
@ -94,7 +94,26 @@ For example:
"type" : "dataSource",
"ingestionSpec" : {
"dataSource": "wikipedia",
"intervals": ["2012-01-01T00:00:00.000/2012-01-03T00:00:00.000", "2012-01-05T00:00:00.000/2012-01-07T00:00:00.000"],
"segments": [
{
"dataSource": "test1",
"interval": "2012-01-01T00:00:00.000/2012-01-03T00:00:00.000",
"version": "v2",
"loadSpec": {
"type": "local",
"path": "/tmp/index1.zip"
},
"dimensions": "host",
"metrics": "visited_sum,unique_hosts",
"shardSpec": {
"type": "none"
},
"binaryVersion": 9,
"size": 2,
"identifier": "test1_2000-01-01T00:00:00.000Z_3000-01-01T00:00:00.000Z_v2"
}
]
}
},
{
@ -107,6 +126,11 @@ For example:
}
```
It is STRONGLY RECOMMENDED to provide the list of segments in the `dataSource` inputSpec explicitly so that your delta ingestion task is idempotent. You can obtain that list of segments by making the following call to the coordinator.
POST `/druid/coordinator/v1/metadata/datasources/{dataSourceName}/segments?full`
Request body: an array of interval strings, [interval1, interval2, ...], for example ["2012-01-01T00:00:00.000/2012-01-03T00:00:00.000", "2012-01-05T00:00:00.000/2012-01-07T00:00:00.000"]
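For illustration only, the sketch below is one way to script that call and capture the response to paste into the `segments` field of the `dataSource` inputSpec; the coordinator address and datasource name are placeholders and not part of this change.

```java
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;

public class FetchSegmentList
{
  public static void main(String[] args) throws Exception
  {
    // Placeholder coordinator address and datasource name.
    URL url = new URL(
        "http://localhost:8081/druid/coordinator/v1/metadata/datasources/wikipedia/segments?full"
    );
    // Intervals the delta ingestion task will re-read, as a JSON array of interval strings.
    String body = "[\"2012-01-01T00:00:00.000/2012-01-03T00:00:00.000\", "
                  + "\"2012-01-05T00:00:00.000/2012-01-07T00:00:00.000\"]";

    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    conn.setRequestMethod("POST");
    conn.setRequestProperty("Content-Type", "application/json");
    conn.setDoOutput(true);
    try (OutputStream out = conn.getOutputStream()) {
      out.write(body.getBytes(StandardCharsets.UTF_8));
    }

    // The response is the JSON array of full segment metadata; paste it into the
    // "segments" field of the dataSource inputSpec to pin the task to this exact state.
    try (InputStream in = conn.getInputStream()) {
      Scanner scanner = new Scanner(in, StandardCharsets.UTF_8.name()).useDelimiter("\\A");
      System.out.println(scanner.hasNext() ? scanner.next() : "");
    }
  }
}
```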
### Reindexing without Hadoop Batch Ingestion
This section assumes the reader understands how to do batch ingestion without Hadoop using the [IndexTask](../ingestion/tasks.html#index-task),

View File

@ -37,8 +37,10 @@ import io.druid.timeline.partition.PartitionChunk;
import org.joda.time.Interval;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
*/
@ -168,10 +170,32 @@ public class HadoopIngestionSpec extends IngestionSpec<HadoopIOConfig, HadoopTun
ingestionSpecMap,
DatasourceIngestionSpec.class
);
List<DataSegment> segmentsList = segmentLister.getUsedSegmentsForIntervals(
ingestionSpecObj.getDataSource(),
ingestionSpecObj.getIntervals()
);
if (ingestionSpecObj.getSegments() != null) {
//ensure that user supplied segment list matches with the segmentsList obtained from db
//this safety check lets users do test-n-set kind of batch delta ingestion where the delta
//ingestion task would only run if current state of the system is same as when they submitted
//the task.
List<DataSegment> userSuppliedSegmentsList = ingestionSpecObj.getSegments();
if (segmentsList.size() == userSuppliedSegmentsList.size()) {
Set<DataSegment> segmentsSet = new HashSet<>(segmentsList);
for (DataSegment userSegment : userSuppliedSegmentsList) {
if (!segmentsSet.contains(userSegment)) {
throw new IOException("user supplied segments list did not match with segments list obtained from db");
}
}
} else {
throw new IOException("user supplied segments list did not match with segments list obtained from db");
}
}
VersionedIntervalTimeline<String, DataSegment> timeline = new VersionedIntervalTimeline<>(Ordering.natural());
for (DataSegment segment : segmentsList) {
timeline.add(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(segment));

View File

@ -26,6 +26,7 @@ import com.google.common.collect.ImmutableList;
import io.druid.common.utils.JodaUtils;
import io.druid.granularity.QueryGranularity;
import io.druid.query.filter.DimFilter;
import io.druid.timeline.DataSegment;
import org.joda.time.Interval;
import java.util.List;
@ -34,6 +35,7 @@ public class DatasourceIngestionSpec
{
private final String dataSource;
private final List<Interval> intervals;
private final List<DataSegment> segments;
private final DimFilter filter;
private final QueryGranularity granularity;
private final List<String> dimensions;
@ -45,6 +47,7 @@ public class DatasourceIngestionSpec
@JsonProperty("dataSource") String dataSource,
@Deprecated @JsonProperty("interval") Interval interval,
@JsonProperty("intervals") List<Interval> intervals,
@JsonProperty("segments") List<DataSegment> segments,
@JsonProperty("filter") DimFilter filter,
@JsonProperty("granularity") QueryGranularity granularity,
@JsonProperty("dimensions") List<String> dimensions,
@ -67,6 +70,11 @@ public class DatasourceIngestionSpec
}
this.intervals = Preconditions.checkNotNull(theIntervals, "no intervals found");
// note that it is important to have intervals even if user explicitly specifies the list of
// segments, because segment list's min/max boundaries might not align the intended interval
// to read in all cases.
this.segments = segments;
this.filter = filter;
this.granularity = granularity == null ? QueryGranularity.NONE : granularity;
@ -88,6 +96,12 @@ public class DatasourceIngestionSpec
return intervals;
}
@JsonProperty
public List<DataSegment> getSegments()
{
return segments;
}
@JsonProperty
public DimFilter getFilter()
{
@ -124,6 +138,7 @@ public class DatasourceIngestionSpec
dataSource,
null,
intervals,
segments,
filter,
granularity,
dimensions,
@ -138,6 +153,7 @@ public class DatasourceIngestionSpec
dataSource,
null,
intervals,
segments,
filter,
granularity,
dimensions,
@ -152,6 +168,7 @@ public class DatasourceIngestionSpec
dataSource,
null,
intervals,
segments,
filter,
granularity,
dimensions,
@ -166,6 +183,7 @@ public class DatasourceIngestionSpec
dataSource,
null,
intervals,
segments,
filter,
granularity,
dimensions,
@ -195,6 +213,9 @@ public class DatasourceIngestionSpec
if (!intervals.equals(that.intervals)) {
return false;
}
if (segments != null ? !segments.equals(that.segments) : that.segments != null) {
return false;
}
if (filter != null ? !filter.equals(that.filter) : that.filter != null) {
return false;
}
@ -213,6 +234,7 @@ public class DatasourceIngestionSpec
{
int result = dataSource.hashCode();
result = 31 * result + intervals.hashCode();
result = 31 * result + (segments != null ? segments.hashCode() : 0);
result = 31 * result + (filter != null ? filter.hashCode() : 0);
result = 31 * result + granularity.hashCode();
result = 31 * result + (dimensions != null ? dimensions.hashCode() : 0);
@ -227,6 +249,7 @@ public class DatasourceIngestionSpec
return "DatasourceIngestionSpec{" +
"dataSource='" + dataSource + '\'' +
", intervals=" + intervals +
", segments=" + segments +
", filter=" + filter +
", granularity=" + granularity +
", dimensions=" + dimensions +

View File

@ -43,6 +43,7 @@ import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.util.Map;
/**
@ -91,7 +92,7 @@ public class HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest
PathSpec pathSpec = new DatasourcePathSpec(
jsonMapper,
null,
new DatasourceIngestionSpec(testDatasource, testDatasourceInterval, null, null, null, null, null, null, false),
null
);
HadoopDruidIndexerConfig config = testRunUpdateSegmentListIfDatasourcePathSpecIsUsed(
@ -104,6 +105,60 @@ public class HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest
);
}
@Test
public void testupdateSegmentListIfDatasourcePathSpecWithMatchingUserSegments() throws Exception
{
PathSpec pathSpec = new DatasourcePathSpec(
jsonMapper,
null,
new DatasourceIngestionSpec(
testDatasource,
testDatasourceInterval,
null,
ImmutableList.<DataSegment>of(SEGMENT),
null,
null,
null,
null,
false
),
null
);
HadoopDruidIndexerConfig config = testRunUpdateSegmentListIfDatasourcePathSpecIsUsed(
pathSpec,
testDatasourceInterval
);
Assert.assertEquals(
ImmutableList.of(WindowedDataSegment.of(SEGMENT)),
((DatasourcePathSpec) config.getPathSpec()).getSegments()
);
}
@Test(expected = IOException.class)
public void testupdateSegmentListThrowsExceptionWithUserSegmentsMismatch() throws Exception
{
PathSpec pathSpec = new DatasourcePathSpec(
jsonMapper,
null,
new DatasourceIngestionSpec(
testDatasource,
testDatasourceInterval,
null,
ImmutableList.<DataSegment>of(SEGMENT.withVersion("v2")),
null,
null,
null,
null,
false
),
null
);
testRunUpdateSegmentListIfDatasourcePathSpecIsUsed(
pathSpec,
testDatasourceInterval
);
}
@Test
public void testupdateSegmentListIfDatasourcePathSpecIsUsedWithJustDatasourcePathSpecAndPartialInterval()
throws Exception
@ -111,7 +166,17 @@ public class HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest
PathSpec pathSpec = new DatasourcePathSpec(
jsonMapper,
null,
new DatasourceIngestionSpec(
testDatasource,
testDatasourceIntervalPartial,
null,
null,
null,
null,
null,
null,
false
),
null
);
HadoopDruidIndexerConfig config = testRunUpdateSegmentListIfDatasourcePathSpecIsUsed(
@ -133,7 +198,17 @@ public class HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest
new DatasourcePathSpec(
jsonMapper,
null,
new DatasourceIngestionSpec(
testDatasource,
testDatasourceInterval,
null,
null,
null,
null,
null,
null,
false
),
null
)
)

View File

@ -25,6 +25,7 @@ import com.google.common.collect.Lists;
import io.druid.granularity.QueryGranularity;
import io.druid.query.filter.SelectorDimFilter;
import io.druid.segment.TestHelper;
import io.druid.timeline.DataSegment;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
@ -46,6 +47,7 @@ public class DatasourceIngestionSpecTest
"test",
interval,
null,
null,
new SelectorDimFilter("dim", "value"),
QueryGranularity.DAY,
Lists.newArrayList("d1", "d2"),
@ -84,6 +86,7 @@ public class DatasourceIngestionSpecTest
null,
null,
null,
null,
false
);
@ -93,6 +96,18 @@ public class DatasourceIngestionSpecTest
jsonStr = "{\n"
+ " \"dataSource\": \"test\",\n"
+ " \"intervals\": [\"2014/2015\", \"2016/2017\"],\n"
+ " \"segments\": [{\n"
+ " \"dataSource\":\"test\",\n"
+ " \"interval\":\"2014-01-01T00:00:00.000Z/2017-01-01T00:00:00.000Z\",\n"
+ " \"version\":\"v0\",\n"
+ " \"loadSpec\":null,\n"
+ " \"dimensions\":\"\",\n"
+ " \"metrics\":\"\",\n"
+ " \"shardSpec\":{\"type\":\"none\"},\n"
+ " \"binaryVersion\":9,\n"
+ " \"size\":128,\n"
+ " \"identifier\":\"test_2014-01-01T00:00:00.000Z_2017-01-01T00:00:00.000Z_v0\"\n"
+ " }],\n"
+ " \"filter\": { \"type\": \"selector\", \"dimension\": \"dim\", \"value\": \"value\"},\n"
+ " \"granularity\": \"day\",\n"
+ " \"dimensions\": [\"d1\", \"d2\"],\n"
@ -104,6 +119,19 @@
"test",
null,
intervals,
ImmutableList.of(
new DataSegment(
"test",
Interval.parse("2014/2017"),
"v0",
null,
null,
null,
null,
9,
128
)
),
new SelectorDimFilter("dim", "value"),
QueryGranularity.DAY,
Lists.newArrayList("d1", "d2"),
@ -128,7 +156,7 @@ public class DatasourceIngestionSpecTest
DatasourceIngestionSpec actual = MAPPER.readValue(jsonStr, DatasourceIngestionSpec.class);
Assert.assertEquals(
new DatasourceIngestionSpec("test", Interval.parse("2014/2015"), null, null, null, null, null, null, false),
actual
);
}

View File

@ -67,6 +67,7 @@ public class DatasourceRecordReaderTest
null,
null,
null,
null,
segment.getDimensions(),
segment.getMetrics(),
false

View File

@ -80,6 +80,7 @@ public class DatasourcePathSpecTest
null,
null,
null,
null,
false
);

View File

@ -24,16 +24,20 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.inject.Inject;
import io.druid.client.DruidDataSource;
import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import io.druid.metadata.MetadataSegmentManager;
import io.druid.timeline.DataSegment;
import org.joda.time.Interval;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
@ -43,13 +47,16 @@ import java.util.List;
public class MetadataResource
{
private final MetadataSegmentManager metadataSegmentManager;
private final IndexerMetadataStorageCoordinator metadataStorageCoordinator;
@Inject
public MetadataResource(
MetadataSegmentManager metadataSegmentManager,
IndexerMetadataStorageCoordinator metadataStorageCoordinator
)
{
this.metadataSegmentManager = metadataSegmentManager;
this.metadataStorageCoordinator = metadataStorageCoordinator;
}
@GET
@ -123,10 +130,47 @@ public class MetadataResource
return builder.entity(
Iterables.transform(
dataSource.getSegments(),
new Function<DataSegment, String>()
{
@Override
public String apply(DataSegment segment)
{
return segment.getIdentifier();
}
}
)
).build();
}
@POST
@Path("/datasources/{dataSourceName}/segments")
@Produces(MediaType.APPLICATION_JSON)
public Response getDatabaseSegmentDataSourceSegments(
@PathParam("dataSourceName") String dataSourceName,
@QueryParam("full") String full,
List<Interval> intervals
)
{
List<DataSegment> segments = null;
try {
segments = metadataStorageCoordinator.getUsedSegmentsForIntervals(dataSourceName, intervals);
}
catch (IOException ex) {
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(ex.getMessage()).build();
}
Response.ResponseBuilder builder = Response.status(Response.Status.OK);
if (full != null) {
return builder.entity(segments).build();
}
return builder.entity(
Iterables.transform(
segments,
new Function<DataSegment, String>()
{
@Override
public String apply(DataSegment segment)
{
return segment.getIdentifier();
}