From 21b0b8a07dcc327d34c3f527eee6c8bb890ed8f9 Mon Sep 17 00:00:00 2001
From: Himanshu Gupta
Date: Tue, 9 Feb 2016 21:37:14 -0600
Subject: [PATCH] new coordinator endpoint to get the list of used segments
 for a given dataSource and list of intervals

---
 docs/content/design/coordinator.md             |  8 +++
 .../content/ingestion/update-existing-data.md  | 31 ++++++++++--
 .../druid/server/http/MetadataResource.java    | 50 +++++++++++++++++--
 3 files changed, 82 insertions(+), 7 deletions(-)

diff --git a/docs/content/design/coordinator.md b/docs/content/design/coordinator.md
index e959d73d33c..56b3e9cfd10 100644
--- a/docs/content/design/coordinator.md
+++ b/docs/content/design/coordinator.md
@@ -105,6 +105,14 @@
 Returns a list of all segments for a datasource as stored in the metadata store.
 
 Returns a list of all segments for a datasource with the full segment metadata as stored in the metadata store.
 
+* POST `/druid/coordinator/v1/metadata/datasources/{dataSourceName}/segments`
+
+Returns a list of all segments for a datasource that overlap any of the given intervals, as stored in the metadata store. The request body is a JSON array of interval strings, i.e. [interval1, interval2,...], for example ["2012-01-01T00:00:00.000/2012-01-03T00:00:00.000", "2012-01-05T00:00:00.000/2012-01-07T00:00:00.000"].
+
+* POST `/druid/coordinator/v1/metadata/datasources/{dataSourceName}/segments?full`
+
+Returns a list of all segments for a datasource that overlap any of the given intervals, with the full segment metadata, as stored in the metadata store. The request body is a JSON array of interval strings, i.e. [interval1, interval2,...], for example ["2012-01-01T00:00:00.000/2012-01-03T00:00:00.000", "2012-01-05T00:00:00.000/2012-01-07T00:00:00.000"].
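+
+For instance, a minimal sketch with `curl` (the coordinator address `localhost:8081` and the dataSource `wikipedia` are illustrative assumptions; drop `?full` to get back only segment identifiers):
+
+```
+curl -X POST -H 'Content-Type: application/json' \
+  -d '["2012-01-01T00:00:00.000/2012-01-03T00:00:00.000"]' \
+  "http://localhost:8081/druid/coordinator/v1/metadata/datasources/wikipedia/segments?full"
+```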
+
 * `/druid/coordinator/v1/metadata/datasources/{dataSourceName}/segments/{segmentId}`
 
 Returns full segment metadata for a specific segment as stored in the metadata store.
diff --git a/docs/content/ingestion/update-existing-data.md b/docs/content/ingestion/update-existing-data.md
index b8b8e0e3d89..26bdc005aaf 100644
--- a/docs/content/ingestion/update-existing-data.md
+++ b/docs/content/ingestion/update-existing-data.md
@@ -51,7 +51,7 @@ Here is what goes inside `ingestionSpec`:
 |-----|----|-----------|--------|
 |dataSource|String|Druid dataSource name from which you are loading the data.|yes|
 |intervals|List|A list of strings representing ISO-8601 Intervals.|yes|
-|segments|List|A list of segments from which to read the data. It is determined automatically if not specified and the list of segments might change on multiple invocations. But user might want to provide it manually to make the ingestion task idempotent.|no|
+|segments|List|A list of segments from which to read the data, determined automatically if not specified. You can obtain the list of segments to put here by making a POST request to the coordinator at /druid/coordinator/v1/metadata/datasources/{dataSourceName}/segments?full with the list of intervals in the request payload, e.g. ["2012-01-01T00:00:00.000/2012-01-03T00:00:00.000", "2012-01-05T00:00:00.000/2012-01-07T00:00:00.000"]. You may want to provide this list manually to ensure that the segments read are exactly the same as they were at the time of task submission; the task will fail if the provided list does not match the state of the database when the task actually runs.|no|
 |granularity|String|Defines the granularity of the query while loading data. Default value is "none". See [Granularities](../querying/granularities.html).|no|
 |filter|JSON|See [Filters](../querying/filters.html)|no|
 |dimensions|Array of String|Name of dimension columns to load. By default, the list will be constructed from parseSpec. If parseSpec does not have an explicit list of dimensions then all the dimension columns present in stored data will be read.|no|
@@ -76,8 +76,7 @@ For example
 
 #### `multi`
 
-This is a composing inputSpec to combine other inputSpecs. This inputSpec is used for delta ingestion.
-Please note that delta ingestion is not an idempotent operation. We may add change things in future to make it idempotent.
+This is a composing inputSpec to combine other inputSpecs. This inputSpec is used for delta ingestion. Please note that you can have only one `dataSource` as a child of a `multi` inputSpec.
 
 |Field|Type|Description|Required|
 |-----|----|-----------|--------|
@@ -95,7 +94,26 @@ For example:
       "type" : "dataSource",
       "ingestionSpec" : {
         "dataSource": "wikipedia",
-        "intervals": ["2014-10-20T00:00:00Z/P2W"]
+        "intervals": ["2012-01-01T00:00:00.000/2012-01-03T00:00:00.000", "2012-01-05T00:00:00.000/2012-01-07T00:00:00.000"],
+        "segments": [
+          {
+            "dataSource": "test1",
+            "interval": "2012-01-01T00:00:00.000/2012-01-03T00:00:00.000",
+            "version": "v2",
+            "loadSpec": {
+              "type": "local",
+              "path": "/tmp/index1.zip"
+            },
+            "dimensions": "host",
+            "metrics": "visited_sum,unique_hosts",
+            "shardSpec": {
+              "type": "none"
+            },
+            "binaryVersion": 9,
+            "size": 2,
+            "identifier": "test1_2000-01-01T00:00:00.000Z_3000-01-01T00:00:00.000Z_v2"
+          }
+        ]
       }
     },
     {
@@ -108,6 +126,11 @@ For example:
   }
 ```
 
+It is STRONGLY RECOMMENDED to provide the list of segments in the `dataSource` inputSpec explicitly, so that your delta ingestion task is idempotent. You can obtain that list of segments by making the following call to the coordinator:
+POST `/druid/coordinator/v1/metadata/datasources/{dataSourceName}/segments?full`
+Request Body: [interval1, interval2,...], for example ["2012-01-01T00:00:00.000/2012-01-03T00:00:00.000", "2012-01-05T00:00:00.000/2012-01-07T00:00:00.000"]
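+
+As a concrete sketch (the coordinator address `localhost:8081` and the dataSource `wikipedia` are illustrative assumptions; substitute your own), the call can be made with `curl`, and the JSON array it returns can be pasted directly as the value of `segments`:
+
+```
+curl -X POST -H 'Content-Type: application/json' \
+  -d '["2012-01-01T00:00:00.000/2012-01-03T00:00:00.000", "2012-01-05T00:00:00.000/2012-01-07T00:00:00.000"]' \
+  "http://localhost:8081/druid/coordinator/v1/metadata/datasources/wikipedia/segments?full"
+```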
+
 
 ### Reindexing without Hadoop Batch Ingestion
 
 This section assumes the reader understands how to do batch ingestion without Hadoop using the [IndexTask](../ingestion/tasks.html#index-task),
diff --git a/server/src/main/java/io/druid/server/http/MetadataResource.java b/server/src/main/java/io/druid/server/http/MetadataResource.java
index 2f96a708029..294165402f3 100644
--- a/server/src/main/java/io/druid/server/http/MetadataResource.java
+++ b/server/src/main/java/io/druid/server/http/MetadataResource.java
@@ -24,16 +24,20 @@ import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.inject.Inject;
 import io.druid.client.DruidDataSource;
+import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
 import io.druid.metadata.MetadataSegmentManager;
 import io.druid.timeline.DataSegment;
+import org.joda.time.Interval;
 
 import javax.ws.rs.GET;
+import javax.ws.rs.POST;
 import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
 import javax.ws.rs.Produces;
 import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
+import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 
@@ -43,13 +47,16 @@ import java.util.List;
 public class MetadataResource
 {
   private final MetadataSegmentManager metadataSegmentManager;
+  private final IndexerMetadataStorageCoordinator metadataStorageCoordinator;
 
   @Inject
   public MetadataResource(
-      MetadataSegmentManager metadataSegmentManager
+      MetadataSegmentManager metadataSegmentManager,
+      IndexerMetadataStorageCoordinator metadataStorageCoordinator
   )
   {
     this.metadataSegmentManager = metadataSegmentManager;
+    this.metadataStorageCoordinator = metadataStorageCoordinator;
   }
 
   @GET
@@ -123,10 +130,47 @@ public class MetadataResource
     return builder.entity(
         Iterables.transform(
             dataSource.getSegments(),
-            new Function<DataSegment, Object>()
+            new Function<DataSegment, String>()
             {
               @Override
-              public Object apply(DataSegment segment)
+              public String apply(DataSegment segment)
               {
                 return segment.getIdentifier();
               }
             }
         )
     ).build();
   }
+
+  @POST
+  @Path("/datasources/{dataSourceName}/segments")
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response getDatabaseSegmentDataSourceSegments(
+      @PathParam("dataSourceName") String dataSourceName,
+      @QueryParam("full") String full,
+      List<Interval> intervals
+  )
+  {
+    List<DataSegment> segments = null;
+    try {
+      segments = metadataStorageCoordinator.getUsedSegmentsForIntervals(dataSourceName, intervals);
+    }
+    catch (IOException ex) {
+      return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(ex.getMessage()).build();
+    }
+
+    Response.ResponseBuilder builder = Response.status(Response.Status.OK);
+    if (full != null) {
+      return builder.entity(segments).build();
+    }
+
+    return builder.entity(
+        Iterables.transform(
+            segments,
+            new Function<DataSegment, String>()
+            {
+              @Override
+              public String apply(DataSegment segment)
+              {
+                return segment.getIdentifier();
+              }
+            }
+        )
+    ).build();
+  }
 }
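As a quick end-to-end check of the new endpoint, here is a minimal client sketch (not part of the patch; the coordinator address `localhost:8081`, the dataSource `wikipedia`, and the class name `UsedSegmentsClient` are illustrative assumptions). It POSTs a JSON array of intervals and prints the coordinator's response:

```java
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;

public class UsedSegmentsClient
{
  public static void main(String[] args) throws Exception
  {
    // POST to the new endpoint; keep "?full" for full segment metadata,
    // or drop it to get back only the segment identifiers.
    URL url = new URL(
        "http://localhost:8081/druid/coordinator/v1/metadata/datasources/wikipedia/segments?full"
    );
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    conn.setRequestMethod("POST");
    conn.setRequestProperty("Content-Type", "application/json");
    conn.setDoOutput(true);

    // Request body: a JSON array of ISO-8601 interval strings.
    String body = "[\"2012-01-01T00:00:00.000/2012-01-03T00:00:00.000\"]";
    try (OutputStream os = conn.getOutputStream()) {
      os.write(body.getBytes(StandardCharsets.UTF_8));
    }

    // Print the JSON response; with "?full" it is suitable for pasting
    // directly into the "segments" field of a dataSource inputSpec.
    try (InputStream is = conn.getInputStream();
        Scanner scanner = new Scanner(is, StandardCharsets.UTF_8.name()).useDelimiter("\\A")) {
      System.out.println(scanner.hasNext() ? scanner.next() : "");
    }
  }
}
```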