From fe5010405322418355f841e8d9b090a519cbaca1 Mon Sep 17 00:00:00 2001
From: fjy 
Date: Fri, 10 Jan 2014 14:47:18 -0800
Subject: [PATCH] fix the index task and more docs

---
 docs/content/About-Experimental-Features.md | 4 +
 docs/content/Batch-ingestion.md | 8 +-
 docs/content/Booting-a-production-cluster.md | 4 +-
 docs/content/GeographicQueries.md | 59 +++++++
 docs/content/GroupByQuery.md | 3 +-
 docs/content/Historical.md | 6 +-
 docs/content/OrderBy.md | 2 +-
 docs/content/Realtime.md | 2 +-
 docs/content/Spatial-Filters.md | 35 ----
 docs/content/Spatial-Indexing.md | 26 ---
 docs/content/Tutorial:-The-Druid-Cluster.md | 10 +-
 docs/content/toc.textile | 4 +
 examples/config/historical/runtime.properties | 6 +-
 examples/config/realtime/runtime.properties | 2 +-
 .../druid/indexing/common/task/IndexTask.java | 166 ++++++++++++------
 15 files changed, 201 insertions(+), 136 deletions(-)
 create mode 100644 docs/content/About-Experimental-Features.md
 create mode 100644 docs/content/GeographicQueries.md
 delete mode 100644 docs/content/Spatial-Filters.md
 delete mode 100644 docs/content/Spatial-Indexing.md

diff --git a/docs/content/About-Experimental-Features.md b/docs/content/About-Experimental-Features.md
new file mode 100644
index 00000000000..aeaef1c5c7a
--- /dev/null
+++ b/docs/content/About-Experimental-Features.md
@@ -0,0 +1,4 @@
+---
+layout: doc_page
+---
+Experimental features are features we have developed but have not fully tested in a production environment. If you choose to try them out, there will likely be edge cases that we have not covered. We would love feedback on any of these features: bug reports, suggestions for improvements, or simply letting us know that they work as intended.
\ No newline at end of file
diff --git a/docs/content/Batch-ingestion.md b/docs/content/Batch-ingestion.md
index 9da893de9f6..4edab47866a 100644
--- a/docs/content/Batch-ingestion.md
+++ b/docs/content/Batch-ingestion.md
@@ -3,14 +3,14 @@ layout: doc_page
 ---
 # Batch Data Ingestion
 
-There are two choices for batch data ingestion to your Druid cluster, you can use the [Indexing service](Indexing-service.html) or you can use the `HadoopDruidIndexer`.
+There are two choices for batch data ingestion to your Druid cluster: you can use the [Indexing service](Indexing-Service.html) or you can use the `HadoopDruidIndexer`.
 
 Which should I use?
 -------------------
 
-The [Indexing service](Indexing-service.html) is a node that can run as part of your Druid cluster and can accomplish a number of different types of indexing tasks. Even if all you care about is batch indexing, it provides for the encapsulation of things like the [database](MySQL.html) that is used for segment metadata and other things, so that your indexing tasks do not need to include such information. The indexing service was created such that external systems could programmatically interact with it and run periodic indexing tasks. Long-term, the indexing service is going to be the preferred method of ingesting data.
+The [Indexing service](Indexing-Service.html) is a node that can run as part of your Druid cluster and can accomplish a number of different types of indexing tasks. Even if all you care about is batch indexing, it provides for the encapsulation of things like the [database](MySQL.html) that is used for segment metadata and other things, so that your indexing tasks do not need to include such information. The indexing service was created such that external systems could programmatically interact with it and run periodic indexing tasks.
Long-term, the indexing service is going to be the preferred method of ingesting data. -The `HadoopDruidIndexer` runs hadoop jobs in order to separate and index data segments. It takes advantage of Hadoop as a job scheduling and distributed job execution platform. It is a simple method if you already have Hadoop running and don’t want to spend the time configuring and deploying the [Indexing service](Indexing service.html) just yet. +The `HadoopDruidIndexer` runs hadoop jobs in order to separate and index data segments. It takes advantage of Hadoop as a job scheduling and distributed job execution platform. It is a simple method if you already have Hadoop running and don’t want to spend the time configuring and deploying the [Indexing service](Indexing-Service.html) just yet. Batch Ingestion using the HadoopDruidIndexer -------------------------------------------- @@ -231,7 +231,7 @@ The schema of the Hadoop Index Task contains a task "type" and a Hadoop Index Co |--------|-----------|---------| |type|This should be "index_hadoop".|yes| |config|A Hadoop Index Config (see above).|yes| -|hadoopCoordinates|The Maven :: of Hadoop to use. The default is "org.apache.hadoop:hadoop-core:1.0.3".|no| +|hadoopCoordinates|The Maven `::` of Hadoop to use. The default is "org.apache.hadoop:hadoop-core:1.0.3".|no| The Hadoop Index Config submitted as part of an Hadoop Index Task is identical to the Hadoop Index Config used by the `HadoopBatchIndexer` except that three fields must be omitted: `segmentOutputPath`, `workingPath`, `updaterJobSpec`. The Indexing Service takes care of setting these fields internally. diff --git a/docs/content/Booting-a-production-cluster.md b/docs/content/Booting-a-production-cluster.md index 3f57ce13d1c..295b6e88b25 100644 --- a/docs/content/Booting-a-production-cluster.md +++ b/docs/content/Booting-a-production-cluster.md @@ -1,7 +1,7 @@ --- layout: doc_page --- -# Booting a Single Node Cluster # +# Booting a Single Node Cluster [Loading Your Data](Tutorial%3A-Loading-Your-Data-Part-2.html) and [All About Queries](Tutorial%3A-All-About-Queries.html) contain recipes to boot a small druid cluster on localhost. Here we will boot a small cluster on EC2. You can checkout the code, or download a tarball from [here](http://static.druid.io/artifacts/druid-services-0.6.46-bin.tar.gz). @@ -17,7 +17,7 @@ export AWS_SECRET_KEY= Then, booting an ec2 instance running one node of each type is as simple as running the script, run_ec2.sh :) -# Apache Whirr # +# Apache Whirr Apache Whirr is a set of libraries for launching cloud services. You can clone a version of Whirr that includes Druid as a service from git@github.com:rjurney/whirr.git: diff --git a/docs/content/GeographicQueries.md b/docs/content/GeographicQueries.md new file mode 100644 index 00000000000..165f1d4fe40 --- /dev/null +++ b/docs/content/GeographicQueries.md @@ -0,0 +1,59 @@ +--- +layout: doc_page +--- +Druid supports filtering specially spatially indexed columns based on an origin and a bound. + +# Spatial Indexing +In any of the data specs, there is the option of providing spatial dimensions. For example, for a JSON data spec, spatial dimensions can be specified as follows: + +```json +"dataSpec" : { + "format": "JSON", + "dimensions": , + "spatialDimensions": [ + { + "dimName": "coordinates", + "dims": ["lat", "long"] + }, + ... + ] +} +``` + +|property|description|required?| +|--------|-----------|---------| +|dimName|The name of the spatial dimension. 
A spatial dimension may be constructed from multiple other dimensions or it may already exist as part of an event. If a spatial dimension already exists, it must be an array of coordinate values.|yes|
+|dims|A list of dimension names that comprise a spatial dimension.|no|
+
+# Spatial Filters
+The grammar for a spatial filter is as follows:
+
+```json
+"filter" : {
+    "type": "spatial",
+    "dimension": "spatialDim",
+    "bound": {
+        "type": "rectangular",
+        "minCoords": [10.0, 20.0],
+        "maxCoords": [30.0, 40.0]
+    }
+}
+```
+
+Bounds
+------
+
+### Rectangular
+
+|property|description|required?|
+|--------|-----------|---------|
+|minCoords|List of minimum dimension coordinates for coordinates [x, y, z, …]|yes|
+|maxCoords|List of maximum dimension coordinates for coordinates [x, y, z, …]|yes|
+
+### Radius
+
+|property|description|required?|
+|--------|-----------|---------|
+|coords|Origin coordinates in the form [x, y, z, …]|yes|
+|radius|The float radius value|yes|
+
diff --git a/docs/content/GroupByQuery.md b/docs/content/GroupByQuery.md
index e208e588ac1..dd7f49f7179 100644
--- a/docs/content/GroupByQuery.md
+++ b/docs/content/GroupByQuery.md
@@ -1,8 +1,7 @@
 ---
 layout: doc_page
 ---
-These types of queries take a groupBy query object and return an array of JSON objects where each object represents a grouping asked for by the query.
-
+These types of queries take a groupBy query object and return an array of JSON objects where each object represents a grouping asked for by the query. Note: If you only want to do straight aggregates for some time range, we highly recommend using [TimeseriesQueries](TimeseriesQuery.html) instead. The performance will be substantially better.
 An example groupBy query object is shown below:
 
 ``` json
diff --git a/docs/content/Historical.md b/docs/content/Historical.md
index b5fb1ebcfcf..361098be512 100644
--- a/docs/content/Historical.md
+++ b/docs/content/Historical.md
@@ -28,11 +28,13 @@ druid.port=8081
 
 druid.zk.service.host=localhost
 
-druid.server.maxSize=100000000
+druid.server.maxSize=10000000000
 
+# Change these to make Druid faster
 druid.processing.buffer.sizeBytes=10000000
+druid.processing.numThreads=1
 
-druid.segmentCache.locations=[{"path": "/tmp/druid/indexCache", "maxSize"\: 100000000}]```
+druid.segmentCache.locations=[{"path": "/tmp/druid/indexCache", "maxSize"\: 10000000000}]
+```
 
 Note: This will spin up a Historical node with the local filesystem as deep storage.
 
diff --git a/docs/content/OrderBy.md b/docs/content/OrderBy.md
index 1cbb88909b3..7b8f88edfb1 100644
--- a/docs/content/OrderBy.md
+++ b/docs/content/OrderBy.md
@@ -1,7 +1,7 @@
 ---
 layout: doc_page
 ---
-The orderBy field provides the functionality to sort and limit the set of results from a groupBy query. Available options are:
+The orderBy field provides the functionality to sort and limit the set of results from a groupBy query. If you group by a single dimension and are ordering by a single metric, we highly recommend using [TopN Queries](TopNQuery.html) instead. The performance will be substantially better.
Available options are: ### DefaultLimitSpec diff --git a/docs/content/Realtime.md b/docs/content/Realtime.md index 855a4196af2..f0d49dee466 100644 --- a/docs/content/Realtime.md +++ b/docs/content/Realtime.md @@ -42,7 +42,7 @@ druid.db.connector.connectURI=jdbc\:mysql\://localhost\:3306/druid druid.db.connector.user=druid druid.db.connector.password=diurd -druid.processing.buffer.sizeBytes=10000000 +druid.processing.buffer.sizeBytes=268435456 ``` The realtime module also uses several of the default modules in [Configuration](Configuration.html). For more information on the realtime spec file (or configuration file), see [realtime ingestion](Realtime-ingestion.html) page. diff --git a/docs/content/Spatial-Filters.md b/docs/content/Spatial-Filters.md deleted file mode 100644 index eacf2379d9c..00000000000 --- a/docs/content/Spatial-Filters.md +++ /dev/null @@ -1,35 +0,0 @@ ---- -layout: doc_page ---- -Note: This feature is highly experimental and only works with spatially indexed dimensions. - -The grammar for a spatial filter is as follows: - - - { - "dimension": "spatialDim", - "bound": { - "type": "rectangular", - "minCoords": [10.0, 20.0], - "maxCoords": [30.0, 40.0] - } - } - - -Bounds ------- - -### Rectangular - -|property|description|required?| -|--------|-----------|---------| -|minCoords|List of minimum dimension coordinates for coordinates [x, y, z, …]|yes| -|maxCoords|List of maximum dimension coordinates for coordinates [x, y, z, …]|yes| - -### Radius - -|property|description|required?| -|--------|-----------|---------| -|coords|Origin coordinates in the form [x, y, z, …]|yes| -|radius|The float radius value|yes| - diff --git a/docs/content/Spatial-Indexing.md b/docs/content/Spatial-Indexing.md deleted file mode 100644 index 0c866bc6a50..00000000000 --- a/docs/content/Spatial-Indexing.md +++ /dev/null @@ -1,26 +0,0 @@ ---- -layout: doc_page ---- -Note: This feature is highly experimental. - -In any of the data specs, there is now the option of providing spatial dimensions. For example, for a JSON data spec, spatial dimensions can be specified as follows: - - - { - "type": "JSON", - "dimensions": , - "spatialDimensions": [ - { - "dimName": "coordinates", - "dims": ["lat", "long"] - }, - ... - ] - } - - -|property|description|required?| -|--------|-----------|---------| -|dimName|The name of the spatial dimension. A spatial dimension may be constructed from multiple other dimensions or it may already exist as part of an event. 
If a spatial dimension already exists, it must be an array of dimension values.|yes| -|dims|A list of dimension names that comprise a spatial dimension.|no| - diff --git a/docs/content/Tutorial:-The-Druid-Cluster.md b/docs/content/Tutorial:-The-Druid-Cluster.md index b2f5b6975ec..fbb4836d65f 100644 --- a/docs/content/Tutorial:-The-Druid-Cluster.md +++ b/docs/content/Tutorial:-The-Druid-Cluster.md @@ -155,11 +155,13 @@ druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.46"] druid.s3.secretKey=QyyfVZ7llSiRg6Qcrql1eEUG7buFpAK6T6engr1b druid.s3.accessKey=AKIAIMKECRUYKDQGR6YQ -druid.server.maxSize=100000000 +druid.server.maxSize=10000000000 -druid.processing.buffer.sizeBytes=10000000 +# Change these to make Druid faster +druid.processing.buffer.sizeBytes=268435456 +druid.processing.numThreads=1 -druid.segmentCache.locations=[{"path": "/tmp/druid/indexCache", "maxSize"\: 100000000}] +druid.segmentCache.locations=[{"path": "/tmp/druid/indexCache", "maxSize"\: 10000000000}] ``` To start the historical node: @@ -248,7 +250,7 @@ druid.publish.type=noop # druid.db.connector.user=druid # druid.db.connector.password=diurd -druid.processing.buffer.sizeBytes=10000000 +druid.processing.buffer.sizeBytes=268435456 ``` Next Steps diff --git a/docs/content/toc.textile b/docs/content/toc.textile index 5dad9c14afc..373641625b2 100644 --- a/docs/content/toc.textile +++ b/docs/content/toc.textile @@ -64,6 +64,10 @@ h2. Architecture ** "MySQL":./MySQL.html ** "ZooKeeper":./ZooKeeper.html +h2. Experimental +* "About Experimental Features":./About-Experimental-Features.html +* "Geographic Queries":./GeographicQueries.html + h2. Development * "Versioning":./Versioning.html * "Build From Source":./Build-from-source.html diff --git a/examples/config/historical/runtime.properties b/examples/config/historical/runtime.properties index 8d9f1f35096..e460c539394 100644 --- a/examples/config/historical/runtime.properties +++ b/examples/config/historical/runtime.properties @@ -10,10 +10,10 @@ druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.46"] druid.s3.secretKey=QyyfVZ7llSiRg6Qcrql1eEUG7buFpAK6T6engr1b druid.s3.accessKey=AKIAIMKECRUYKDQGR6YQ -druid.server.maxSize=100000000 +druid.server.maxSize=10000000000 # Change these to make Druid faster -druid.processing.buffer.sizeBytes=10000000 +druid.processing.buffer.sizeBytes=268435456 druid.processing.numThreads=1 -druid.segmentCache.locations=[{"path": "/tmp/druid/indexCache", "maxSize"\: 100000000}] \ No newline at end of file +druid.segmentCache.locations=[{"path": "/tmp/druid/indexCache", "maxSize"\: 10000000000}] \ No newline at end of file diff --git a/examples/config/realtime/runtime.properties b/examples/config/realtime/runtime.properties index 6ebf2e5dcdf..17c9db1fc23 100644 --- a/examples/config/realtime/runtime.properties +++ b/examples/config/realtime/runtime.properties @@ -14,4 +14,4 @@ druid.publish.type=noop # druid.db.connector.user=druid # druid.db.connector.password=diurd -druid.processing.buffer.sizeBytes=10000000 +druid.processing.buffer.sizeBytes=268435456 diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java index 25603c981cd..09038cd9af0 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java @@ -1,34 +1,35 @@ /* - * Druid - a distributed column store. 
- * Copyright (C) 2012, 2013 Metamarkets Group Inc. - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU General Public License - * as published by the Free Software Foundation; either version 2 - * of the License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. - */ +* Druid - a distributed column store. +* Copyright (C) 2012, 2013 Metamarkets Group Inc. +* +* This program is free software; you can redistribute it and/or +* modify it under the terms of the GNU General Public License +* as published by the Free Software Foundation; either version 2 +* of the License, or (at your option) any later version. +* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU General Public License for more details. +* +* You should have received a copy of the GNU General Public License +* along with this program; if not, write to the Free Software +* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. +*/ package io.druid.indexing.common.task; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.api.client.util.Sets; +import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Ordering; +import com.google.common.collect.Sets; import com.google.common.collect.TreeMultiset; import com.google.common.primitives.Ints; import com.metamx.common.logger.Logger; @@ -41,7 +42,6 @@ import io.druid.indexer.granularity.GranularitySpec; import io.druid.indexing.common.TaskLock; import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.TaskToolbox; -import io.druid.indexing.common.actions.SegmentInsertAction; import io.druid.indexing.common.index.YeOldePlumberSchool; import io.druid.query.aggregation.AggregatorFactory; import io.druid.segment.loading.DataSegmentPusher; @@ -63,6 +63,10 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; +/** + * Simple single-threaded indexing task. Meant to be easy to use for small and medium sized datasets. For "big data", + * try launching multiple IndexTasks or using the {@link HadoopIndexTask}. 
+ */
 public class IndexTask extends AbstractFixedIntervalTask
 {
   private static final Logger log = new Logger(IndexTask.class);
@@ -132,53 +136,67 @@ public class IndexTask extends AbstractFixedIntervalTask
   public TaskStatus run(TaskToolbox toolbox) throws Exception
   {
     final TaskLock myLock = Iterables.getOnlyElement(getTaskLocks(toolbox));
-    final Set<DataSegment> segments = Sets.newHashSet();
-    for (final Interval bucket : granularitySpec.bucketIntervals()) {
-      final List<ShardSpec> shardSpecs;
-      if (targetPartitionSize > 0) {
-        shardSpecs = determinePartitions(bucket, targetPartitionSize);
-      } else {
-        shardSpecs = ImmutableList.of(new NoneShardSpec());
-      }
-      for (final ShardSpec shardSpec : shardSpecs) {
-        final DataSegment segment = generateSegment(
-            toolbox,
-            new Schema(
-                getDataSource(),
-                spatialDimensions,
-                aggregators,
-                indexGranularity,
-                shardSpec
-            ),
-            bucket,
-            myLock.getVersion()
-        );
-        segments.add(segment);
-      }
-    }
+
+    final Map<Interval, List<ShardSpec>> shardSpecMap = determinePartitions(targetPartitionSize);
+
+    final Map<Interval, List<Schema>> schemass = Maps.transformEntries(
+        shardSpecMap,
+        new Maps.EntryTransformer<Interval, List<ShardSpec>, List<Schema>>()
+        {
+          @Override
+          public List<Schema> transformEntry(
+              Interval key, List<ShardSpec> shardSpecs
+          )
+          {
+            return Lists.transform(
+                shardSpecs,
+                new Function<ShardSpec, Schema>()
+                {
+                  @Override
+                  public Schema apply(final ShardSpec shardSpec)
+                  {
+                    return new Schema(getDataSource(), spatialDimensions, aggregators, indexGranularity, shardSpec);
+                  }
+                }
+            );
+          }
+        }
+    );
+
+    final Set<DataSegment> segments = generateSegments(toolbox, schemass, myLock.getVersion());
     toolbox.pushSegments(segments);
+
     return TaskStatus.success(getId());
   }
 
-  private List<ShardSpec> determinePartitions(
-      final Interval interval,
+  private Map<Interval, List<ShardSpec>> determinePartitions(
       final int targetPartitionSize
   ) throws IOException
   {
-    log.info("Determining partitions for interval[%s] with targetPartitionSize[%d]", interval, targetPartitionSize);
+    Map<Interval, List<ShardSpec>> retVal = Maps.newLinkedHashMap();
 
-    // The implementation of this determine partitions stuff is less than optimal. Should be done better.
+    log.info("Determining partitions with targetPartitionSize[%d]", targetPartitionSize);
 
     // Blacklist dimensions that have multiple values per row
-    final Set<String> unusableDimensions = com.google.common.collect.Sets.newHashSet();
-
-    // Track values of all non-blacklisted dimensions
-    final Map<String, TreeMultiset<String>> dimensionValueMultisets = Maps.newHashMap();
+    final Set<String> unusableDimensions = Sets.newHashSet();
 
     // Load data
     try (Firehose firehose = firehoseFactory.connect()) {
-      while (firehose.hasMore()) {
-        final InputRow inputRow = firehose.nextRow();
-        if (interval.contains(inputRow.getTimestampFromEpoch())) {
+      if (!firehose.hasMore()) {
+        log.error("Unable to find any events to ingest! Check your firehose config!");
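+        // No events were read at all, so there is nothing to partition; the empty map is returned below.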
+        return retVal;
+      }
+      InputRow inputRow = firehose.nextRow();
+
+      for (Interval interval : granularitySpec.bucketIntervals()) {
+        // Track values of all non-blacklisted dimensions
+        final Map<String, TreeMultiset<String>> dimensionValueMultisets = Maps.newHashMap();
+
+        boolean hasEventsInInterval = false;
+        boolean done = false;
+        while (!done && interval.contains(inputRow.getTimestampFromEpoch())) {
+          hasEventsInInterval = true;
+
           // Extract dimensions from event
           for (final String dim : inputRow.getDimensions()) {
             final List<String> dimValues = inputRow.getDimension(dim);
@@ -198,10 +216,31 @@ public class IndexTask extends AbstractFixedIntervalTask
             }
           }
         }
+
+          if (firehose.hasMore()) {
+            inputRow = firehose.nextRow();
+          } else {
+            done = true;
+          }
+        }
+
+        if (hasEventsInInterval) {
+          if (targetPartitionSize == 0) {
+            retVal.put(interval, ImmutableList.of(new NoneShardSpec()));
+          } else {
+            retVal.put(interval, determineShardSpecs(dimensionValueMultisets));
+          }
         }
       }
     }
 
+    return retVal;
+  }
+
+  private List<ShardSpec> determineShardSpecs(
+      final Map<String, TreeMultiset<String>> dimensionValueMultisets
+  )
+  {
     // ShardSpecs we will return
     final List<ShardSpec> shardSpecs = Lists.newArrayList();
 
@@ -281,6 +320,23 @@ public class IndexTask extends AbstractFixedIntervalTask
     return shardSpecs;
   }
 
+  private Set<DataSegment> generateSegments(
+      final TaskToolbox toolbox,
+      final Map<Interval, List<Schema>> schemass,
+      final String version
+  ) throws IOException
+  {
+    final Set<DataSegment> retVal = Sets.newHashSet();
+
+    for (Map.Entry<Interval, List<Schema>> entry : schemass.entrySet()) {
+      for (Schema schema : entry.getValue()) {
+        retVal.add(generateSegment(toolbox, schema, entry.getKey(), version));
+      }
+    }
+
+    return retVal;
+  }
+
   private DataSegment generateSegment(
       final TaskToolbox toolbox,
       final Schema schema,
@@ -302,7 +358,7 @@ public class IndexTask extends AbstractFixedIntervalTask
     );
 
     // We need to track published segments.
-    final List<DataSegment> pushedSegments = new CopyOnWriteArrayList<DataSegment>();
+    final List<DataSegment> pushedSegments = new CopyOnWriteArrayList<>();
     final DataSegmentPusher wrappedDataSegmentPusher = new DataSegmentPusher()
     {
       @Override
@@ -444,4 +500,4 @@ public class IndexTask extends AbstractFixedIntervalTask
   {
     return spatialDimensions;
   }
-}
+}
\ No newline at end of file
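
The new GeographicQueries.md above shows the rectangular bound as a JSON snippet but documents the radius bound only through its property table. As a rough illustrative sketch (not part of this patch: the dimension name and coordinate values are placeholders, and the `radius` bound type string is assumed by analogy with the `rectangular` example), a radius-bound spatial filter would look something like:

```json
"filter" : {
    "type": "spatial",
    "dimension": "spatialDim",
    "bound": {
        "type": "radius",
        "coords": [10.0, 20.0],
        "radius": 5.0
    }
}
```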