mirror of https://github.com/apache/druid.git

Merge pull request #355 from metamx/fix_index_task

Fix the index task and more docs

commit db75b6d751
@@ -0,0 +1,4 @@
+---
+layout: doc_page
+---
+Experimental features are features we have developed but have not fully tested in a production environment. If you choose to try them out, there will likely be edge cases that we have not covered. We would love feedback on any of these features, whether they are bug reports, suggestions for improvements, or letting us know they work as intended.
@@ -3,14 +3,14 @@ layout: doc_page
 ---
 
 # Batch Data Ingestion
 
-There are two choices for batch data ingestion to your Druid cluster, you can use the [Indexing service](Indexing-service.html) or you can use the `HadoopDruidIndexer`.
+There are two choices for batch data ingestion to your Druid cluster: you can use the [Indexing service](Indexing-Service.html) or you can use the `HadoopDruidIndexer`.
 
 Which should I use?
 -------------------
 
-The [Indexing service](Indexing-service.html) is a node that can run as part of your Druid cluster and can accomplish a number of different types of indexing tasks. Even if all you care about is batch indexing, it provides for the encapsulation of things like the [database](MySQL.html) that is used for segment metadata and other things, so that your indexing tasks do not need to include such information. The indexing service was created such that external systems could programmatically interact with it and run periodic indexing tasks. Long-term, the indexing service is going to be the preferred method of ingesting data.
+The [Indexing service](Indexing-Service.html) is a node that can run as part of your Druid cluster and can accomplish a number of different types of indexing tasks. Even if all you care about is batch indexing, it encapsulates things like the [database](MySQL.html) used for segment metadata, so your indexing tasks do not need to include such information. The indexing service was created so that external systems could programmatically interact with it and run periodic indexing tasks. Long-term, the indexing service is going to be the preferred method of ingesting data.
 
-The `HadoopDruidIndexer` runs hadoop jobs in order to separate and index data segments. It takes advantage of Hadoop as a job scheduling and distributed job execution platform. It is a simple method if you already have Hadoop running and don’t want to spend the time configuring and deploying the [Indexing service](Indexing service.html) just yet.
+The `HadoopDruidIndexer` runs Hadoop jobs in order to separate and index data segments. It takes advantage of Hadoop as a job scheduling and distributed job execution platform. It is a simple method if you already have Hadoop running and don’t want to spend the time configuring and deploying the [Indexing service](Indexing-Service.html) just yet.
 
 Batch Ingestion using the HadoopDruidIndexer
 --------------------------------------------
@@ -231,7 +231,7 @@ The schema of the Hadoop Index Task contains a task "type" and a Hadoop Index Co
 |--------|-----------|---------|
 |type|This should be "index_hadoop".|yes|
 |config|A Hadoop Index Config (see above).|yes|
-|hadoopCoordinates|The Maven <groupId>:<artifactId>:<version> of Hadoop to use. The default is "org.apache.hadoop:hadoop-core:1.0.3".|no|
+|hadoopCoordinates|The Maven `<groupId>:<artifactId>:<version>` of Hadoop to use. The default is "org.apache.hadoop:hadoop-core:1.0.3".|no|
 
 The Hadoop Index Config submitted as part of a Hadoop Index Task is identical to the Hadoop Index Config used by the `HadoopBatchIndexer`, except that three fields must be omitted: `segmentOutputPath`, `workingPath`, `updaterJobSpec`. The Indexing Service takes care of setting these fields internally.
 
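Putting the table above together, a Hadoop Index Task submission might look roughly like the sketch below. This example is illustrative rather than taken from the source: the contents of `config` stand in for a full Hadoop Index Config (with `segmentOutputPath`, `workingPath`, and `updaterJobSpec` omitted, per the note above), and the `dataSource` value is a placeholder.

```json
{
  "type": "index_hadoop",
  "hadoopCoordinates": "org.apache.hadoop:hadoop-core:1.0.3",
  "config": {
    "dataSource": "example_datasource"
  }
}
```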
@@ -66,10 +66,14 @@ You can then use the EC2 dashboard to locate the instance and confirm that it ha
 
 If both the instance and the Druid cluster launch successfully, a few minutes later other messages to STDOUT should follow with information returned from EC2, including the instance ID:
 
+<<<<<<< HEAD
+# Apache Whirr
+=======
 Started cluster of 1 instances
 Cluster{instances=[Instance{roles=[zookeeper, druid-mysql, druid-master, druid-broker, druid-compute, druid-realtime], publicIp= ...
 
 The final message will contain login information for the instance.
+>>>>>>> master
 
 Note that Whirr will return an exception if any of the nodes fail to launch, and the cluster will be destroyed. To destroy the cluster manually, run the following command:
 
@@ -0,0 +1,59 @@
+---
+layout: doc_page
+---
+Druid supports filtering spatially indexed columns based on an origin and a bound.
+
+# Spatial Indexing
+In any of the data specs, there is the option of providing spatial dimensions. For example, for a JSON data spec, spatial dimensions can be specified as follows:
+
+```json
+"dataSpec" : {
+    "format": "JSON",
+    "dimensions": <some_dims>,
+    "spatialDimensions": [
+        {
+            "dimName": "coordinates",
+            "dims": ["lat", "long"]
+        },
+        ...
+    ]
+}
+```
+
+|property|description|required?|
+|--------|-----------|---------|
+|dimName|The name of the spatial dimension. A spatial dimension may be constructed from multiple other dimensions or it may already exist as part of an event. If a spatial dimension already exists, it must be an array of coordinate values.|yes|
+|dims|A list of dimension names that comprise a spatial dimension.|no|
+
+# Spatial Filters
+The grammar for a spatial filter is as follows:
+
+```json
+"filter" : {
+    "type": "spatial",
+    "dimension": "spatialDim",
+    "bound": {
+        "type": "rectangular",
+        "minCoords": [10.0, 20.0],
+        "maxCoords": [30.0, 40.0]
+    }
+}
+```
+
+Bounds
+------
+
+### Rectangular
+
+|property|description|required?|
+|--------|-----------|---------|
+|minCoords|List of minimum dimension coordinates for coordinates [x, y, z, …]|yes|
+|maxCoords|List of maximum dimension coordinates for coordinates [x, y, z, …]|yes|
+
+### Radius
+
+|property|description|required?|
+|--------|-----------|---------|
+|coords|Origin coordinates in the form [x, y, z, …]|yes|
+|radius|The float radius value|yes|
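The filter example above uses the rectangular bound. For the radius bound, a filter would look along the lines of the sketch below; this example is not from the source, the coordinate and radius values are placeholders, and the bound `type` is assumed to be `"radius"`, matching the heading above.

```json
"filter" : {
  "type": "spatial",
  "dimension": "spatialDim",
  "bound": {
    "type": "radius",
    "coords": [10.0, 20.0],
    "radius": 5.0
  }
}
```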
@@ -1,8 +1,7 @@
 ---
 layout: doc_page
 ---
-These types of queries take a groupBy query object and return an array of JSON objects where each object represents a grouping asked for by the query.
-
+These types of queries take a groupBy query object and return an array of JSON objects where each object represents a grouping asked for by the query. Note: if you only want to do straight aggregates for some time range, we highly recommend using [TimeseriesQueries](TimeseriesQuery.html) instead; the performance will be substantially better.
 An example groupBy query object is shown below:
 
 ``` json
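To make that recommendation concrete, a straight aggregate over a time range can be expressed as a timeseries query roughly like the sketch below; the data source, aggregator, and interval values are placeholders rather than values from the source.

```json
{
  "queryType": "timeseries",
  "dataSource": "sample_datasource",
  "granularity": "day",
  "aggregations": [
    { "type": "longSum", "name": "sample_sum", "fieldName": "sample_metric" }
  ],
  "intervals": ["2013-01-01/2013-01-08"]
}
```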
@@ -28,11 +28,13 @@ druid.port=8081
 
 druid.zk.service.host=localhost
 
-druid.server.maxSize=100000000
+druid.server.maxSize=10000000000
 
+# Change these to make Druid faster
 druid.processing.buffer.sizeBytes=10000000
 druid.processing.numThreads=1
 
-druid.segmentCache.locations=[{"path": "/tmp/druid/indexCache", "maxSize"\: 100000000}]```
+druid.segmentCache.locations=[{"path": "/tmp/druid/indexCache", "maxSize"\: 10000000000}]
+```
 
 Note: This will spin up a Historical node with the local filesystem as deep storage.
@@ -1,7 +1,7 @@
 ---
 layout: doc_page
 ---
-The orderBy field provides the functionality to sort and limit the set of results from a groupBy query. Available options are:
+The orderBy field provides the functionality to sort and limit the set of results from a groupBy query. If you group by a single dimension and are ordering by a single metric, we highly recommend using [TopN Queries](TopNQuery.html) instead. The performance will be substantially better. Available options are:
 
 ### DefaultLimitSpec
 
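As a rough sketch of how the orderBy field might be filled in for the default case: the example below is illustrative only, the `limit`, dimension, and direction values are placeholders, and the `"default"` type is assumed to correspond to DefaultLimitSpec.

```json
"orderBy": {
  "type": "default",
  "limit": 500,
  "columns": [
    { "dimension": "sample_dim", "direction": "descending" }
  ]
}
```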
@@ -42,7 +42,7 @@ druid.db.connector.connectURI=jdbc\:mysql\://localhost\:3306/druid
 druid.db.connector.user=druid
 druid.db.connector.password=diurd
 
-druid.processing.buffer.sizeBytes=10000000
+druid.processing.buffer.sizeBytes=268435456
 ```
 
 The realtime module also uses several of the default modules in [Configuration](Configuration.html). For more information on the realtime spec file (or configuration file), see the [realtime ingestion](Realtime-ingestion.html) page.
@@ -1,35 +0,0 @@
----
-layout: doc_page
----
-Note: This feature is highly experimental and only works with spatially indexed dimensions.
-
-The grammar for a spatial filter is as follows:
-
-<code>
-{
-    "dimension": "spatialDim",
-    "bound": {
-        "type": "rectangular",
-        "minCoords": [10.0, 20.0],
-        "maxCoords": [30.0, 40.0]
-    }
-}
-</code>
-
-Bounds
-------
-
-### Rectangular
-
-|property|description|required?|
-|--------|-----------|---------|
-|minCoords|List of minimum dimension coordinates for coordinates [x, y, z, …]|yes|
-|maxCoords|List of maximum dimension coordinates for coordinates [x, y, z, …]|yes|
-
-### Radius
-
-|property|description|required?|
-|--------|-----------|---------|
-|coords|Origin coordinates in the form [x, y, z, …]|yes|
-|radius|The float radius value|yes|
@@ -1,26 +0,0 @@
----
-layout: doc_page
----
-Note: This feature is highly experimental.
-
-In any of the data specs, there is now the option of providing spatial dimensions. For example, for a JSON data spec, spatial dimensions can be specified as follows:
-
-<code>
-{
-    "type": "JSON",
-    "dimensions": <some_dims>,
-    "spatialDimensions": [
-        {
-            "dimName": "coordinates",
-            "dims": ["lat", "long"]
-        },
-        ...
-    ]
-}
-</code>
-
-|property|description|required?|
-|--------|-----------|---------|
-|dimName|The name of the spatial dimension. A spatial dimension may be constructed from multiple other dimensions or it may already exist as part of an event. If a spatial dimension already exists, it must be an array of dimension values.|yes|
-|dims|A list of dimension names that comprise a spatial dimension.|no|
@@ -155,11 +155,13 @@ druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.46"]
 druid.s3.secretKey=QyyfVZ7llSiRg6Qcrql1eEUG7buFpAK6T6engr1b
 druid.s3.accessKey=AKIAIMKECRUYKDQGR6YQ
 
-druid.server.maxSize=100000000
+druid.server.maxSize=10000000000
 
-druid.processing.buffer.sizeBytes=10000000
+# Change these to make Druid faster
+druid.processing.buffer.sizeBytes=268435456
+druid.processing.numThreads=1
 
-druid.segmentCache.locations=[{"path": "/tmp/druid/indexCache", "maxSize"\: 100000000}]
+druid.segmentCache.locations=[{"path": "/tmp/druid/indexCache", "maxSize"\: 10000000000}]
 ```
 
 To start the historical node:
@@ -248,7 +250,7 @@ druid.publish.type=noop
 # druid.db.connector.user=druid
 # druid.db.connector.password=diurd
 
-druid.processing.buffer.sizeBytes=10000000
+druid.processing.buffer.sizeBytes=268435456
 ```
 
 Next Steps
@@ -64,6 +64,10 @@ h2. Architecture
 ** "MySQL":./MySQL.html
 ** "ZooKeeper":./ZooKeeper.html
 
+h2. Experimental
+* "About Experimental Features":./About-Experimental-Features.html
+* "Geographic Queries":./GeographicQueries.html
+
 h2. Development
 * "Versioning":./Versioning.html
 * "Build From Source":./Build-from-source.html
@@ -10,10 +10,10 @@ druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.46"]
 druid.s3.secretKey=QyyfVZ7llSiRg6Qcrql1eEUG7buFpAK6T6engr1b
 druid.s3.accessKey=AKIAIMKECRUYKDQGR6YQ
 
-druid.server.maxSize=100000000
+druid.server.maxSize=10000000000
 
 # Change these to make Druid faster
-druid.processing.buffer.sizeBytes=10000000
+druid.processing.buffer.sizeBytes=268435456
 druid.processing.numThreads=1
 
-druid.segmentCache.locations=[{"path": "/tmp/druid/indexCache", "maxSize"\: 100000000}]
+druid.segmentCache.locations=[{"path": "/tmp/druid/indexCache", "maxSize"\: 10000000000}]
@@ -14,4 +14,4 @@ druid.publish.type=noop
 # druid.db.connector.user=druid
 # druid.db.connector.password=diurd
 
-druid.processing.buffer.sizeBytes=10000000
+druid.processing.buffer.sizeBytes=268435456
@@ -25,6 +25,7 @@ import com.google.common.base.Optional;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.PeekingIterator;
 import com.google.common.collect.Sets;
+import com.metamx.common.Granularity;
 import com.metamx.common.guava.Comparators;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
@@ -45,22 +46,25 @@ public class ArbitraryGranularitySpec implements GranularitySpec
     intervals = Sets.newTreeSet(Comparators.intervalsByStartThenEnd());
 
     // Insert all intervals
-    for(final Interval inputInterval : inputIntervals) {
+    for (final Interval inputInterval : inputIntervals) {
       intervals.add(inputInterval);
     }
 
     // Ensure intervals are non-overlapping (but they may abut each other)
     final PeekingIterator<Interval> intervalIterator = Iterators.peekingIterator(intervals.iterator());
-    while(intervalIterator.hasNext()) {
+    while (intervalIterator.hasNext()) {
       final Interval currentInterval = intervalIterator.next();
 
-      if(intervalIterator.hasNext()) {
+      if (intervalIterator.hasNext()) {
         final Interval nextInterval = intervalIterator.peek();
-        if(currentInterval.overlaps(nextInterval)) {
-          throw new IllegalArgumentException(String.format(
-              "Overlapping intervals: %s, %s",
-              currentInterval,
-              nextInterval));
+        if (currentInterval.overlaps(nextInterval)) {
+          throw new IllegalArgumentException(
+              String.format(
+                  "Overlapping intervals: %s, %s",
+                  currentInterval,
+                  nextInterval
+              )
+          );
         }
       }
     }
@@ -79,10 +83,16 @@ public class ArbitraryGranularitySpec implements GranularitySpec
     // First interval with start time ≤ dt
     final Interval interval = intervals.floor(new Interval(dt, new DateTime(Long.MAX_VALUE)));
 
-    if(interval != null && interval.contains(dt)) {
+    if (interval != null && interval.contains(dt)) {
       return Optional.of(interval);
     } else {
       return Optional.absent();
     }
   }
+
+  @Override
+  public Granularity getGranularity()
+  {
+    throw new UnsupportedOperationException();
+  }
 }
@@ -22,6 +22,7 @@ package io.druid.indexer.granularity;
 import com.fasterxml.jackson.annotation.JsonSubTypes;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
 import com.google.common.base.Optional;
+import com.metamx.common.Granularity;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
 
@@ -43,4 +44,6 @@ public interface GranularitySpec
 
   /** Time-grouping interval corresponding to some instant, if any. */
   public Optional<Interval> bucketInterval(DateTime dt);
+
+  public Granularity getGranularity();
 }
@@ -67,6 +67,7 @@ public class UniformGranularitySpec implements GranularitySpec
     return wrappedSpec.bucketInterval(dt);
   }
 
+  @Override
   @JsonProperty("gran")
   public Granularity getGranularity()
   {
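For context, these granularity spec classes are deserialized from JSON in ingestion configs. A uniform spec might be written roughly as in the sketch below; the property name `gran` follows the `@JsonProperty("gran")` annotation above, while the subtype name `uniform`, the `intervals` property, and the values shown are assumptions for illustration only.

```json
"granularitySpec": {
  "type": "uniform",
  "gran": "DAY",
  "intervals": ["2013-09-01/2013-09-02"]
}
```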
@@ -22,15 +22,17 @@ package io.druid.indexing.common.task;
 import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.api.client.util.Sets;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Ordering;
+import com.google.common.collect.Sets;
 import com.google.common.collect.TreeMultiset;
 import com.google.common.primitives.Ints;
 import com.metamx.common.ISE;
+import com.metamx.common.guava.Comparators;
 import com.metamx.common.logger.Logger;
 import io.druid.data.input.Firehose;
 import io.druid.data.input.FirehoseFactory;
@@ -61,6 +63,7 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.SortedSet;
 import java.util.concurrent.CopyOnWriteArrayList;
 
 public class IndexTask extends AbstractFixedIntervalTask
@@ -133,7 +136,14 @@ public class IndexTask extends AbstractFixedIntervalTask
   {
     final TaskLock myLock = Iterables.getOnlyElement(getTaskLocks(toolbox));
     final Set<DataSegment> segments = Sets.newHashSet();
-    for (final Interval bucket : granularitySpec.bucketIntervals()) {
+
+    final Set<Interval> validIntervals = Sets.intersection(granularitySpec.bucketIntervals(), getDataIntervals());
+    if (validIntervals.isEmpty()) {
+      throw new ISE("No valid data intervals found. Check out configs!");
+    }
+
+
+    for (final Interval bucket : validIntervals) {
       final List<ShardSpec> shardSpecs;
       if (targetPartitionSize > 0) {
         shardSpecs = determinePartitions(bucket, targetPartitionSize);
@@ -160,6 +170,19 @@ public class IndexTask extends AbstractFixedIntervalTask
     return TaskStatus.success(getId());
   }
 
+  private SortedSet<Interval> getDataIntervals() throws IOException
+  {
+    SortedSet<Interval> retVal = Sets.newTreeSet(Comparators.intervalsByStartThenEnd());
+    try (Firehose firehose = firehoseFactory.connect()) {
+      while (firehose.hasMore()) {
+        final InputRow inputRow = firehose.nextRow();
+        Interval interval = granularitySpec.getGranularity().bucket(new DateTime(inputRow.getTimestampFromEpoch()));
+        retVal.add(interval);
+      }
+    }
+    return retVal;
+  }
+
   private List<ShardSpec> determinePartitions(
       final Interval interval,
       final int targetPartitionSize
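For reference, the batch index task that this code backs is submitted to the indexing service as JSON. A rough sketch is shown below; it is illustrative only, and the exact field names and values (for example `aggregators` and the `firehose` configuration) are assumptions for this Druid version rather than something this diff confirms.

```json
{
  "type": "index",
  "dataSource": "example_datasource",
  "granularitySpec": {
    "type": "uniform",
    "gran": "DAY",
    "intervals": ["2013-09-01/2013-09-02"]
  },
  "aggregators": [
    { "type": "count", "name": "count" }
  ],
  "firehose": {
    "type": "local",
    "baseDir": "/data/example",
    "filter": "example_data.json"
  }
}
```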