merge from master and review comments

nishantmonu51 2014-12-09 13:16:45 +05:30
commit 1a1b0e6f23
475 changed files with 8332 additions and 8245 deletions

View File

@@ -1,27 +1,17 @@
#!/bin/bash -e
#!/bin/bash -eu
PROJECT=druid
BASE=$(cd $(dirname $0) && pwd)
DIST_DIR=dist/tar
VERSION=`cat pom.xml | grep '<version>' | head -1 | tail -1 | sed 's_.*<version>\([^<]*\)</version>.*_\1_'`
SCRIPT_DIR=`dirname $0`
pushd $SCRIPT_DIR
SCRIPT_DIR=`pwd`
popd
echo "Building Version [${VERSION}]"
VERSION=`cat pom.xml | grep version | head -4 | tail -1 | sed 's_.*<version>\([^<]*\)</version>.*_\1_'`
mvn -U -B clean package
echo Using Version[${VERSION}]
JARS=$(find "$BASE" -name "*-$VERSION-selfcontained.jar" | sed -e 's/^/ /')
mvn clean
mvn package
cat <<EOF
if [ $? -ne "0" ]; then
echo "mvn package failed"
exit 2;
fi
echo " "
echo " The following self-contained jars (and more) have been built:"
echo " "
find . -name '*-selfcontained.jar'
The following self-contained jars (and more) have been built:
$JARS
EOF

View File

@@ -1,72 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Druid - a distributed column store.
~ Copyright (C) 2012, 2013 Metamarkets Group Inc.
~
~ This program is free software; you can redistribute it and/or
~ modify it under the terms of the GNU General Public License
~ as published by the Free Software Foundation; either version 2
~ of the License, or (at your option) any later version.
~
~ This program is distributed in the hope that it will be useful,
~ but WITHOUT ANY WARRANTY; without even the implied warranty of
~ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
~ GNU General Public License for more details.
~
~ You should have received a copy of the GNU General Public License
~ along with this program; if not, write to the Free Software
~ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>io.druid.extensions</groupId>
<artifactId>druid-cassandra-storage</artifactId>
<name>druid-cassandra-storage</name>
<description>druid-cassandra-storage</description>
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.7.0-SNAPSHOT</version>
</parent>
<dependencies>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-api</artifactId>
</dependency>
<dependency>
<groupId>com.netflix.astyanax</groupId>
<artifactId>astyanax</artifactId>
<version>1.0.1</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
<!-- Tests -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<archive>
<manifest>
<addDefaultImplementationEntries>true</addDefaultImplementationEntries>
<addDefaultSpecificationEntries>true</addDefaultSpecificationEntries>
</manifest>
</archive>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@@ -18,142 +18,159 @@
~ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>io.druid</groupId>
<artifactId>druid-common</artifactId>
<name>druid-common</name>
<description>druid-common</description>
<parent>
<groupId>io.druid</groupId>
<artifactId>druid-common</artifactId>
<name>druid-common</name>
<description>druid-common</description>
<artifactId>druid</artifactId>
<version>0.7.0-SNAPSHOT</version>
</parent>
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.7.0-SNAPSHOT</version>
</parent>
<dependencies>
<dependency>
<groupId>com.metamx</groupId>
<artifactId>java-util</artifactId>
</dependency>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-api</artifactId>
</dependency>
<dependencies>
<dependency>
<groupId>com.metamx</groupId>
<artifactId>java-util</artifactId>
</dependency>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-api</artifactId>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-dbcp2</artifactId>
</dependency>
<dependency>
<groupId>commons-pool</groupId>
<artifactId>commons-pool</artifactId>
<version>1.6</version>
</dependency>
<dependency>
<groupId>org.skife.config</groupId>
<artifactId>config-magic</artifactId>
</dependency>
<dependency>
<groupId>org.hibernate</groupId>
<artifactId>hibernate-validator</artifactId>
</dependency>
<dependency>
<groupId>javax.validation</groupId>
<artifactId>validation-api</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-guava</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-joda</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-smile</artifactId>
</dependency>
<dependency>
<groupId>com.google.inject</groupId>
<artifactId>guice</artifactId>
</dependency>
<dependency>
<groupId>com.google.inject.extensions</groupId>
<artifactId>guice-multibindings</artifactId>
</dependency>
<dependency>
<groupId>org.jdbi</groupId>
<artifactId>jdbi</artifactId>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
</dependency>
<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-dbcp2</artifactId>
</dependency>
<dependency>
<groupId>commons-pool</groupId>
<artifactId>commons-pool</artifactId>
<version>1.6</version>
</dependency>
<dependency>
<groupId>org.skife.config</groupId>
<artifactId>config-magic</artifactId>
</dependency>
<dependency>
<groupId>org.hibernate</groupId>
<artifactId>hibernate-validator</artifactId>
</dependency>
<dependency>
<groupId>javax.validation</groupId>
<artifactId>validation-api</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-guava</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-joda</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-smile</artifactId>
</dependency>
<dependency>
<groupId>com.google.inject</groupId>
<artifactId>guice</artifactId>
</dependency>
<dependency>
<groupId>com.google.inject.extensions</groupId>
<artifactId>guice-multibindings</artifactId>
</dependency>
<dependency>
<groupId>org.jdbi</groupId>
<artifactId>jdbi</artifactId>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
</dependency>
<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</dependency>
<!-- Tests -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.easymock</groupId>
<artifactId>easymock</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.metamx</groupId>
<artifactId>java-util</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
</dependencies>
<!-- Tests -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.easymock</groupId>
<artifactId>easymock</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.metamx</groupId>
<artifactId>java-util</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-jar-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>test-jar</goal>
</goals>
</execution>
</executions>
<configuration>
<archive>
<manifest>
<addDefaultImplementationEntries>true</addDefaultImplementationEntries>
<addDefaultSpecificationEntries>true</addDefaultSpecificationEntries>
</manifest>
</archive>
</configuration>
</plugin>
</plugins>
</build>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<executions>
<execution>
<id>attach-sources</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-jar-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>test-jar</goal>
</goals>
</execution>
</executions>
<configuration>
<archive>
<manifest>
<addDefaultImplementationEntries>true</addDefaultImplementationEntries>
<addDefaultSpecificationEntries>true</addDefaultSpecificationEntries>
</manifest>
</archive>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@@ -36,6 +36,10 @@ import java.util.TreeSet;
*/
public class JodaUtils
{
// Joda-Time limits years to [-292275054,292278993], which should be a reasonable range
public static final long MAX_INSTANT = new DateTime("292278993").getMillis();
public static final long MIN_INSTANT = new DateTime("-292275054").getMillis();
public static ArrayList<Interval> condenseIntervals(Iterable<Interval> intervals)
{
ArrayList<Interval> retVal = Lists.newArrayList();

View File

@@ -37,7 +37,7 @@ public interface TimelineLookup<VersionType, ObjectType>
* @return Holders representing the interval that the objects exist for, PartitionHolders
* are guaranteed to be complete
*/
public List<TimelineObjectHolder<VersionType, ObjectType>> lookup(Interval interval);
public Iterable<TimelineObjectHolder<VersionType, ObjectType>> lookup(Interval interval);
public PartitionHolder<ObjectType> findEntry(Interval interval, VersionType version);

View File

@@ -19,33 +19,44 @@
package io.druid.timeline;
import com.google.common.collect.Lists;
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import io.druid.timeline.partition.PartitionHolder;
import org.joda.time.Interval;
import java.util.List;
public class UnionTimeLineLookup<VersionType, ObjectType> implements TimelineLookup<VersionType, ObjectType>
{
Iterable<TimelineLookup<VersionType,ObjectType>> delegates;
public UnionTimeLineLookup( Iterable<TimelineLookup<VersionType,ObjectType>> delegates){
Iterable<TimelineLookup<VersionType, ObjectType>> delegates;
public UnionTimeLineLookup(Iterable<TimelineLookup<VersionType, ObjectType>> delegates)
{
this.delegates = delegates;
}
@Override
public List<TimelineObjectHolder<VersionType, ObjectType>> lookup(Interval interval)
public Iterable<TimelineObjectHolder<VersionType, ObjectType>> lookup(final Interval interval)
{
List<TimelineObjectHolder<VersionType, ObjectType>> rv = Lists.newArrayList();
for(TimelineLookup<VersionType,ObjectType> delegate : delegates){
rv.addAll(delegate.lookup(interval));
}
return rv;
return Iterables.concat(
Iterables.transform(
delegates,
new Function<TimelineLookup<VersionType, ObjectType>, Iterable<TimelineObjectHolder<VersionType, ObjectType>>>()
{
@Override
public Iterable<TimelineObjectHolder<VersionType, ObjectType>> apply(TimelineLookup<VersionType, ObjectType> input)
{
return input.lookup(interval);
}
}
)
);
}
public PartitionHolder<ObjectType> findEntry(Interval interval, VersionType version){
for(TimelineLookup<VersionType,ObjectType> delegate : delegates){
public PartitionHolder<ObjectType> findEntry(Interval interval, VersionType version)
{
for (TimelineLookup<VersionType, ObjectType> delegate : delegates) {
final PartitionHolder<ObjectType> entry = delegate.findEntry(interval, version);
if(entry != null){
if (entry != null) {
return entry;
}
}

View File

@@ -26,6 +26,7 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.metamx.common.Pair;
import io.druid.common.utils.JodaUtils;
import io.druid.timeline.partition.ImmutablePartitionHolder;
import io.druid.timeline.partition.IntegerPartitionChunk;
import io.druid.timeline.partition.PartitionChunk;
@@ -1294,7 +1295,7 @@ public class VersionedIntervalTimelineTest
createExpected("2011-01-05/2011-01-10", "2", 2),
createExpected("2011-01-10/2011-01-15", "3", 3)
),
timeline.lookup(new Interval(new DateTime(0), new DateTime(Long.MAX_VALUE)))
timeline.lookup(new Interval(new DateTime(0), new DateTime(JodaUtils.MAX_INSTANT)))
);
}

View File

@@ -169,14 +169,13 @@ A filtered aggregator wraps any given aggregator, but only aggregates the values
This makes it possible to compute the results of a filtered and an unfiltered aggregation simultaneously, without having to issue multiple queries, and use both results as part of post-aggregations.
*Limitations:* The filtered aggregator currently only supports selector and not filter with a single selector, i.e. matching a dimension against a single value.
*Limitations:* The filtered aggregator currently only supports 'or', 'and', 'selector' and 'not' filters, i.e. matching one or multiple dimensions against a single value.
*Note:* If only the filtered results are required, consider putting the filter on the query itself, which will be much faster since it does not require scanning all the data.
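For instance, a sketch of a filtered aggregator using an 'and' filter (the `country` and `device` dimensions, their values, and the wrapped doubleSum aggregator are hypothetical, chosen only to illustrate the shape):

```json
{
  "type" : "filtered",
  "name" : "usMobileAdded",
  "filter" : {
    "type" : "and",
    "fields" : [
      { "type" : "selector", "dimension" : "country", "value" : "US" },
      { "type" : "selector", "dimension" : "device", "value" : "mobile" }
    ]
  },
  "aggregator" : { "type" : "doubleSum", "name" : "added", "fieldName" : "added" }
}
```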
```json
{
"type" : "filtered",
"name" : "aggMatching",
"filter" : {
"type" : "selector",
"dimension" : <dimension>,

View File

@@ -6,7 +6,7 @@ layout: doc_page
[Loading Your Data](Tutorial%3A-Loading-Your-Data-Part-2.html) and [All About Queries](Tutorial%3A-All-About-Queries.html) contain recipes to boot a small druid cluster on localhost. However, when it's time to run a more realistic setup&mdash;for production or just for testing production&mdash;you'll want to find a way to start the cluster on multiple hosts. This document describes two different ways to do this: manually, or as a cloud service via Apache Whirr.
## Manually Booting a Druid Cluster
You can provision individual servers, loading Druid onto each machine (or building it) and setting the required configuration for each type of node. You'll also have to set up required external dependencies. Then you'll have to start each node. This process is outlined in [Tutorial: The Druid Cluster](Tutorial:-The-Druid-Cluster.html).
You can provision individual servers, loading Druid onto each machine (or building it) and setting the required configuration for each type of node. You'll also have to set up required external dependencies. Then you'll have to start each node. This process is outlined in [Tutorial: The Druid Cluster](Tutorial%3A-The-Druid-Cluster.html).
## Apache Whirr

View File

@@ -17,6 +17,7 @@ The broker module uses several of the default modules in [Configuration](Configu
|`druid.broker.select.tier.custom.priorities`|`An array of integer priorities.`|Select servers in tiers with a custom priority list.|None|
|`druid.broker.cache.type`|`local`, `memcached`|The type of cache to use for queries.|`local`|
|`druid.broker.cache.unCacheable`|All druid query types|Query types to never cache.|["groupBy", "select"]|
|`druid.broker.cache.numBackgroundThreads`|Non-negative integer|Number of background threads in the thread pool to use for eventual-consistency caching of results, if caching is used. It is recommended to set this value greater than or equal to the number of processing threads. To force caching to execute in the same thread as the query (query results are blocked on caching completion), use a thread count of 0. Setups that use a Druid backend programmatically (sub-second re-querying) should consider setting this to 0 to prevent eventual consistency from degrading overall performance; experiment to find out what setting works best.|`0`|
#### Local Cache
@@ -26,6 +27,8 @@ The broker module uses several of the default modules in [Configuration](Configu
|`druid.broker.cache.sizeInBytes`|Maximum cache size in bytes. Zero disables caching.|0|
|`druid.broker.cache.initialSize`|Initial size of the hashtable backing the cache.|500000|
|`druid.broker.cache.logEvictionCount`|If non-zero, log cache eviction every `logEvictionCount` items.|0|
|`druid.broker.cache.numBackgroundThreads`|Number of background threads in the thread pool to use for eventual-consistency caching of results, if caching is used. It is recommended to set this value greater than or equal to the number of processing threads. To force caching to execute in the same thread as the query (query results are blocked on caching completion), use a thread count of 0. Setups that use a Druid backend programmatically (sub-second re-querying) should consider setting this to 0 to prevent eventual consistency from degrading overall performance; experiment to find out what setting works best.|`0`|
#### Memcache
@@ -33,6 +36,6 @@ The broker module uses several of the default modules in [Configuration](Configu
|--------|-----------|-------|
|`druid.broker.cache.expiration`|Memcached [expiration time](https://code.google.com/p/memcached/wiki/NewCommands#Standard_Protocol).|2592000 (30 days)|
|`druid.broker.cache.timeout`|Maximum time in milliseconds to wait for a response from Memcached.|500|
|`druid.broker.cache.hosts`|Command separated list of Memcached hosts `<host:port>`.|none|
|`druid.broker.cache.hosts`|Comma separated list of Memcached hosts `<host:port>`.|none|
|`druid.broker.cache.maxObjectSize`|Maximum object size in bytes for a Memcached object.|52428800 (50 MB)|
|`druid.broker.cache.memcachedPrefix`|Key prefix for all keys in Memcached.|druid|

View File

@@ -93,6 +93,22 @@ A sample ingest firehose spec is shown below -
|metrics|The list of metrics to select. If left empty, no metrics are returned. If left null or not defined, all metrics are selected.|no|
|filter| See [Filters](Filters.html)|yes|
#### CombiningFirehose
This firehose can be used to combine and merge data from a list of different firehoses.
```json
{
"type" : "combining",
"delegates" : [ { firehose1 }, { firehose2 }, ..... ]
}
```
|property|description|required?|
|--------|-----------|---------|
|type|combining|yes|
|delegates|list of firehoses to combine data from|yes|
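As an illustrative sketch, a combining firehose wrapping two local firehoses (the baseDir and file names here are hypothetical; any valid firehose specs can be used as delegates):

```json
{
  "type" : "combining",
  "delegates" : [
    { "type" : "local", "baseDir" : "examples/indexing/", "filter" : "wikipedia_data.json" },
    { "type" : "local", "baseDir" : "examples/indexing/", "filter" : "wikipedia_data_extra.json" }
  ]
}
```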
Parsing Data
------------

View File

@@ -5,4 +5,18 @@ Historical Node Configuration
=============================
For general Historical Node information, see [here](Historical.html).
The historical module uses several of the default modules in [Configuration](Configuration.html) and has no uniques configs of its own.
Runtime Configuration
---------------------
The historical module uses several of the default modules in [Configuration](Configuration.html) and has a few configs of its own.
#### Local Cache
|Property|Possible Values|Description|Default|
|--------|---------------|-----------|-------|
|`druid.historical.cache.useCache`|`true`,`false`|Allow cache to be used. The cache will NOT be used unless this is set to `true`.|`false`|
|`druid.historical.cache.populateCache`|`true`,`false`|Allow cache to be populated. The cache will NOT be populated unless this is set to `true`.|`false`|
|`druid.historical.cache.unCacheable`|All druid query types|Do not attempt to cache queries whose types are in this array|`["groupBy","select"]`|
|`druid.historical.cache.numBackgroundThreads`|Non-negative integer|Number of background threads in the thread pool to use for eventual-consistency caching of results, if caching is used. It is recommended to set this value greater than or equal to the number of processing threads. To force caching to execute in the same thread as the query (query results are blocked on caching completion), use a thread count of 0. Setups that use a Druid backend programmatically (sub-second re-querying) should consider setting this to 0 to prevent eventual consistency from degrading overall performance; experiment to find out what setting works best.|`0`|

View File

@@ -19,6 +19,10 @@ Some great folks have written their own libraries to interact with Druid
- [RDruid](https://github.com/metamx/RDruid) - Druid connector for R
#### Node.js
- [7eggs/node-druid-query](https://github.com/7eggs/node-druid-query) - A Node.js client for Druid
#### Helper Libraries
* [madvertise/druid-dumbo](https://github.com/madvertise/druid-dumbo) - Scripts to help generate batch configs for the ingestion of data into Druid

View File

@@ -4,7 +4,7 @@ layout: doc_page
# Extending Druid With Custom Modules
Druid version 0.6 introduces a new module system that allows for the addition of extensions at runtime.
Druid uses a module system that allows for the addition of extensions at runtime.
## Specifying extensions
@@ -20,11 +20,12 @@ Druid has the ability to automatically load extension jars from maven at runtime
## Configuring the extensions
Druid 0.6 introduces four new properties for configuring the loading of extensions:
Druid provides the following settings to configure the loading of extensions:
* `druid.extensions.coordinates`
This is a JSON Array list of "groupId:artifactId:version" maven coordinates. Defaults to `[]`
This is a JSON array of "groupId:artifactId[:version]" maven coordinates. For artifacts without a specified version, Druid will append the default version. Defaults to `[]`
* `druid.extensions.defaultVersion`
Version to use for extension artifacts without version information. Defaults to the `druid-server` artifact version.
* `druid.extensions.localRepository`
This specifies where to look for the "local repository". The way maven gets dependencies is that it downloads them to a "local repository" on your local disk and then collects the paths to each of the jars. This specifies the directory to consider the "local repository". Defaults to `~/.m2/repository`
* `druid.extensions.remoteRepositories`

View File

@@ -27,7 +27,7 @@ Forever load rules are of the form:
}
```
* `type` - this should always be "loadByInterval"
* `type` - this should always be "loadForever"
* `tieredReplicants` - A JSON Object where the keys are the tier names and values are the number of replicas for that tier.
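A complete forever load rule might look like the following sketch (the tier names and replicant counts are hypothetical):

```json
{
  "type" : "loadForever",
  "tieredReplicants" : {
    "hot" : 1,
    "_default_tier" : 1
  }
}
```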
@@ -86,7 +86,7 @@ Forever drop rules are of the form:
}
```
* `type` - this should always be "dropByPeriod"
* `type` - this should always be "dropForever"
All segments that match this rule are dropped from the cluster.
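Since a forever drop rule takes no parameters besides its type, a complete rule is simply:

```json
{
  "type" : "dropForever"
}
```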

View File

@@ -3,7 +3,7 @@
---
# Tutorial: A First Look at Druid
Greetings! This tutorial will help clarify some core Druid concepts. We will use a realtime dataset and issue some basic Druid queries. If you are ready to explore Druid, and learn a thing or two, read on!
Greetings! This tutorial will help clarify some core Druid concepts. We will use a real-time dataset and issue some basic Druid queries. If you are ready to explore Druid, and learn a thing or two, read on!
About the data
--------------

View File

@@ -1,60 +1,73 @@
[
{
"schema": {
"dataSource": "wikipedia",
"aggregators" : [{
"type" : "count",
"name" : "count"
}, {
"type" : "doubleSum",
"name" : "added",
"fieldName" : "added"
}, {
"type" : "doubleSum",
"name" : "deleted",
"fieldName" : "deleted"
}, {
"type" : "doubleSum",
"name" : "delta",
"fieldName" : "delta"
}],
"indexGranularity": "none"
},
"config": {
"maxRowsInMemory": 500000,
"intermediatePersistPeriod": "PT10m"
},
"firehose": {
"type": "kafka-0.7.2",
"consumerProps": {
"zk.connect": "localhost:2181",
"zk.connectiontimeout.ms": "15000",
"zk.sessiontimeout.ms": "15000",
"zk.synctime.ms": "5000",
"groupid": "druid-example",
"fetch.size": "1048586",
"autooffset.reset": "largest",
"autocommit.enable": "false"
},
"feed": "wikipedia",
"parser": {
"timestampSpec": {
"column": "timestamp"
},
"data": {
"format": "json",
"dimensions" : ["page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city"]
"dataSchema" : {
"dataSource" : "wikipedia",
"parser" : {
"type" : "string",
"parseSpec" : {
"format" : "json",
"timestampSpec" : {
"column" : "timestamp",
"format" : "auto"
},
"dimensionsSpec" : {
"dimensions": ["page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city"],
"dimensionExclusions" : [],
"spatialDimensions" : []
}
}
},
"metricsSpec" : [{
"type" : "count",
"name" : "count"
}, {
"type" : "doubleSum",
"name" : "added",
"fieldName" : "added"
}, {
"type" : "doubleSum",
"name" : "deleted",
"fieldName" : "deleted"
}, {
"type" : "doubleSum",
"name" : "delta",
"fieldName" : "delta"
}],
"granularitySpec" : {
"type" : "uniform",
"segmentGranularity" : "DAY",
"queryGranularity" : "NONE"
}
},
"plumber": {
"type": "realtime",
"ioConfig" : {
"type" : "realtime",
"firehose": {
"type": "kafka-0.7.2",
"consumerProps": {
"zk.connect": "localhost:2181",
"zk.connectiontimeout.ms": "15000",
"zk.sessiontimeout.ms": "15000",
"zk.synctime.ms": "5000",
"groupid": "druid-example",
"fetch.size": "1048586",
"autooffset.reset": "largest",
"autocommit.enable": "false"
},
"feed": "wikipedia"
},
"plumber": {
"type": "realtime"
}
},
"tuningConfig": {
"type" : "realtime",
"maxRowsInMemory": 500000,
"intermediatePersistPeriod": "PT10m",
"windowPeriod": "PT10m",
"segmentGranularity": "hour",
"basePersistDirectory": "\/tmp\/realtime\/basePersist",
"rejectionPolicy": {
"type": "test"
"type": "messageTime"
}
}
}
]
]

View File

@@ -1,51 +1,82 @@
{
"dataSource": "wikipedia",
"timestampSpec" : {
"column": "timestamp",
"format": "iso"
"dataSchema": {
"dataSource": "wikipedia",
"parser": {
"type": "string",
"parseSpec": {
"format": "json",
"timestampSpec": {
"column": "timestamp",
"format": "auto"
},
"dimensionsSpec": {
"dimensions": [
"page",
"language",
"user",
"unpatrolled",
"newPage",
"robot",
"anonymous",
"namespace",
"continent",
"country",
"region",
"city"
],
"dimensionExclusions": [],
"spatialDimensions": []
}
}
},
"metricsSpec": [
{
"type": "count",
"name": "count"
},
{
"type": "doubleSum",
"name": "added",
"fieldName": "added"
},
{
"type": "doubleSum",
"name": "deleted",
"fieldName": "deleted"
},
{
"type": "doubleSum",
"name": "delta",
"fieldName": "delta"
}
],
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "DAY",
"queryGranularity": "NONE",
"intervals": ["2013-08-31/2013-09-01"]
}
},
"dataSpec": {
"format": "json",
"dimensions" : ["page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city"]
"ioConfig": {
"type": "hadoop",
"inputSpec": {
"type": "static",
"paths": "/myPath/druid-services-0.6.160/examples/indexing/wikipedia_data.json"
},
"metadataUpdateSpec": {
"type": "db",
"connectURI": "jdbc:mysql:\/\/localhost:3306\/druid",
"user": "druid",
"password": "diurd",
"segmentTable": "druid_segments"
},
"segmentOutputPath": "\/tmp\/segments"
},
"granularitySpec" : {
"type" : "uniform",
"gran" : "DAY",
"intervals" : [ "2013-08-31/2013-09-01" ]
},
"pathSpec": {
"type": "static",
"paths": "examples/indexing/wikipedia_data.json"
},
"rollupSpec": {
"aggs": [{
"type" : "count",
"name" : "count"
}, {
"type" : "doubleSum",
"name" : "added",
"fieldName" : "added"
}, {
"type" : "doubleSum",
"name" : "deleted",
"fieldName" : "deleted"
}, {
"type" : "doubleSum",
"name" : "delta",
"fieldName" : "delta"
}],
"rollupGranularity": "none"
},
"workingPath": "\/tmp\/working_path",
"segmentOutputPath": "\/tmp\/segments",
"partitionsSpec": {
"targetPartitionSize": 5000000
},
"metadataUpdateSpec": {
"type": "db",
"connectURI": "jdbc:mysql:\/\/localhost:3306\/druid",
"user": "druid",
"password": "diurd",
"segmentTable": "druid_segments"
"tuningConfig": {
"type": "hadoop",
"workingPath": "\/tmp\/working_path",
"partitionsSpec": {
"targetPartitionSize": 5000000
}
}
}
}

View File

@@ -1,43 +1,76 @@
{
"type" : "index_hadoop",
"config": {
"dataSource" : "wikipedia",
"timestampSpec" : {
"column": "timestamp",
"format": "auto"
"type": "index_hadoop",
"spec": {
"dataSchema": {
"dataSource": "wikipedia",
"parser": {
"type": "string",
"parseSpec": {
"format": "json",
"timestampSpec": {
"column": "timestamp",
"format": "auto"
},
"dimensionsSpec": {
"dimensions": [
"page",
"language",
"user",
"unpatrolled",
"newPage",
"robot",
"anonymous",
"namespace",
"continent",
"country",
"region",
"city"
],
"dimensionExclusions": [],
"spatialDimensions": []
}
}
},
"metricsSpec": [
{
"type": "count",
"name": "count"
},
{
"type": "doubleSum",
"name": "added",
"fieldName": "added"
},
{
"type": "doubleSum",
"name": "deleted",
"fieldName": "deleted"
},
{
"type": "doubleSum",
"name": "delta",
"fieldName": "delta"
}
],
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "DAY",
"queryGranularity": "NONE",
"intervals": ["2013-08-31/2013-09-01"]
}
},
"dataSpec" : {
"format" : "json",
"dimensions" : ["page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city"]
"ioConfig": {
"type": "hadoop",
"inputSpec": {
"type": "static",
"paths": "/myPath/druid-services-0.6.160/examples/indexing/wikipedia_data.json"
}
},
"granularitySpec" : {
"type" : "uniform",
"gran" : "DAY",
"intervals" : [ "2013-08-31/2013-09-01" ]
},
"pathSpec" : {
"type" : "static",
"paths" : "examples/indexing/wikipedia_data.json"
},
"targetPartitionSize" : 5000000,
"rollupSpec" : {
"aggs": [{
"type" : "count",
"name" : "count"
}, {
"type" : "doubleSum",
"name" : "added",
"fieldName" : "added"
}, {
"type" : "doubleSum",
"name" : "deleted",
"fieldName" : "deleted"
}, {
"type" : "doubleSum",
"name" : "delta",
"fieldName" : "delta"
}],
"rollupGranularity" : "none"
"tuningConfig": {
"type": "hadoop",
"partitionsSpec": {
"targetPartitionSize": 5000000
}
}
}
}
}

View File

@@ -1,39 +1,76 @@
{
"type" : "index",
"dataSource" : "wikipedia",
"granularitySpec" : {
"type" : "uniform",
"gran" : "DAY",
"intervals" : [ "2013-08-31/2013-09-01" ]
},
"aggregators" : [{
"type" : "count",
"name" : "count"
}, {
"type" : "doubleSum",
"name" : "added",
"fieldName" : "added"
}, {
"type" : "doubleSum",
"name" : "deleted",
"fieldName" : "deleted"
}, {
"type" : "doubleSum",
"name" : "delta",
"fieldName" : "delta"
}],
"firehose" : {
"type" : "local",
"baseDir" : "examples/indexing/",
"filter" : "wikipedia_data.json",
"parser" : {
"timestampSpec" : {
"column" : "timestamp"
"type": "index",
"spec": {
"dataSchema": {
"dataSource": "wikipedia",
"parser": {
"type": "string",
"parseSpec": {
"format": "json",
"timestampSpec": {
"column": "timestamp",
"format": "auto"
},
"dimensionsSpec": {
"dimensions": [
"page",
"language",
"user",
"unpatrolled",
"newPage",
"robot",
"anonymous",
"namespace",
"continent",
"country",
"region",
"city"
],
"dimensionExclusions": [],
"spatialDimensions": []
}
}
},
"data" : {
"format" : "json",
"dimensions" : ["page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city"]
"metricsSpec": [
{
"type": "count",
"name": "count"
},
{
"type": "doubleSum",
"name": "added",
"fieldName": "added"
},
{
"type": "doubleSum",
"name": "deleted",
"fieldName": "deleted"
},
{
"type": "doubleSum",
"name": "delta",
"fieldName": "delta"
}
],
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "DAY",
"queryGranularity": "NONE",
"intervals": ["2013-08-31/2013-09-01"]
}
},
"ioConfig": {
"type": "index",
"firehose": {
"type": "local",
"baseDir": "/MyPath/druid-services-0.6.160/examples/indexing/",
"filter": "wikipedia_data.json"
}
},
"tuningConfig": {
"type": "index",
"targetPartitionSize": 0,
"rowFlushBoundary": 0
}
}
}
}

View File

@@ -1,73 +1,92 @@
{
"type": "index_realtime",
"schema": {
"dataSource": "wikipedia",
"aggregators": [
{
"type": "count",
"name": "count"
"type" : "index_realtime",
"spec" : {
"dataSchema": {
"dataSource": "wikipedia",
"parser": {
"type": "string",
"parseSpec": {
"format": "json",
"timestampSpec": {
"column": "timestamp",
"format": "auto"
},
"dimensionsSpec": {
"dimensions": [
"page",
"language",
"user",
"unpatrolled",
"newPage",
"robot",
"anonymous",
"namespace",
"continent",
"country",
"region",
"city"
],
"dimensionExclusions": [],
"spatialDimensions": []
}
}
},
{
"type": "doubleSum",
"name": "added",
"fieldName": "added"
},
{
"type": "doubleSum",
"name": "deleted",
"fieldName": "deleted"
},
{
"type": "doubleSum",
"name": "delta",
"fieldName": "delta"
"metricsSpec": [
{
"type": "count",
"name": "count"
},
{
"type": "doubleSum",
"name": "added",
"fieldName": "added"
},
{
"type": "doubleSum",
"name": "deleted",
"fieldName": "deleted"
},
{
"type": "doubleSum",
"name": "delta",
"fieldName": "delta"
}
],
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "DAY",
"queryGranularity": "NONE"
}
],
"indexGranularity": "none"
},
"fireDepartmentConfig": {
"maxRowsInMemory": 500000,
"intermediatePersistPeriod": "PT10m"
},
"firehose": {
"type": "kafka-0.7.2",
"consumerProps": {
"zk.connect": "localhost:2181",
"zk.connectiontimeout.ms": "15000",
"zk.sessiontimeout.ms": "15000",
"zk.synctime.ms": "5000",
"groupid": "druid-example",
"fetch.size": "1048586",
"autooffset.reset": "largest",
"autocommit.enable": "false"
},
"feed": "wikipedia",
"parser": {
"timestampSpec": {
"column": "timestamp"
"ioConfig": {
"type": "realtime",
"firehose": {
"type": "kafka-0.7.2",
"consumerProps": {
"zk.connect": "localhost:2181",
"zk.connectiontimeout.ms": "15000",
"zk.sessiontimeout.ms": "15000",
"zk.synctime.ms": "5000",
"groupid": "druid-example",
"fetch.size": "1048586",
"autooffset.reset": "largest",
"autocommit.enable": "false"
},
"feed": "wikipedia"
},
"data": {
"format": "json",
"dimensions": [
"page",
"language",
"user",
"unpatrolled",
"newPage",
"robot",
"anonymous",
"namespace",
"continent",
"country",
"region",
"city"
]
"plumber": {
"type": "realtime"
}
},
"tuningConfig": {
"type": "realtime",
"maxRowsInMemory": 500000,
"intermediatePersistPeriod": "PT10m",
"windowPeriod": "PT10m",
"basePersistDirectory": "\/tmp\/realtime\/basePersist",
"rejectionPolicy": {
"type": "serverTime"
}
}
},
"windowPeriod": "PT10m",
"segmentGranularity": "hour",
"rejectionPolicy": {
"type": "test"
}
}
}

View File

@@ -1,71 +1,113 @@
{
"type": "index_realtime",
"schema": {
"dataSource": "wikipedia",
"aggregators": [
{
"type": "count",
"name": "count"
},
{
"type": "doubleSum",
"name": "added",
"fieldName": "added"
},
{
"type": "doubleSum",
"name": "deleted",
"fieldName": "deleted"
},
{
"type": "doubleSum",
"name": "delta",
"fieldName": "delta"
}
],
"indexGranularity": "none"
},
"fireDepartmentConfig": {
"maxRowsInMemory": 500000,
"intermediatePersistPeriod": "PT10m"
},
"firehose": {
"type": "irc",
"nick": "wiki1234567890",
"host": "irc.wikimedia.org",
"channels": [
"#en.wikipedia",
"#fr.wikipedia",
"#de.wikipedia",
"#ja.wikipedia"
],
"decoder": {
"type": "wikipedia",
"namespaces": {
"#en.wikipedia": {
"_empty_": "main",
"Category": "category",
"$1 talk": "project talk",
"Template talk": "template talk",
"Help talk": "help talk",
"Media": "media",
"MediaWiki talk": "mediawiki talk",
"File talk": "file talk",
"MediaWiki": "mediawiki",
"User": "user",
"File": "file",
"User talk": "user talk",
"Template": "template",
"Help": "help",
"Special": "special",
"Talk": "talk",
"Category talk": "category talk"
"spec": {
"dataSchema": {
"dataSource": "wikipedia",
"parser": {
"type": "irc",
"parseSpec": {
"format": "json",
"timestampSpec": {
"column": "timestamp",
"format": "iso"
},
"dimensionsSpec": {
"dimensions": [
"page",
"language",
"user",
"unpatrolled",
"newPage",
"robot",
"anonymous",
"namespace",
"continent",
"country",
"region",
"city"
],
"dimensionExclusions": [],
"spatialDimensions": []
}
},
"decoder": {
"type": "wikipedia",
"namespaces": {
"#en.wikipedia": {
"_empty_": "main",
"Category": "category",
"$1 talk": "project talk",
"Template talk": "template talk",
"Help talk": "help talk",
"Media": "media",
"MediaWiki talk": "mediawiki talk",
"File talk": "file talk",
"MediaWiki": "mediawiki",
"User": "user",
"File": "file",
"User talk": "user talk",
"Template": "template",
"Help": "help",
"Special": "special",
"Talk": "talk",
"Category talk": "category talk"
}
}
}
},
"metricsSpec": [
{
"type": "count",
"name": "count"
},
{
"type": "doubleSum",
"name": "added",
"fieldName": "added"
},
{
"type": "doubleSum",
"name": "deleted",
"fieldName": "deleted"
},
{
"type": "doubleSum",
"name": "delta",
"fieldName": "delta"
}
],
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "DAY",
"queryGranularity": "NONE"
}
},
"timeDimension": "timestamp",
"timeFormat": "iso"
},
"windowPeriod": "PT10m",
"segmentGranularity": "hour"
}
"ioConfig": {
"type": "realtime",
"firehose": {
"type": "irc",
"nick": "wiki1234567890",
"host": "irc.wikimedia.org",
"channels": [
"#en.wikipedia",
"#fr.wikipedia",
"#de.wikipedia",
"#ja.wikipedia"
]
},
"plumber": {
"type": "realtime"
}
},
"tuningConfig": {
"type": "realtime",
"maxRowsInMemory": 500000,
"intermediatePersistPeriod": "PT10m",
"windowPeriod": "PT10m",
"basePersistDirectory": "\/tmp\/realtime\/basePersist",
"rejectionPolicy": {
"type": "serverTime"
}
}
}
}

View File

@@ -1,12 +0,0 @@
{
"queryType": "groupBy",
"dataSource": "rabbitmqtest",
"granularity": "all",
"dimensions": [],
"aggregations": [
{ "type": "count", "name": "rows" },
{"type": "longSum", "name": "imps", "fieldName": "impressions"},
{"type": "doubleSum", "name": "wp", "fieldName": "wp"}
],
"intervals": ["2010-01-01T00:00/2020-01-01T00:00"]
}

View File

@@ -1,48 +0,0 @@
[{
"schema" : {
"dataSource":"rabbitmqtest",
"aggregators":[
{"type":"count", "name":"impressions"},
{"type":"doubleSum","name":"wp","fieldName":"wp"}
],
"indexGranularity":"minute",
"shardSpec" : { "type": "none" }
},
"config" : {
"maxRowsInMemory" : 500000,
"intermediatePersistPeriod" : "PT1m"
},
"firehose" : {
"type" : "rabbitmq",
"connection" : {
"host": "localhost",
"username": "test-dude",
"password": "word-dude",
"virtualHost": "test-vhost"
},
"config" : {
"exchange": "test-exchange",
"queue" : "druidtest",
"routingKey": "#",
"durable": "true",
"exclusive": "false",
"autoDelete": "false",
"maxRetries": "10",
"retryIntervalSeconds": "1",
"maxDurationSeconds": "300"
},
"parser" : {
"timestampSpec" : { "column" : "utcdt", "format" : "iso" },
"data" : { "format" : "json" },
"dimensionExclusions" : ["wp"]
}
},
"plumber" : {
"type" : "realtime",
"windowPeriod" : "PT5m",
"segmentGranularity":"hour",
"basePersistDirectory" : "/tmp/realtime/basePersist",
"rejectionPolicy": { "type": "test" }
}
}]

View File

@@ -1,19 +0,0 @@
{
"queryType": "groupBy",
"dataSource": "randSeq",
"granularity": "all",
"dimensions": [],
"aggregations":[
{ "type": "count", "name": "rows"},
{ "type": "doubleSum", "fieldName": "events", "name": "e"},
{ "type": "doubleSum", "fieldName": "outColumn", "name": "randomNumberSum"}
],
"postAggregations":[
{ "type":"arithmetic",
"name":"avg_random",
"fn":"/",
"fields":[ {"type":"fieldAccess","name":"randomNumberSum","fieldName":"randomNumberSum"},
{"type":"fieldAccess","name":"rows","fieldName":"rows"} ]}
],
"intervals":["2012-10-01T00:00/2020-01-01T00"]
}

View File

@@ -1,32 +0,0 @@
[{
"schema": {
"dataSource": "randseq",
"aggregators": [
{"type": "count", "name": "events"},
{"type": "doubleSum", "name": "outColumn", "fieldName": "inColumn"}
],
"indexGranularity": "minute",
"shardSpec": {"type": "none"}
},
"config": {
"maxRowsInMemory": 50000,
"intermediatePersistPeriod": "PT10m"
},
"firehose": {
"type": "rand",
"sleepUsec": 100000,
"maxGeneratedRows": 5000000,
"seed": 0,
"nTokens": 255,
"nPerSleep": 3
},
"plumber": {
"type": "realtime",
"windowPeriod": "PT5m",
"segmentGranularity": "hour",
"basePersistDirectory": "/tmp/example/rand_realtime/basePersist"
}
}]

View File

@@ -1,12 +1,24 @@
{
"queryType": "groupBy",
"dataSource": "twitterstream",
"granularity": "all",
"dimensions": ["lang", "utc_offset"],
"aggregations":[
{ "type": "count", "name": "rows"},
{ "type": "doubleSum", "fieldName": "tweets", "name": "tweets"}
],
"filter": { "type": "selector", "dimension": "lang", "value": "en" },
"intervals":["2012-10-01T00:00/2020-01-01T00"]
"queryType": "timeseries",
"dataSource": "twitterstream",
"granularity": "all",
"aggregations": [
{
"type": "count",
"name": "rows"
},
{
"type": "doubleSum",
"fieldName": "tweets",
"name": "tweets"
}
],
"filter": {
"type": "selector",
"dimension": "lang",
"value": "en"
},
"intervals": [
"2012-10-01T00:00\/2020-01-01T00"
]
}

View File

@@ -1,13 +1,21 @@
{
"queryType": "search",
"dataSource": "twitterstream",
"granularity": "all",
"searchDimensions": ["htags"],
"limit": 1,
"query": {
"type": "fragment",
"values": ["men"],
"sort": { "type": "strlen" }
},
"intervals":["2012-10-01T00:00/2020-01-01T00"]
"queryType": "search",
"dataSource": "twitterstream",
"granularity": "all",
"searchDimensions": [
"htags"
],
"limit": 1,
"query": {
"type": "fragment",
"values": [
"men"
],
"sort": {
"type": "strlen"
}
},
"intervals": [
"2012-10-01T00:00\/2020-01-01T00"
]
}

View File

@@ -1,44 +1,119 @@
[{
"schema": {
"dataSource": "twitterstream",
"aggregators": [
{"type": "count", "name": "tweets"},
{"type": "doubleSum", "fieldName": "follower_count", "name": "total_follower_count"},
{"type": "doubleSum", "fieldName": "retweet_count", "name": "total_retweet_count" },
{"type": "doubleSum", "fieldName": "friends_count", "name": "total_friends_count" },
{"type": "doubleSum", "fieldName": "statuses_count", "name": "total_statuses_count"},
{"type": "min", "fieldName": "follower_count", "name": "min_follower_count"},
{"type": "max", "fieldName": "follower_count", "name": "max_follower_count"},
{"type": "min", "fieldName": "friends_count", "name": "min_friends_count"},
{"type": "max", "fieldName": "friends_count", "name": "max_friends_count"},
{"type": "min", "fieldName": "statuses_count", "name": "min_statuses_count"},
{"type": "max", "fieldName": "statuses_count", "name": "max_statuses_count"},
{"type": "min", "fieldName": "retweet_count", "name": "min_retweet_count"},
{"type": "max", "fieldName": "retweet_count", "name": "max_retweet_count"}
],
"indexGranularity": "minute",
"shardSpec": {"type": "none"}
[
{
"dataSchema": {
"dataSource": "twitterstream",
"parser": {
"parseSpec": {
"format": "json",
"timestampSpec": {
"column": "utcdt",
"format": "iso"
},
"dimensionsSpec": {
"dimensions": [
],
"dimensionExclusions": [
],
"spatialDimensions": [
]
}
}
},
"metricsSpec": [
{
"type": "count",
"name": "tweets"
},
{
"type": "doubleSum",
"fieldName": "follower_count",
"name": "total_follower_count"
},
{
"type": "doubleSum",
"fieldName": "retweet_count",
"name": "total_retweet_count"
},
{
"type": "doubleSum",
"fieldName": "friends_count",
"name": "total_friends_count"
},
{
"type": "doubleSum",
"fieldName": "statuses_count",
"name": "total_statuses_count"
},
{
"type": "min",
"fieldName": "follower_count",
"name": "min_follower_count"
},
{
"type": "max",
"fieldName": "follower_count",
"name": "max_follower_count"
},
{
"type": "min",
"fieldName": "friends_count",
"name": "min_friends_count"
},
{
"type": "max",
"fieldName": "friends_count",
"name": "max_friends_count"
},
{
"type": "min",
"fieldName": "statuses_count",
"name": "min_statuses_count"
},
{
"type": "max",
"fieldName": "statuses_count",
"name": "max_statuses_count"
},
{
"type": "min",
"fieldName": "retweet_count",
"name": "min_retweet_count"
},
{
"type": "max",
"fieldName": "retweet_count",
"name": "max_retweet_count"
}
],
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "DAY",
"queryGranularity": "NONE"
}
},
"config": {
"maxRowsInMemory": 50000,
"intermediatePersistPeriod": "PT2m"
},
"firehose": {
"ioConfig": {
"type": "realtime",
"firehose": {
"type": "twitzer",
"maxEventCount": 500000,
"maxRunMinutes": 120
},
"plumber": {
"type": "realtime"
}
},
"plumber": {
"type": "realtime",
"windowPeriod": "PT3m",
"segmentGranularity": "hour",
"basePersistDirectory": "/tmp/example/twitter_realtime/basePersist"
"tuningConfig": {
"type": "realtime",
"maxRowsInMemory": 500000,
"intermediatePersistPeriod": "PT2m",
"windowPeriod": "PT3m",
"basePersistDirectory": "\/tmp\/realtime\/basePersist",
"rejectionPolicy": {
"type": "messageTime"
}
}
}]
}
]

View File

@@ -1,27 +0,0 @@
{
"queryType":"groupBy",
"dataSource":"webstream",
"granularity":"minute",
"dimensions":[
"timezone"
],
"aggregations":[
{
"type":"count",
"name":"rows"
},
{
"type":"doubleSum",
"fieldName":"known_users",
"name":"known_users"
}
],
"filter":{
"type":"selector",
"dimension":"country",
"value":"US"
},
"intervals":[
"2013-06-01T00:00/2020-01-01T00"
]
}

View File

@@ -1,47 +0,0 @@
[{
"schema": {
"dataSource": "webstream",
"aggregators": [
{"type": "count", "name": "rows"},
{"type": "doubleSum", "fieldName": "known_users", "name": "known_users"}
],
"indexGranularity": "second",
"shardSpec": {"type": "none"}
},
"config": {
"maxRowsInMemory": 50000,
"intermediatePersistPeriod": "PT2m"
},
"firehose": {
"type": "webstream",
"url":"http://developer.usa.gov/1usagov",
"renamedDimensions": {
"g":"bitly_hash",
"c":"country",
"a":"user",
"cy":"city",
"l":"encoding_user_login",
"hh":"short_url",
"hc":"timestamp_hash",
"h":"user_bitly_hash",
"u":"url",
"tz":"timezone",
"t":"time",
"r":"referring_url",
"gr":"geo_region",
"nk":"known_users",
"al":"accept_language"
},
"timeDimension":"t",
"timeFormat":"posix"
},
"plumber": {
"type": "realtime",
"windowPeriod": "PT3m",
"segmentGranularity": "hour",
"basePersistDirectory": "/tmp/example/usagov_realtime/basePersist"
}
}]

View File

@@ -1,10 +1,7 @@
{
"queryType":"groupBy",
"queryType":"timeseries",
"dataSource":"wikipedia",
"granularity":"minute",
"dimensions":[
"page"
],
"aggregations":[
{
"type":"count",
@@ -25,3 +22,4 @@
"2013-06-01T00:00/2020-01-01T00"
]
}

View File

@@ -1,63 +1,111 @@
[{
"schema": {
"dataSource": "wikipedia",
"aggregators": [
{"type": "count", "name": "count"},
{"type": "longSum", "fieldName": "added", "name": "added"},
{"type": "longSum", "fieldName": "deleted", "name": "deleted"},
{"type": "longSum", "fieldName": "delta", "name": "delta"}
],
"indexGranularity": "minute",
"shardSpec": {"type": "none"}
},
"config": {
"maxRowsInMemory": 50000,
"intermediatePersistPeriod": "PT2m"
},
"firehose": {
"type": "irc",
"nick": "wiki1234567890",
"host": "irc.wikimedia.org",
"channels": [
"#en.wikipedia",
"#fr.wikipedia",
"#de.wikipedia",
"#ja.wikipedia"
],
"decoder": {
"type": "wikipedia",
"namespaces": {
"#en.wikipedia": {
"": "main",
"Category": "category",
"$1 talk": "project talk",
"Template talk": "template talk",
"Help talk": "help talk",
"Media": "media",
"MediaWiki talk": "mediawiki talk",
"File talk": "file talk",
"MediaWiki": "mediawiki",
"User": "user",
"File": "file",
"User talk": "user talk",
"Template": "template",
"Help": "help",
"Special": "special",
"Talk": "talk",
"Category talk": "category talk"
}
}
"dataSchema": {
"dataSource": "wikipedia",
"parser": {
"type": "irc",
"parseSpec": {
"format": "json",
"timestampSpec": {
"column": "timestamp",
"format": "iso"
},
"timeDimension":"timestamp",
"timeFormat":"iso"
"dimensionsSpec": {
"dimensions": [
"page",
"language",
"user",
"unpatrolled",
"newPage",
"robot",
"anonymous",
"namespace",
"continent",
"country",
"region",
"city"
],
"dimensionExclusions": [],
"spatialDimensions": []
}
},
"decoder": {
"type": "wikipedia",
"namespaces": {
"#en.wikipedia": {
"_empty_": "main",
"Category": "category",
"$1 talk": "project talk",
"Template talk": "template talk",
"Help talk": "help talk",
"Media": "media",
"MediaWiki talk": "mediawiki talk",
"File talk": "file talk",
"MediaWiki": "mediawiki",
"User": "user",
"File": "file",
"User talk": "user talk",
"Template": "template",
"Help": "help",
"Special": "special",
"Talk": "talk",
"Category talk": "category talk"
}
}
}
},
"plumber": {
"type": "realtime",
"windowPeriod": "PT3m",
"segmentGranularity": "hour",
"basePersistDirectory": "/tmp/example/wikipedia/basePersist"
"metricsSpec": [
{
"type": "count",
"name": "count"
},
{
"type": "doubleSum",
"name": "added",
"fieldName": "added"
},
{
"type": "doubleSum",
"name": "deleted",
"fieldName": "deleted"
},
{
"type": "doubleSum",
"name": "delta",
"fieldName": "delta"
}
],
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "DAY",
"queryGranularity": "NONE"
}
},
"ioConfig": {
"type": "realtime",
"firehose": {
"type": "irc",
"nick": "wiki1234567890",
"host": "irc.wikimedia.org",
"channels": [
"#en.wikipedia",
"#fr.wikipedia",
"#de.wikipedia",
"#ja.wikipedia"
]
},
"plumber": {
"type": "realtime"
}
},
"tuningConfig": {
"type": "realtime",
"maxRowsInMemory": 500000,
"intermediatePersistPeriod": "PT10m",
"windowPeriod": "PT10m",
"basePersistDirectory": "\/tmp\/realtime\/basePersist",
"rejectionPolicy": {
"type": "serverTime"
}
}
}]

View File

@@ -45,7 +45,7 @@ for delay in 5 30 30 30 30 30 30 30 30 30 30
echo "sleep for $delay seconds..."
echo " "
sleep $delay
curl -X POST 'http://localhost:8083/druid/v2/?w' -H 'content-type: application/json' -d "`cat ${QUERY_FILE}`"
curl -X POST 'http://localhost:8083/druid/v2/?pretty' -H 'content-type: application/json' -d "`cat ${QUERY_FILE}`"
echo " "
echo " "
done

View File

@@ -58,7 +58,7 @@ DRUID_CP=${EXAMPLE_LOC}
DRUID_CP=${DRUID_CP}:${SCRIPT_DIR}/../config/realtime
#For the kit
DRUID_CP=${DRUID_CP}:${SCRIPT_DIR}/lib/*
DRUID_CP=${DRUID_CP}:${SCRIPT_DIR}/config/_global
DRUID_CP=${DRUID_CP}:${SCRIPT_DIR}/config/_common
DRUID_CP=${DRUID_CP}:${SCRIPT_DIR}/config/realtime
echo "Running command:"

View File

@@ -1,5 +0,0 @@
## Example Prerequisite
The code in this example assumes Cassandra has been configured as deep storage for Druid.
For details on how to accomplish this, see [Cassandra Deep Storage](http://druid.io/docs/latest/Cassandra-Deep-Storage.html).

View File

@@ -1 +0,0 @@
curl -sX POST "http://localhost:9090/druid/v2/?pretty=true" -H 'content-type: application/json' -d @query

View File

@@ -1,19 +0,0 @@
{
"queryType": "groupBy",
"dataSource": "randSeq",
"granularity": "all",
"dimensions": [],
"aggregations":[
{ "type": "count", "name": "rows"},
{ "type": "doubleSum", "fieldName": "events", "name": "e"},
{ "type": "doubleSum", "fieldName": "outColumn", "name": "randomNumberSum"}
],
"postAggregations":[
{ "type":"arithmetic",
"name":"avg_random",
"fn":"/",
"fields":[ {"type":"fieldAccess","name":"randomNumberSum","fieldName":"randomNumberSum"},
{"type":"fieldAccess","name":"rows","fieldName":"rows"} ]}
],
"intervals":["2012-10-01T00:00/2020-01-01T00"]
}

View File

@@ -1,2 +0,0 @@
CREATE TABLE index_storage ( key text, chunk text, value blob, PRIMARY KEY (key, chunk)) WITH COMPACT STORAGE;
CREATE TABLE descriptor_storage ( key varchar, lastModified timestamp, descriptor varchar, PRIMARY KEY (key) ) WITH COMPACT STORAGE;

View File

@@ -1,5 +1,5 @@
# Extensions
druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.7.0","io.druid.extensions:druid-kafka-seven:0.7.0","io.druid.extensions:druid-rabbitmq:0.7.0", "io.druid.extensions:druid-s3-extensions:0.7.0"]
druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.7.0-SNAPSHOT","io.druid.extensions:druid-kafka-seven:0.7.0-SNAPSHOT","io.druid.extensions:druid-rabbitmq:0.7.0-SNAPSHOT"]
# Zookeeper
druid.zk.service.host=localhost
@@ -14,6 +14,10 @@ druid.metadata.storage.connector.password=diurd
druid.storage.type=local
druid.storage.storageDirectory=/tmp/druid/localStorage
# Cache (we use a simple 10mb heap-based local cache on the broker)
druid.cache.type=local
druid.cache.sizeInBytes=10000000
# Indexing service discovery
druid.selectors.indexing.serviceName=overlord

View File

@@ -1,6 +1,9 @@
druid.host=localhost
druid.service=broker
druid.port=8080
druid.service=broker
druid.broker.cache.useCache=true
druid.broker.cache.populateCache=true
# Bump these up only for faster nested groupBy
druid.processing.buffer.sizeBytes=100000000

View File

@@ -1,5 +1,7 @@
druid.host=localhost
druid.service=coordinator
druid.port=8082
druid.service=coordinator
druid.coordinator.startDelay=PT70s
# The coordinator begins assignment operations after the start delay.
# We override the default here to start things up faster for examples.
druid.coordinator.startDelay=PT70s

View File

@@ -1,17 +1,11 @@
druid.host=localhost
druid.service=historical
druid.port=8081
druid.service=historical
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.7.0"]
# Dummy read only AWS account (used to download example data)
druid.s3.secretKey=QyyfVZ7llSiRg6Qcrql1eEUG7buFpAK6T6engr1b
druid.s3.accessKey=AKIAIMKECRUYKDQGR6YQ
druid.server.maxSize=10000000000
# Change these to make Druid faster
# We can only scan 1 segment in parallel with these configs.
# Our intermediate buffer is also very small so longer topNs will be slow.
druid.processing.buffer.sizeBytes=100000000
druid.processing.numThreads=1
druid.segmentCache.locations=[{"path": "/tmp/druid/indexCache", "maxSize"\: 10000000000}]
druid.server.maxSize=10000000000

View File

@@ -2,6 +2,7 @@ druid.host=localhost
druid.port=8080
druid.service=overlord
# Run the overlord in local mode with a single peon to execute tasks
druid.indexer.queue.startDelay=PT0M
druid.indexer.runner.javaOpts="-server -Xmx256m"
druid.indexer.fork.property.druid.processing.numThreads=1

View File

@@ -1,10 +1,12 @@
druid.host=localhost
druid.service=realtime
druid.port=8083
druid.service=realtime
# Change this config to metadata to hand off to the rest of the Druid cluster
# Change this config to 'metadata' to hand off to the rest of the Druid cluster
druid.publish.type=noop
# We can only scan 1 segment in parallel with these configs.
# Our intermediate buffer is also very small so longer topNs will be slow.
druid.processing.buffer.sizeBytes=100000000
druid.processing.numThreads=1

View File

@@ -18,106 +18,119 @@
~ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>io.druid.extensions</groupId>
<artifactId>druid-examples</artifactId>
<name>druid-examples</name>
<description>druid-examples</description>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>io.druid.extensions</groupId>
<artifactId>druid-examples</artifactId>
<name>druid-examples</name>
<description>druid-examples</description>
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.7.0-SNAPSHOT</version>
</parent>
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.7.0-SNAPSHOT</version>
</parent>
<dependencies>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-server</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-common</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependencies>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-server</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-common</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>org.twitter4j</groupId>
<artifactId>twitter4j-core</artifactId>
<version>3.0.3</version>
</dependency>
<dependency>
<groupId>org.twitter4j</groupId>
<artifactId>twitter4j-async</artifactId>
<version>3.0.3</version>
</dependency>
<dependency>
<groupId>org.twitter4j</groupId>
<artifactId>twitter4j-stream</artifactId>
<version>3.0.3</version>
</dependency>
<dependency>
<groupId>commons-validator</groupId>
<artifactId>commons-validator</artifactId>
<version>1.4.0</version>
</dependency>
<dependency>
<groupId>org.twitter4j</groupId>
<artifactId>twitter4j-core</artifactId>
<version>3.0.3</version>
</dependency>
<dependency>
<groupId>org.twitter4j</groupId>
<artifactId>twitter4j-async</artifactId>
<version>3.0.3</version>
</dependency>
<dependency>
<groupId>org.twitter4j</groupId>
<artifactId>twitter4j-stream</artifactId>
<version>3.0.3</version>
</dependency>
<dependency>
<groupId>commons-validator</groupId>
<artifactId>commons-validator</artifactId>
<version>1.4.0</version>
</dependency>
<!-- For tests! -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<!-- For tests! -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<outputFile>
${project.build.directory}/${project.artifactId}-${project.version}-selfcontained.jar
</outputFile>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-jar-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>test-jar</goal>
</goals>
</execution>
</executions>
<configuration>
<archive>
<manifest>
<addDefaultImplementationEntries>true</addDefaultImplementationEntries>
<addDefaultSpecificationEntries>true</addDefaultSpecificationEntries>
</manifest>
</archive>
</configuration>
</plugin>
</plugins>
</build>
<build>
<plugins>
<plugin>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<outputFile>
${project.build.directory}/${project.artifactId}-${project.version}-selfcontained.jar
</outputFile>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<executions>
<execution>
<id>attach-sources</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-jar-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>test-jar</goal>
</goals>
</execution>
</executions>
<configuration>
<archive>
<manifest>
<addDefaultImplementationEntries>true</addDefaultImplementationEntries>
<addDefaultSpecificationEntries>true</addDefaultSpecificationEntries>
</manifest>
</archive>
</configuration>
</plugin>
</plugins>
</build>
</project>

@@ -23,9 +23,7 @@ import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.inject.Binder;
import io.druid.examples.rand.RandomFirehoseFactory;
import io.druid.examples.twitter.TwitterSpritzerFirehoseFactory;
import io.druid.examples.web.WebFirehoseFactory;
import io.druid.initialization.DruidModule;
import java.util.Arrays;
@@ -41,9 +39,7 @@ public class ExamplesDruidModule implements DruidModule
return Arrays.<Module>asList(
new SimpleModule("ExamplesModule")
.registerSubtypes(
new NamedType(TwitterSpritzerFirehoseFactory.class, "twitzer"),
new NamedType(RandomFirehoseFactory.class, "rand"),
new NamedType(WebFirehoseFactory.class, "webstream")
new NamedType(TwitterSpritzerFirehoseFactory.class, "twitzer")
)
);
}
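
With only the Twitter firehose left registered, an ingestion spec selects it by the NamedType string above. A hedged fragment (the two limit parameters are assumptions; check TwitterSpritzerFirehoseFactory for the exact property names):

"firehose" : {
  "type" : "twitzer",
  "maxEventCount" : 50000,
  "maxRunMinutes" : 10
}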

@@ -1,274 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.examples.rand;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.collect.Maps;
import com.metamx.common.logger.Logger;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedInputRow;
import io.druid.data.input.impl.InputRowParser;
import java.io.IOException;
import java.util.LinkedList;
import java.util.Map;
import java.util.Random;
import static java.lang.Thread.sleep;
/**
* Random value sequence Firehose Factory named "rand".
* Builds a Firehose that emits a stream of random numbers (outColumn, a positive double)
* with timestamps along with an associated token (target). This provides a timeseries
* that requires no network access for demonstration, characterization, and testing.
* The generated tuples can be thought of as asynchronously
* produced triples (timestamp, outColumn, target) where the timestamp varies depending on
* speed of processing.
*
* <p>
* InputRows are produced as fast as requested, so this can be used to determine the
* upper rate of ingest if sleepUsec is set to 0; nTokens specifies how many associated
* target labels are used. Generation is round-robin for nTokens and sleep occurs
* every nPerSleep values generated. A random number seed can be used by setting the
* firehose parameter "seed" to a non-zero value so that values can be reproducible
* (but note that timestamp is not deterministic because timestamps are obtained at
* the moment an event is delivered.)
* Values are offset by adding the modulus of the token number to the random number
* so that token values have distinct, non-overlapping ranges.
* </p>
*
* Example spec file:
* <pre>
* [{
* "schema" : { "dataSource":"randseq",
* "aggregators":[ {"type":"count", "name":"events"},
* {"type":"doubleSum","name":"outColumn","fieldName":"inColumn"} ],
* "indexGranularity":"minute",
* "shardSpec" : { "type": "none" } },
* "config" : { "maxRowsInMemory" : 50000,
* "intermediatePersistPeriod" : "PT2m" },
*
* "firehose" : { "type" : "rand",
* "sleepUsec": 100000,
* "maxGeneratedRows" : 5000000,
* "seed" : 0,
* "nTokens" : 19,
* "nPerSleep" : 3
* },
*
* "plumber" : { "type" : "realtime",
* "windowPeriod" : "PT5m",
* "segmentGranularity":"hour",
* "basePersistDirectory" : "/tmp/realtime/basePersist" }
* }]
* </pre>
*
* Example query using POST to /druid/v2/ (where UTC date and time MUST include the current hour):
* <pre>
* {
* "queryType": "groupBy",
* "dataSource": "randSeq",
* "granularity": "all",
* "dimensions": [],
* "aggregations":[
* { "type": "count", "name": "rows"},
* { "type": "doubleSum", "fieldName": "events", "name": "e"},
* { "type": "doubleSum", "fieldName": "outColumn", "name": "randomNumberSum"}
* ],
* "postAggregations":[
* { "type":"arithmetic",
* "name":"avg_random",
* "fn":"/",
* "fields":[ {"type":"fieldAccess","name":"randomNumberSum","fieldName":"randomNumberSum"},
* {"type":"fieldAccess","name":"rows","fieldName":"rows"} ]}
* ],
* "intervals":["2012-10-01T00:00/2020-01-01T00"]
* }
* </pre>
*/
@JsonTypeName("rand")
public class RandomFirehoseFactory implements FirehoseFactory<InputRowParser>
{
private static final Logger log = new Logger(RandomFirehoseFactory.class);
/**
* msec to sleep before generating a new row; if this and delayNsec are 0, then go as fast as possible.
* json param sleepUsec (microseconds) is used to initialize this.
*/
private final long delayMsec;
/**
* nsec to sleep before generating a new row; if this and delayMsec are 0, then go as fast as possible.
* json param sleepUsec (microseconds) is used to initialize this.
*/
private final int delayNsec;
/**
* max rows to generate, -1 is infinite, 0 means nothing is generated; use this to prevent
* infinite space consumption or to see what happens when a Firehose stops delivering
* values, or to have hasMore() return false.
*/
private final long maxGeneratedRows;
/**
* seed for random number generator; if 0, then no seed is used.
*/
private final long seed;
/**
* number of tokens to randomly associate with values (no heap limits). This can be used to
* stress test the number of tokens.
*/
private final int nTokens;
/**
* Number of token events per sleep interval.
*/
private final int nPerSleep;
@JsonCreator
public RandomFirehoseFactory(
@JsonProperty("sleepUsec") Long sleepUsec,
@JsonProperty("maxGeneratedRows") Long maxGeneratedRows,
@JsonProperty("seed") Long seed,
@JsonProperty("nTokens") Integer nTokens,
@JsonProperty("nPerSleep") Integer nPerSleep
)
{
long nsec = (sleepUsec != null && sleepUsec > 0) ? sleepUsec * 1000L : 0;
long msec = nsec / 1000000L;
this.delayMsec = msec;
this.delayNsec = (int) (nsec - (msec * 1000000L));
this.maxGeneratedRows = maxGeneratedRows;
this.seed = seed;
if (nTokens <= 0) {
log.warn("nTokens parameter " + nTokens + " ignored; must be greater than or equal to 1");
nTokens = 1;
}
if (nPerSleep <= 0) {
log.warn("nPerSleep parameter " + nPerSleep + " ignored; must be greater than or equal to 1");
nPerSleep = 1;
}
// assign after validation so the corrected defaults actually take effect
this.nTokens = nTokens;
this.nPerSleep = nPerSleep;
log.info("maxGeneratedRows=" + maxGeneratedRows);
log.info("seed=" + ((seed == 0L) ? "random value" : seed));
log.info("nTokens=" + nTokens);
log.info("nPerSleep=" + nPerSleep);
double dmsec = (double) delayMsec + ((double) this.delayNsec) / 1000000.;
if (dmsec > 0.0) {
log.info("sleep period=" + dmsec + "msec");
log.info(
"approximate max rate of record generation=" + (nPerSleep * 1000. / dmsec) + "/sec" +
" or " + (60. * nPerSleep * 1000. / dmsec) + "/minute"
);
} else {
log.info("sleep period= NONE");
log.info("approximate max rate of record generation= as fast as possible");
}
}
@Override
public Firehose connect(InputRowParser parser) throws IOException
{
final LinkedList<String> dimensions = new LinkedList<String>();
dimensions.add("inColumn");
dimensions.add("target");
return new Firehose()
{
private final java.util.Random rand = (seed == 0L) ? new Random() : new Random(seed);
private long rowCount = 0L;
private boolean waitIfmaxGeneratedRows = true;
@Override
public boolean hasMore()
{
if (maxGeneratedRows >= 0 && rowCount >= maxGeneratedRows) {
return waitIfmaxGeneratedRows;
} else {
return true; // there are always more random numbers
}
}
@Override
public InputRow nextRow()
{
final long modulus = rowCount % nPerSleep;
final long nth = (rowCount % nTokens) + 1;
long sleepMsec = delayMsec;
// all done?
if (maxGeneratedRows >= 0 && rowCount >= maxGeneratedRows && waitIfmaxGeneratedRows) {
// sleep a long time instead of terminating
sleepMsec = 2000000000L;
}
if (sleepMsec > 0L || delayNsec > 0) {
try {
if (modulus == 0) {
sleep(sleepMsec, delayNsec);
}
}
catch (InterruptedException e) {
throw new RuntimeException(e); // keep the cause instead of only its name
}
}
if (++rowCount % 1000 == 0) {
log.info("%,d events created.", rowCount);
}
final Map<String, Object> theMap = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
theMap.put("inColumn", anotherRand((int) nth));
theMap.put("target", ("a" + nth));
return new MapBasedInputRow(System.currentTimeMillis(), dimensions, theMap);
}
private Float anotherRand(int scale)
{
double f = rand.nextDouble(); // [0.0,1.0)
return new Float(f + (double) scale);
}
@Override
public Runnable commit()
{
// Do nothing.
return new Runnable()
{
@Override
public void run()
{
}
};
}
@Override
public void close() throws IOException
{
// do nothing
}
};
}
@Override
public InputRowParser getParser()
{
return null;
}
}
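
A worked check of the rate the constructor logs: with the spec values from the class javadoc (sleepUsec=100000, nPerSleep=3), nsec = 100000 * 1000, so delayMsec = 100 and dmsec = 100.0; the approximate max generation rate is nPerSleep * 1000 / dmsec = 3 * 1000 / 100 = 30 rows/sec, or 1800 rows/minute.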

@@ -311,9 +311,4 @@ public class TwitterSpritzerFirehoseFactory implements FirehoseFactory<InputRowP
};
}
@Override
public InputRowParser getParser()
{
return null;
}
}

@@ -1,133 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.examples.web;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.io.InputSupplier;
import com.metamx.emitter.EmittingLogger;
import io.druid.jackson.DefaultObjectMapper;
import java.io.BufferedReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
public class InputSupplierUpdateStream implements UpdateStream
{
private static final EmittingLogger log = new EmittingLogger(InputSupplierUpdateStream.class);
private static final long queueWaitTime = 15L;
private final TypeReference<HashMap<String, Object>> typeRef;
private final InputSupplier<BufferedReader> supplier;
private final int QUEUE_SIZE = 10000;
private final BlockingQueue<Map<String, Object>> queue = new ArrayBlockingQueue<Map<String, Object>>(QUEUE_SIZE);
private final ObjectMapper mapper = new DefaultObjectMapper();
private final String timeDimension;
private final Thread addToQueueThread;
public InputSupplierUpdateStream(
final InputSupplier<BufferedReader> supplier,
final String timeDimension
)
{
addToQueueThread = new Thread()
{
public void run()
{
while (!isInterrupted()) {
try {
BufferedReader reader = supplier.getInput();
String line;
while ((line = reader.readLine()) != null) {
if (isValid(line)) {
HashMap<String, Object> map = mapper.readValue(line, typeRef);
if (map.get(timeDimension) != null) {
queue.offer(map, queueWaitTime, TimeUnit.SECONDS);
log.debug("Successfully added to queue");
} else {
log.info("missing timestamp");
}
}
}
}
catch (InterruptedException e){
log.info(e, "Thread adding events to the queue interrupted");
return;
}
catch (JsonMappingException e) {
log.info(e, "Error in converting json to map");
}
catch (JsonParseException e) {
log.info(e, "Error in parsing json");
}
catch (IOException e) {
log.info(e, "Error in connecting to InputStream");
}
}
}
};
addToQueueThread.setDaemon(true);
this.supplier = supplier;
this.typeRef = new TypeReference<HashMap<String, Object>>()
{
};
this.timeDimension = timeDimension;
}
private boolean isValid(String s)
{
return !(s.isEmpty());
}
public void start()
{
addToQueueThread.start();
}
public void stop()
{
addToQueueThread.interrupt();
}
public Map<String, Object> pollFromQueue(long waitTime, TimeUnit unit) throws InterruptedException
{
return queue.poll(waitTime, unit);
}
public int getQueueSize()
{
return queue.size();
}
public String getTimeDimension()
{
return timeDimension;
}
}
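
A minimal usage sketch of this class, mirroring the tests further down (TestCaseSupplier is the test helper those tests use):

InputSupplierUpdateStream stream = new InputSupplierUpdateStream(
    new TestCaseSupplier("{\"item1\": \"value1\", \"time\":1372121562 }"),
    "time"
);
stream.start(); // daemon thread starts draining the supplier into the queue
Map<String, Object> event = stream.pollFromQueue(15L, TimeUnit.SECONDS); // null if nothing arrives in time
stream.stop();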

@@ -1,42 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.examples.web;
import com.google.common.io.InputSupplier;
import java.io.BufferedReader;
public class InputSupplierUpdateStreamFactory implements UpdateStreamFactory
{
private final InputSupplier<BufferedReader> inputSupplier;
private final String timeDimension;
public InputSupplierUpdateStreamFactory(InputSupplier<BufferedReader> inputSupplier, String timeDimension)
{
this.inputSupplier = inputSupplier;
this.timeDimension = timeDimension;
}
public InputSupplierUpdateStream build()
{
return new InputSupplierUpdateStream(inputSupplier, timeDimension);
}
}

@@ -1,81 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.examples.web;
import com.google.common.collect.Maps;
import java.util.Map;
import java.util.concurrent.TimeUnit;
public class RenamingKeysUpdateStream implements UpdateStream
{
private final InputSupplierUpdateStream updateStream;
private Map<String, String> renamedDimensions;
public RenamingKeysUpdateStream(
InputSupplierUpdateStream updateStream,
Map<String, String> renamedDimensions
)
{
this.renamedDimensions = renamedDimensions;
this.updateStream = updateStream;
}
public Map<String, Object> pollFromQueue(long waitTime, TimeUnit unit) throws InterruptedException
{
return renameKeys(updateStream.pollFromQueue(waitTime, unit));
}
private Map<String, Object> renameKeys(Map<String, Object> update)
{
if (renamedDimensions != null) {
Map<String, Object> renamedMap = Maps.newHashMap();
for (String key : renamedDimensions.keySet()) {
if (update.get(key) != null) {
Object obj = update.get(key);
renamedMap.put(renamedDimensions.get(key), obj);
}
}
return renamedMap;
} else {
return update;
}
}
public String getTimeDimension()
{
if (renamedDimensions != null && renamedDimensions.get(updateStream.getTimeDimension()) != null) {
return renamedDimensions.get(updateStream.getTimeDimension());
}
return updateStream.getTimeDimension();
}
public void start()
{
updateStream.start();
}
public void stop(){
updateStream.stop();
}
}

@@ -1,39 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.examples.web;
import java.util.Map;
public class RenamingKeysUpdateStreamFactory implements UpdateStreamFactory
{
private InputSupplierUpdateStreamFactory updateStreamFactory;
private Map<String, String> renamedDimensions;
public RenamingKeysUpdateStreamFactory(InputSupplierUpdateStreamFactory updateStreamFactory, Map<String, String> renamedDimensions)
{
this.updateStreamFactory = updateStreamFactory;
this.renamedDimensions = renamedDimensions;
}
public RenamingKeysUpdateStream build()
{
return new RenamingKeysUpdateStream(updateStreamFactory.build(), renamedDimensions);
}
}

@@ -1,24 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.examples.web;
public interface UpdateStreamFactory
{
public UpdateStream build();
}

@@ -1,142 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.examples.web;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Throwables;
import com.metamx.common.parsers.TimestampParser;
import com.metamx.emitter.EmittingLogger;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedInputRow;
import io.druid.data.input.impl.InputRowParser;
import io.druid.utils.Runnables;
import org.joda.time.DateTime;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@JsonTypeName("webstream")
public class WebFirehoseFactory implements FirehoseFactory<InputRowParser>
{
private static final EmittingLogger log = new EmittingLogger(WebFirehoseFactory.class);
private final String timeFormat;
private final UpdateStreamFactory factory;
private final long queueWaitTime = 15L;
@JsonCreator
public WebFirehoseFactory(
@JsonProperty("url") String url,
@JsonProperty("renamedDimensions") Map<String, String> renamedDimensions,
@JsonProperty("timeDimension") String timeDimension,
@JsonProperty("timeFormat") String timeFormat
)
{
this(
new RenamingKeysUpdateStreamFactory(
new InputSupplierUpdateStreamFactory(new WebJsonSupplier(url), timeDimension),
renamedDimensions
), timeFormat
);
}
public WebFirehoseFactory(UpdateStreamFactory factory, String timeFormat)
{
this.factory = factory;
if (timeFormat == null) {
this.timeFormat = "auto";
} else {
this.timeFormat = timeFormat;
}
}
@Override
public Firehose connect(InputRowParser parser) throws IOException
{
final UpdateStream updateStream = factory.build();
updateStream.start();
return new Firehose()
{
Map<String, Object> map;
private final Runnable doNothingRunnable = Runnables.getNoopRunnable();
@Override
public boolean hasMore()
{
try {
map = updateStream.pollFromQueue(queueWaitTime, TimeUnit.SECONDS);
return map != null;
}
catch (InterruptedException e) {
throw Throwables.propagate(e);
}
}
@Override
public InputRow nextRow()
{
try {
DateTime date = TimestampParser.createTimestampParser(timeFormat)
.apply(map.get(updateStream.getTimeDimension()).toString());
return new MapBasedInputRow(
date.getMillis(),
new ArrayList(map.keySet()),
map
);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
finally {
map = null;
}
}
@Override
public Runnable commit()
{
// ephemera in, ephemera out.
return doNothingRunnable; // reuse the same object each time
}
@Override
public void close() throws IOException
{
updateStream.stop();
}
};
}
@Override
public InputRowParser getParser()
{
return null;
}
}
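
For reference, a spec-file fragment wiring this factory up through the four JSON properties its creator accepts (the URL and renames are placeholder assumptions):

"firehose" : {
  "type" : "webstream",
  "url" : "http://example.com/events.json",
  "renamedDimensions" : { "item1" : "i1", "item2" : "i2" },
  "timeDimension" : "time",
  "timeFormat" : "auto"
}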

@@ -1,61 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.examples.web;
import com.google.api.client.repackaged.com.google.common.base.Throwables;
import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import com.google.common.io.InputSupplier;
import com.metamx.emitter.EmittingLogger;
import org.apache.commons.validator.routines.UrlValidator;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URL;
import java.net.URLConnection;
public class WebJsonSupplier implements InputSupplier<BufferedReader>
{
private static final EmittingLogger log = new EmittingLogger(WebJsonSupplier.class);
private static final UrlValidator urlValidator = new UrlValidator();
private URL url;
public WebJsonSupplier(String urlString)
{
Preconditions.checkState(urlValidator.isValid(urlString));
try {
this.url = new URL(urlString);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
@Override
public BufferedReader getInput() throws IOException
{
URLConnection connection = url.openConnection();
connection.setDoInput(true);
return new BufferedReader(new InputStreamReader(connection.getInputStream(), Charsets.UTF_8)); // reuse the connection opened above instead of opening the URL a second time
}
}

@@ -1,114 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.examples.web;
import com.google.common.io.InputSupplier;
import junit.framework.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
public class InputSupplierUpdateStreamTest
{
private final long waitTime = 1L;
private final TimeUnit unit = TimeUnit.SECONDS;
private final ArrayList<String> dimensions = new ArrayList<String>();
private InputSupplier testCaseSupplier;
Map<String, Object> expectedAnswer = new HashMap<String, Object>();
String timeDimension;
@Before
public void setUp()
{
timeDimension = "time";
testCaseSupplier = new TestCaseSupplier(
"{\"item1\": \"value1\","
+ "\"item2\":2,"
+ "\"time\":1372121562 }"
);
dimensions.add("item1");
dimensions.add("item2");
dimensions.add("time");
expectedAnswer.put("item1", "value1");
expectedAnswer.put("item2", 2);
expectedAnswer.put("time", 1372121562);
}
@Test
public void basicIngestionCheck() throws Exception
{
InputSupplierUpdateStream updateStream = new InputSupplierUpdateStream(
testCaseSupplier,
timeDimension
);
updateStream.start();
Map<String, Object> insertedRow = updateStream.pollFromQueue(waitTime, unit);
Assert.assertEquals(expectedAnswer, insertedRow);
updateStream.stop();
}
//If a timestamp is missing, we should throw away the event
@Test
public void missingTimeStampCheck()
{
testCaseSupplier = new TestCaseSupplier(
"{\"item1\": \"value1\","
+ "\"item2\":2}"
);
InputSupplierUpdateStream updateStream = new InputSupplierUpdateStream(
testCaseSupplier,
timeDimension
);
updateStream.start();
Assert.assertEquals(0, updateStream.getQueueSize());
updateStream.stop();
}
//If any other value is missing, we should still add the event and process it properly
@Test
public void otherNullValueCheck() throws Exception
{
testCaseSupplier = new TestCaseSupplier(
"{\"item1\": \"value1\","
+ "\"time\":1372121562 }"
);
InputSupplierUpdateStream updateStream = new InputSupplierUpdateStream(
testCaseSupplier,
timeDimension
);
updateStream.start();
Map<String, Object> insertedRow = updateStream.pollFromQueue(waitTime, unit);
Map<String, Object> expectedAnswer = new HashMap<String, Object>();
expectedAnswer.put("item1", "value1");
expectedAnswer.put("time", 1372121562);
Assert.assertEquals(expectedAnswer, insertedRow);
updateStream.stop();
}
}

@@ -1,91 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.examples.web;
import com.google.common.io.InputSupplier;
import junit.framework.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
public class RenamingKeysUpdateStreamTest
{
private final long waitTime = 15L;
private final TimeUnit unit = TimeUnit.SECONDS;
private InputSupplier testCaseSupplier;
String timeDimension;
@Before
public void setUp()
{
timeDimension = "time";
testCaseSupplier = new TestCaseSupplier(
"{\"item1\": \"value1\","
+ "\"item2\":2,"
+ "\"time\":1372121562 }"
);
}
@Test
public void testPollFromQueue() throws Exception
{
InputSupplierUpdateStream updateStream = new InputSupplierUpdateStream(testCaseSupplier, timeDimension);
Map<String, String> renamedKeys = new HashMap<String, String>();
renamedKeys.put("item1", "i1");
renamedKeys.put("item2", "i2");
renamedKeys.put("time", "t");
RenamingKeysUpdateStream renamer = new RenamingKeysUpdateStream(updateStream, renamedKeys);
renamer.start();
Map<String, Object> expectedAnswer = new HashMap<String, Object>();
expectedAnswer.put("i1", "value1");
expectedAnswer.put("i2", 2);
expectedAnswer.put("t", 1372121562);
Assert.assertEquals(expectedAnswer, renamer.pollFromQueue(waitTime, unit));
}
@Test
public void testGetTimeDimension() throws Exception
{
InputSupplierUpdateStream updateStream = new InputSupplierUpdateStream(testCaseSupplier, timeDimension);
Map<String, String> renamedKeys = new HashMap<String, String>();
renamedKeys.put("item1", "i1");
renamedKeys.put("item2", "i2");
renamedKeys.put("time", "t");
RenamingKeysUpdateStream renamer = new RenamingKeysUpdateStream(updateStream, renamedKeys);
Assert.assertEquals("t", renamer.getTimeDimension());
}
@Test
public void testMissingTimeRename() throws Exception
{
InputSupplierUpdateStream updateStream = new InputSupplierUpdateStream(testCaseSupplier, timeDimension);
Map<String, String> renamedKeys = new HashMap<String, String>();
renamedKeys.put("item1", "i1");
renamedKeys.put("item2", "i2");
RenamingKeysUpdateStream renamer = new RenamingKeysUpdateStream(updateStream, renamedKeys);
Assert.assertEquals("time", renamer.getTimeDimension());
}
}

@@ -1,223 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.examples.web;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.druid.data.input.Firehose;
import io.druid.data.input.InputRow;
import org.joda.time.DateTime;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
public class WebFirehoseFactoryTest
{
private List<String> dimensions = Lists.newArrayList();
private WebFirehoseFactory webbie;
private WebFirehoseFactory webbie1;
@Before
public void setUp() throws Exception
{
dimensions.add("item1");
dimensions.add("item2");
dimensions.add("time");
webbie = new WebFirehoseFactory(
new UpdateStreamFactory()
{
@Override
public UpdateStream build()
{
return new MyUpdateStream(ImmutableMap.<String,Object>of("item1", "value1", "item2", 2, "time", "1372121562"));
}
},
"posix"
);
webbie1 = new WebFirehoseFactory(
new UpdateStreamFactory()
{
@Override
public UpdateStream build()
{
return new MyUpdateStream(ImmutableMap.<String,Object>of("item1", "value1", "item2", 2, "time", "1373241600000"));
}
},
"auto"
);
}
@Test
public void testDimensions() throws Exception
{
InputRow inputRow;
Firehose firehose = webbie.connect(null);
if (firehose.hasMore()) {
inputRow = firehose.nextRow();
} else {
throw new RuntimeException("queue is empty");
}
List<String> actualAnswer = inputRow.getDimensions();
Collections.sort(actualAnswer);
Assert.assertEquals(dimensions, actualAnswer);
}
@Test
public void testPosixTimeStamp() throws Exception
{
InputRow inputRow;
Firehose firehose = webbie.connect(null);
if (firehose.hasMore()) {
inputRow = firehose.nextRow();
} else {
throw new RuntimeException("queue is empty");
}
long expectedTime = 1372121562L * 1000L;
Assert.assertEquals(expectedTime, inputRow.getTimestampFromEpoch());
}
@Test
public void testISOTimeStamp() throws Exception
{
WebFirehoseFactory webbie3 = new WebFirehoseFactory(
new UpdateStreamFactory()
{
@Override
public UpdateStream build()
{
return new MyUpdateStream(ImmutableMap.<String,Object>of("item1", "value1", "item2", 2, "time", "2013-07-08"));
}
},
"auto"
);
Firehose firehose1 = webbie3.connect(null);
if (firehose1.hasMore()) {
long milliSeconds = firehose1.nextRow().getTimestampFromEpoch();
DateTime date = new DateTime("2013-07-08");
Assert.assertEquals(date.getMillis(), milliSeconds);
} else {
Assert.assertFalse("hasMore returned false", true);
}
}
@Test
public void testAutoIsoTimeStamp() throws Exception
{
WebFirehoseFactory webbie2 = new WebFirehoseFactory(
new UpdateStreamFactory()
{
@Override
public UpdateStream build()
{
return new MyUpdateStream(ImmutableMap.<String,Object>of("item1", "value1", "item2", 2, "time", "2013-07-08"));
}
},
null
);
Firehose firehose2 = webbie2.connect(null);
if (firehose2.hasMore()) {
long milliSeconds = firehose2.nextRow().getTimestampFromEpoch();
DateTime date = new DateTime("2013-07-08");
Assert.assertEquals(date.getMillis(), milliSeconds);
} else {
Assert.assertFalse("hasMore returned false", true);
}
}
@Test
public void testAutoMilliSecondsTimeStamp() throws Exception
{
Firehose firehose3 = webbie1.connect(null);
if (firehose3.hasMore()) {
long milliSeconds = firehose3.nextRow().getTimestampFromEpoch();
DateTime date = new DateTime("2013-07-08");
Assert.assertEquals(date.getMillis(), milliSeconds);
} else {
Assert.assertFalse("hasMore returned false", true);
}
}
@Test
public void testGetDimension() throws Exception
{
InputRow inputRow;
Firehose firehose = webbie1.connect(null);
if (firehose.hasMore()) {
inputRow = firehose.nextRow();
} else {
throw new RuntimeException("queue is empty");
}
List<String> column1 = Lists.newArrayList();
column1.add("value1");
Assert.assertEquals(column1, inputRow.getDimension("item1"));
}
@Test
public void testGetFloatMetric() throws Exception
{
InputRow inputRow;
Firehose firehose = webbie1.connect(null);
if (firehose.hasMore()) {
inputRow = firehose.nextRow();
} else {
throw new RuntimeException("queue is empty");
}
Assert.assertEquals((float) 2.0, inputRow.getFloatMetric("item2"), 0.0f);
}
private static class MyUpdateStream implements UpdateStream
{
private final ImmutableMap<String, Object> map; // non-static: each stream keeps its own rows
public MyUpdateStream(ImmutableMap<String, Object> map)
{
this.map = map;
}
@Override
public Map<String, Object> pollFromQueue(long waitTime, TimeUnit unit) throws InterruptedException
{
return map;
}
@Override
public String getTimeDimension()
{
return "time";
}
@Override
public void start()
{
}
@Override
public void stop()
{
}
}
}

@@ -1,32 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.examples.web;
import org.junit.Test;
public class WebJsonSupplierTest
{
@Test(expected = IllegalStateException.class)
public void checkInvalidUrl() throws Exception
{
String invalidURL = "http://invalid.url.";
WebJsonSupplier supplier = new WebJsonSupplier(invalidURL);
}
}

@@ -0,0 +1,86 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Druid - a distributed column store.
~ Copyright (C) 2012, 2013 Metamarkets Group Inc.
~
~ This program is free software; you can redistribute it and/or
~ modify it under the terms of the GNU General Public License
~ as published by the Free Software Foundation; either version 2
~ of the License, or (at your option) any later version.
~
~ This program is distributed in the hope that it will be useful,
~ but WITHOUT ANY WARRANTY; without even the implied warranty of
~ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
~ GNU General Public License for more details.
~
~ You should have received a copy of the GNU General Public License
~ along with this program; if not, write to the Free Software
~ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>io.druid.extensions</groupId>
<artifactId>druid-cassandra-storage</artifactId>
<name>druid-cassandra-storage</name>
<description>druid-cassandra-storage</description>
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.7.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<dependencies>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-api</artifactId>
</dependency>
<dependency>
<groupId>com.netflix.astyanax</groupId>
<artifactId>astyanax</artifactId>
<version>1.0.1</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
<!-- Tests -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<executions>
<execution>
<id>attach-sources</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<archive>
<manifest>
<addDefaultImplementationEntries>true</addDefaultImplementationEntries>
<addDefaultSpecificationEntries>true</addDefaultSpecificationEntries>
</manifest>
</archive>
</configuration>
</plugin>
</plugins>
</build>
</project>

@@ -97,24 +97,4 @@ public class CassandraDataSegmentPuller extends CassandraStorage implements Data
log.info("Pull of file[%s] completed in %,d millis (%s bytes)", key, System.currentTimeMillis() - startTime,
meta.getObjectSize());
}
@Override
public long getLastModified(DataSegment segment) throws SegmentLoadingException
{
String key = (String) segment.getLoadSpec().get("key");
OperationResult<ColumnList<String>> result;
try
{
result = this.keyspace.prepareQuery(descriptorStorage)
.getKey(key)
.execute();
ColumnList<String> children = result.getResult();
long lastModified = children.getColumnByName("lastmodified").getLongValue();
log.info("Read lastModified for [%s] as [%d]", key, lastModified);
return lastModified;
} catch (ConnectionException e)
{
throw new SegmentLoadingException(e, e.getMessage());
}
}
}

@@ -0,0 +1,104 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Druid - a distributed column store.
~ Copyright (C) 2012, 2013 Metamarkets Group Inc.
~
~ This program is free software; you can redistribute it and/or
~ modify it under the terms of the GNU General Public License
~ as published by the Free Software Foundation; either version 2
~ of the License, or (at your option) any later version.
~
~ This program is distributed in the hope that it will be useful,
~ but WITHOUT ANY WARRANTY; without even the implied warranty of
~ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
~ GNU General Public License for more details.
~
~ You should have received a copy of the GNU General Public License
~ along with this program; if not, write to the Free Software
~ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>io.druid.extensions</groupId>
<artifactId>druid-hdfs-storage</artifactId>
<name>druid-hdfs-storage</name>
<description>druid-hdfs-storage</description>
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.7.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<dependencies>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-api</artifactId>
</dependency>
<!-- override jets3t from hadoop-core -->
<dependency>
<groupId>net.java.dev.jets3t</groupId>
<artifactId>jets3t</artifactId>
</dependency>
<!-- override httpclient / httpcore version from jets3t -->
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.metamx</groupId>
<artifactId>emitter</artifactId>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
<!-- Tests -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<executions>
<execution>
<id>attach-sources</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<archive>
<manifest>
<addDefaultImplementationEntries>true</addDefaultImplementationEntries>
<addDefaultSpecificationEntries>true</addDefaultSpecificationEntries>
</manifest>
</archive>
</configuration>
</plugin>
</plugins>
</build>
</project>

@@ -65,20 +65,6 @@ public class HdfsDataSegmentPuller implements DataSegmentPuller
}
}
@Override
public long getLastModified(DataSegment segment) throws SegmentLoadingException
{
Path path = getPath(segment);
FileSystem fs = checkPathAndGetFilesystem(path);
try {
return fs.getFileStatus(path).getModificationTime();
}
catch (IOException e) {
throw new SegmentLoadingException(e, "Problem loading status of path[%s]", path);
}
}
private Path getPath(DataSegment segment)
{
return new Path(String.valueOf(segment.getLoadSpec().get("path")));

@@ -19,6 +19,7 @@
package io.druid.storage.hdfs.tasklog;
import com.google.common.base.Optional;
import com.google.common.io.ByteSource;
import com.google.common.io.InputSupplier;
import com.google.inject.Inject;
import com.metamx.common.logger.Logger;
@@ -61,16 +62,16 @@ public class HdfsTaskLogs implements TaskLogs
}
@Override
public Optional<InputSupplier<InputStream>> streamTaskLog(final String taskId, final long offset) throws IOException
public Optional<ByteSource> streamTaskLog(final String taskId, final long offset) throws IOException
{
final Path path = getTaskLogFileFromId(taskId);
final FileSystem fs = FileSystem.get(new Configuration());
if (fs.exists(path)) {
return Optional.<InputSupplier<InputStream>>of(
new InputSupplier<InputStream>()
return Optional.<ByteSource>of(
new ByteSource()
{
@Override
public InputStream getInput() throws IOException
public InputStream openStream() throws IOException
{
log.info("Reading task log from: %s", path);
final long seekPos;
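
The move from InputSupplier to Guava's ByteSource changes the caller side as well; a hedged sketch of reading a task log through the new signature (taskLogs and taskId are assumptions):

Optional<ByteSource> logSource = taskLogs.streamTaskLog(taskId, 0);
if (logSource.isPresent()) {
  try (InputStream in = logSource.get().openStream()) {
    ByteStreams.copy(in, System.out); // dump the log from the requested offset
  }
}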

@@ -0,0 +1,84 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Druid - a distributed column store.
~ Copyright (C) 2012, 2013 Metamarkets Group Inc.
~
~ This program is free software; you can redistribute it and/or
~ modify it under the terms of the GNU General Public License
~ as published by the Free Software Foundation; either version 2
~ of the License, or (at your option) any later version.
~
~ This program is distributed in the hope that it will be useful,
~ but WITHOUT ANY WARRANTY; without even the implied warranty of
~ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
~ GNU General Public License for more details.
~
~ You should have received a copy of the GNU General Public License
~ along with this program; if not, write to the Free Software
~ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>io.druid.extensions</groupId>
<artifactId>druid-histogram</artifactId>
<name>druid-histogram</name>
<description>druid-histogram</description>
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.7.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<dependencies>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-processing</artifactId>
<version>${project.parent.version}</version>
</dependency>
<!-- Tests -->
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-processing</artifactId>
<version>${project.parent.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<executions>
<execution>
<id>attach-sources</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<archive>
<manifest>
<addDefaultImplementationEntries>true</addDefaultImplementationEntries>
<addDefaultSpecificationEntries>true</addDefaultSpecificationEntries>
</manifest>
</archive>
</configuration>
</plugin>
</plugins>
</build>
</project>

@@ -89,6 +89,12 @@ public class ApproximateHistogramAggregator implements Aggregator
throw new UnsupportedOperationException("ApproximateHistogramAggregator does not support getFloat()");
}
@Override
public long getLong()
{
throw new UnsupportedOperationException("ApproximateHistogramAggregator does not support getLong()");
}
@Override
public String getName()
{

@@ -70,7 +70,7 @@ public class ApproximateHistogramAggregatorFactory implements AggregatorFactory
)
{
this.name = name;
this.fieldName = fieldName.toLowerCase();
this.fieldName = fieldName;
this.resolution = resolution == null ? ApproximateHistogram.DEFAULT_HISTOGRAM_SIZE : resolution;
this.numBuckets = numBuckets == null ? ApproximateHistogram.DEFAULT_BUCKET_SIZE : numBuckets;
this.lowerLimit = lowerLimit == null ? Float.NEGATIVE_INFINITY : lowerLimit;

@@ -84,7 +84,13 @@ public class ApproximateHistogramFoldingAggregator implements Aggregator
@Override
public float getFloat()
{
throw new UnsupportedOperationException("ApproximateHistogramAggregator does not support getFloat()");
throw new UnsupportedOperationException("ApproximateHistogramFoldingAggregator does not support getFloat()");
}
@Override
public long getLong()
{
throw new UnsupportedOperationException("ApproximateHistogramFoldingAggregator does not support getLong()");
}
@Override
