Merge remote-tracking branch 'origin/master' into druid-0.7.x

Conflicts:
	cassandra-storage/pom.xml
	common/pom.xml
	examples/config/historical/runtime.properties
	examples/config/overlord/runtime.properties
	examples/config/realtime/runtime.properties
	examples/pom.xml
	hdfs-storage/pom.xml
	histogram/pom.xml
	indexing-hadoop/pom.xml
	indexing-service/pom.xml
	kafka-eight/pom.xml
	kafka-seven/pom.xml
	pom.xml
	processing/pom.xml
	rabbitmq/pom.xml
	s3-extensions/pom.xml
	server/pom.xml
	server/src/main/java/io/druid/server/ClientQuerySegmentWalker.java
	services/pom.xml
Xavier Léauté 2014-08-30 22:12:05 -07:00
commit 508e982190
44 changed files with 1147 additions and 214 deletions

View File

@ -0,0 +1,44 @@
---
layout: doc_page
---
## Introduction
Druid can use Cassandra as a deep storage mechanism. Segments and their metadata are stored in Cassandra in two tables:
`index_storage` and `descriptor_storage`. Under the hood, the Cassandra integration leverages Astyanax. The
index storage table is a [Chunked Object](https://github.com/Netflix/astyanax/wiki/Chunked-Object-Store) repository. It contains
compressed segments for distribution to historical nodes. Since segments can be large, Chunked Object storage allows the integration to multi-thread
writes to Cassandra and to spread the data across all nodes in the cluster. The descriptor storage table is a normal C* table that
stores the segment metadata.
## Schema
Below are the create statements for each:
```sql
CREATE TABLE index_storage(key text,
chunk text,
value blob,
PRIMARY KEY (key, chunk)) WITH COMPACT STORAGE;
CREATE TABLE descriptor_storage(key varchar,
lastModified timestamp,
descriptor varchar,
PRIMARY KEY (key)) WITH COMPACT STORAGE;
```
## Getting Started
First create the schema above. I use a new keyspace called `druid` for this purpose, which can be created using the
[Cassandra CQL `CREATE KEYSPACE`](http://www.datastax.com/documentation/cql/3.1/cql/cql_reference/create_keyspace_r.html) command.
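If you don't already have a suitable keyspace, a minimal definition along these lines should work (the replication settings here are illustrative; pick ones appropriate for your cluster):
```sql
CREATE KEYSPACE druid
  WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
```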
Then, add the following to your historical and realtime runtime properties files to enable a Cassandra backend.
```properties
druid.extensions.coordinates=["io.druid.extensions:druid-cassandra-storage:<druid version>"]
druid.storage.type=c*
druid.storage.host=localhost:9160
druid.storage.keyspace=druid
```
Use the `druid-development@googlegroups.com` mailing list if you have questions,
or feel free to reach out directly: `bone@alumni.brown.edu`.

View File

@ -117,7 +117,7 @@ The Druid coordinator exposes a web GUI for displaying cluster information and r
http://<COORDINATOR_IP>:<COORDINATOR_PORT>
```
There exists a full cluster view, as well as views for individual historical nodes, datasources and segments themselves. Segment information can be displayed in raw JSON form or as part of a sortable and filterable table.
There exists a full cluster view (which shows only the realtime and historical nodes), as well as views for individual historical nodes, datasources and segments themselves. Segment information can be displayed in raw JSON form or as part of a sortable and filterable table.
The coordinator console also exposes an interface to creating and editing rules. All valid datasources configured in the segment database, along with a default datasource, are available for configuration. Rules of different types can be added, deleted or edited.

View File

@ -47,3 +47,11 @@ druid.storage.storageDirectory=<directory for storing segments>
Note that you should generally set `druid.storage.storageDirectory` to something different from `druid.segmentCache.locations` and `druid.segmentCache.infoDir`.
If you are using the Hadoop indexer in local mode, then just give it a local file as your output directory and it will work.
## Cassandra
[Apache Cassandra](http://www.datastax.com/what-we-offer/products-services/datastax-enterprise/apache-cassandra) can also be leveraged for deep storage. This requires some additional Druid configuration as well as setting up the necessary schema within a Cassandra keyspace.
For more information on using Cassandra as deep storage, see [Cassandra Deep Storage](Cassandra-Deep-Storage.html).

View File

@ -19,13 +19,13 @@ Clone Druid and build it:
git clone https://github.com/metamx/druid.git druid
cd druid
git fetch --tags
git checkout druid-0.6.144
git checkout druid-0.6.147
./build.sh
```
### Downloading the DSK (Druid Standalone Kit)
[Download](http://static.druid.io/artifacts/releases/druid-services-0.6.144-bin.tar.gz) a stand-alone tarball and run it:
[Download](http://static.druid.io/artifacts/releases/druid-services-0.6.147-bin.tar.gz) a stand-alone tarball and run it:
``` bash
tar -xzf druid-services-0.X.X-bin.tar.gz

View File

@ -55,7 +55,7 @@ There are 11 main parts to a groupBy query:
|granularity|Defines the granularity of the query. See [Granularities](Granularities.html)|yes|
|filter|See [Filters](Filters.html)|no|
|aggregations|See [Aggregations](Aggregations.html)|yes|
|postAggregations|See [Post Aggregations](Post-Aggregations.html)|no|
|postAggregations|See [Post Aggregations](Post-aggregations.html)|no|
|intervals|A JSON Object representing ISO-8601 Intervals. This defines the time ranges to run the query over.|yes|
|context|An additional JSON Object which can be used to specify certain flags.|no|
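For reference, a minimal groupBy query exercising the required parts might look like this (datasource, dimension, and metric names are illustrative):
```json
{
  "queryType": "groupBy",
  "dataSource": "sample_datasource",
  "granularity": "day",
  "dimensions": ["dim1"],
  "aggregations": [
    { "type": "longSum", "name": "sample_name", "fieldName": "sample_fieldName" }
  ],
  "intervals": ["2012-01-01T00:00:00.000/2012-01-03T00:00:00.000"]
}
```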
@ -87,4 +87,4 @@ To pull it all together, the above query would return *n\*m* data points, up to
},
...
]
```
```

View File

@ -8,9 +8,9 @@ The previous examples are for Kafka 7. To support Kafka 8, a couple changes need
- Update realtime node's configs for Kafka 8 extensions
- e.g.
- `druid.extensions.coordinates=[...,"io.druid.extensions:druid-kafka-seven:0.6.144",...]`
- `druid.extensions.coordinates=[...,"io.druid.extensions:druid-kafka-seven:0.6.147",...]`
- becomes
- `druid.extensions.coordinates=[...,"io.druid.extensions:druid-kafka-eight:0.6.144",...]`
- `druid.extensions.coordinates=[...,"io.druid.extensions:druid-kafka-eight:0.6.147",...]`
- Update realtime task config for changed keys
- `firehose.type`, `plumber.rejectionPolicyFactory`, and all of `firehose.consumerProps` changes.

View File

@ -57,7 +57,7 @@ druid.host=#{IP_ADDR}:8080
druid.port=8080
druid.service=druid/prod/overlord
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.144"]
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.147"]
druid.zk.service.host=#{ZK_IPs}
druid.zk.paths.base=/druid/prod
@ -139,7 +139,7 @@ druid.host=#{IP_ADDR}:8080
druid.port=8080
druid.service=druid/prod/middlemanager
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.144","io.druid.extensions:druid-kafka-seven:0.6.144"]
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.147","io.druid.extensions:druid-kafka-seven:0.6.147"]
druid.zk.service.host=#{ZK_IPs}
druid.zk.paths.base=/druid/prod
@ -286,7 +286,7 @@ druid.host=#{IP_ADDR}:8080
druid.port=8080
druid.service=druid/prod/historical
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.144"]
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.147"]
druid.zk.service.host=#{ZK_IPs}
druid.zk.paths.base=/druid/prod

View File

@ -27,7 +27,7 @@ druid.host=localhost
druid.service=realtime
druid.port=8083
druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.144"]
druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.147"]
druid.zk.service.host=localhost
@ -76,7 +76,7 @@ druid.host=#{IP_ADDR}:8080
druid.port=8080
druid.service=druid/prod/realtime
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.144","io.druid.extensions:druid-kafka-seven:0.6.144"]
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.147","io.druid.extensions:druid-kafka-seven:0.6.147"]
druid.zk.service.host=#{ZK_IPs}
druid.zk.paths.base=/druid/prod

View File

@ -28,7 +28,7 @@ Configuration:
-Ddruid.zk.service.host=localhost
-Ddruid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.144"]
-Ddruid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.147"]
-Ddruid.db.connector.connectURI=jdbc:mysql://localhost:3306/druid
-Ddruid.db.connector.user=druid

View File

@ -49,7 +49,7 @@ There are two ways to setup Druid: download a tarball, or [Build From Source](Bu
### Download a Tarball
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.144-bin.tar.gz). Download this file to a directory of your choosing.
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.147-bin.tar.gz). Download this file to a directory of your choosing.
You can extract the awesomeness within by issuing:
@ -60,7 +60,7 @@ tar -zxvf druid-services-*-bin.tar.gz
Not too lost so far right? That's great! If you cd into the directory:
```
cd druid-services-0.6.144
cd druid-services-0.6.147
```
You should see a bunch of files:

View File

@ -91,7 +91,7 @@ druid.service=overlord
druid.zk.service.host=localhost
druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.144"]
druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.147"]
druid.db.connector.connectURI=jdbc:mysql://localhost:3306/druid
druid.db.connector.user=druid

View File

@ -13,7 +13,7 @@ In this tutorial, we will set up other types of Druid nodes and external depende
If you followed the first tutorial, you should already have Druid downloaded. If not, let's go back and do that first.
You can download the latest version of druid [here](http://static.druid.io/artifacts/releases/druid-services-0.6.144-bin.tar.gz)
You can download the latest version of druid [here](http://static.druid.io/artifacts/releases/druid-services-0.6.147-bin.tar.gz)
and untar the contents within by issuing:
@ -149,7 +149,7 @@ druid.port=8081
druid.zk.service.host=localhost
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.144"]
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.147"]
# Dummy read only AWS account (used to download example data)
druid.s3.secretKey=QyyfVZ7llSiRg6Qcrql1eEUG7buFpAK6T6engr1b
@ -240,7 +240,7 @@ druid.port=8083
druid.zk.service.host=localhost
druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.144","io.druid.extensions:druid-kafka-seven:0.6.144"]
druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.147","io.druid.extensions:druid-kafka-seven:0.6.147"]
# Change this config to db to hand off to the rest of the Druid cluster
druid.publish.type=noop

View File

@ -37,7 +37,7 @@ There are two ways to setup Druid: download a tarball, or [Build From Source](Bu
h3. Download a Tarball
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.144-bin.tar.gz)
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.147-bin.tar.gz)
Download this file to a directory of your choosing.
You can extract the awesomeness within by issuing:
@ -48,7 +48,7 @@ tar zxvf druid-services-*-bin.tar.gz
Not too lost so far right? That's great! If you cd into the directory:
```
cd druid-services-0.6.144
cd druid-services-0.6.147
```
You should see a bunch of files:

View File

@ -9,7 +9,7 @@ There are two ways to setup Druid: download a tarball, or build it from source.
# Download a Tarball
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.144-bin.tar.gz).
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.147-bin.tar.gz).
Download this bad boy to a directory of your choosing.
You can extract the awesomeness within by issuing:

View File

@ -1,32 +1,5 @@
## Introduction
Druid can use Cassandra as a deep storage mechanism. Segments and their metadata are stored in Cassandra in two tables:
`index_storage` and `descriptor_storage`. Under the hood, the Cassandra integration leverages Astyanax. The
index storage table is a [Chunked Object](https://github.com/Netflix/astyanax/wiki/Chunked-Object-Store) repository. It contains
compressed segments for distribution to historical nodes. Since segments can be large, Chunked Object storage allows the integration to multi-thread
writes to Cassandra and to spread the data across all nodes in the cluster. The descriptor storage table is a normal C* table that
stores the segment metadata.
## Schema
Below are the create statements for each:
CREATE TABLE index_storage ( key text, chunk text, value blob, PRIMARY KEY (key, chunk)) WITH COMPACT STORAGE;
CREATE TABLE descriptor_storage ( key varchar, lastModified timestamp, descriptor varchar, PRIMARY KEY (key) ) WITH COMPACT STORAGE;
## Getting Started
First create the schema above. (I use a new keyspace called `druid`)
Then, add the following properties to your properties file to enable a Cassandra
backend.
druid.storage.cassandra=true
druid.storage.cassandra.host=localhost:9160
druid.storage.cassandra.keyspace=druid
Use the `druid-development@googlegroups.com` mailing list if you have questions,
or feel free to reach out directly: `bone@alumni.brown.edu`.
## Example Prerequisite
The code in this example assumes Cassandra has been configured as deep storage for Druid.
For details on how to accomplish this, see [Cassandra Deep Storage](../../docs/content/Cassandra-Deep-Storage.md).

View File

@ -238,6 +238,52 @@ public class ApproximateHistogramAggregatorFactory implements AggregatorFactory
return new ApproximateHistogram(resolution);
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ApproximateHistogramAggregatorFactory that = (ApproximateHistogramAggregatorFactory) o;
if (Float.compare(that.lowerLimit, lowerLimit) != 0) {
return false;
}
if (numBuckets != that.numBuckets) {
return false;
}
if (resolution != that.resolution) {
return false;
}
if (Float.compare(that.upperLimit, upperLimit) != 0) {
return false;
}
if (fieldName != null ? !fieldName.equals(that.fieldName) : that.fieldName != null) {
return false;
}
if (name != null ? !name.equals(that.name) : that.name != null) {
return false;
}
return true;
}
@Override
public int hashCode()
{
int result = name != null ? name.hashCode() : 0;
result = 31 * result + (fieldName != null ? fieldName.hashCode() : 0);
result = 31 * result + resolution;
result = 31 * result + numBuckets;
result = 31 * result + (lowerLimit != +0.0f ? Float.floatToIntBits(lowerLimit) : 0);
result = 31 * result + (upperLimit != +0.0f ? Float.floatToIntBits(upperLimit) : 0);
return result;
}
@Override
public String toString()
{

View File

@ -148,6 +148,52 @@ public class ApproximateHistogramFoldingAggregatorFactory extends ApproximateHis
.array();
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ApproximateHistogramAggregatorFactory that = (ApproximateHistogramAggregatorFactory) o;
if (Float.compare(that.lowerLimit, lowerLimit) != 0) {
return false;
}
if (numBuckets != that.numBuckets) {
return false;
}
if (resolution != that.resolution) {
return false;
}
if (Float.compare(that.upperLimit, upperLimit) != 0) {
return false;
}
if (fieldName != null ? !fieldName.equals(that.fieldName) : that.fieldName != null) {
return false;
}
if (name != null ? !name.equals(that.name) : that.name != null) {
return false;
}
return true;
}
@Override
public int hashCode()
{
int result = name != null ? name.hashCode() : 0;
result = 31 * result + (fieldName != null ? fieldName.hashCode() : 0);
result = 31 * result + resolution;
result = 31 * result + numBuckets;
result = 31 * result + (lowerLimit != +0.0f ? Float.floatToIntBits(lowerLimit) : 0);
result = 31 * result + (upperLimit != +0.0f ? Float.floatToIntBits(upperLimit) : 0);
return result;
}
@Override
public String toString()
{

View File

@ -107,6 +107,12 @@ public abstract class AbstractTask implements Task
return null;
}
@Override
public String getClasspathPrefix()
{
return null;
}
@Override
public String toString()
{

View File

@ -79,6 +79,8 @@ public class HadoopIndexTask extends AbstractTask
private final HadoopIngestionSpec spec;
@JsonIgnore
private final List<String> hadoopDependencyCoordinates;
@JsonIgnore
private final String classpathPrefix;
/**
* @param spec is used by the HadoopDruidIndexerJob to set up the appropriate parameters
@ -96,7 +98,8 @@ public class HadoopIndexTask extends AbstractTask
@JsonProperty("spec") HadoopIngestionSpec spec,
@JsonProperty("config") HadoopIngestionSpec config, // backwards compat
@JsonProperty("hadoopCoordinates") String hadoopCoordinates,
@JsonProperty("hadoopDependencyCoordinates") List<String> hadoopDependencyCoordinates
@JsonProperty("hadoopDependencyCoordinates") List<String> hadoopDependencyCoordinates,
@JsonProperty("classpathPrefix") String classpathPrefix
)
{
super(
@ -123,6 +126,8 @@ public class HadoopIndexTask extends AbstractTask
// Will be defaulted to something at runtime, based on taskConfig.
this.hadoopDependencyCoordinates = null;
}
this.classpathPrefix = classpathPrefix;
}
@Override
@ -159,6 +164,13 @@ public class HadoopIndexTask extends AbstractTask
return hadoopDependencyCoordinates;
}
@JsonProperty
@Override
public String getClasspathPrefix()
{
return classpathPrefix;
}
@SuppressWarnings("unchecked")
@Override
public TaskStatus run(TaskToolbox toolbox) throws Exception

View File

@ -98,6 +98,12 @@ public interface Task
*/
public <T> QueryRunner<T> getQueryRunner(Query<T> query);
/**
* Returns an extra classpath that should be prepended to the default classpath when running this task. If no
* extra classpath should be prepended, this should return null or the empty string.
*/
public String getClasspathPrefix();
/**
* Execute preflight actions for a task. This can be used to acquire locks, check preconditions, and so on. The
* actions must be idempotent, since this method may be executed multiple times. This typically runs on the

View File

@ -161,10 +161,19 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
final List<String> command = Lists.newArrayList();
final String childHost = String.format("%s:%d", node.getHostNoPort(), childPort);
final String taskClasspath;
if (task.getClasspathPrefix() != null && !task.getClasspathPrefix().isEmpty()) {
taskClasspath = Joiner.on(File.pathSeparator).join(
task.getClasspathPrefix(),
config.getClasspath()
);
} else {
taskClasspath = config.getClasspath();
}
command.add(config.getJavaCommand());
command.add("-cp");
command.add(config.getClasspath());
command.add(taskClasspath);
Iterables.addAll(command, whiteSpaceSplitter.split(config.getJavaOpts()));
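For illustration, the effect of a non-empty classpath prefix in the change above is simply a platform-aware concatenation in front of the configured classpath; a minimal sketch with hypothetical paths:
```java
import com.google.common.base.Joiner;
import java.io.File;

public class ClasspathPrefixExample
{
  public static void main(String[] args)
  {
    // Hypothetical values; the real ones come from the task and the runner config.
    String prefix = "/opt/druid/hadoop-deps/*";
    String configured = "/opt/druid/lib/*";
    // Joined the same way ForkingTaskRunner builds taskClasspath above.
    String taskClasspath = Joiner.on(File.pathSeparator).join(prefix, configured);
    System.out.println(taskClasspath); // on Linux: /opt/druid/hadoop-deps/*:/opt/druid/lib/*
  }
}
```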

View File

@ -427,7 +427,8 @@ public class TaskSerdeTest
null
),
null,
null
null,
"blah"
);
final String json = jsonMapper.writeValueAsString(task);
@ -442,5 +443,7 @@ public class TaskSerdeTest
task.getSpec().getTuningConfig().getJobProperties(),
task2.getSpec().getTuningConfig().getJobProperties()
);
Assert.assertEquals("blah", task.getClasspathPrefix());
Assert.assertEquals("blah", task2.getClasspathPrefix());
}
}

View File

@ -41,7 +41,7 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<metamx.java-util.version>0.26.6</metamx.java-util.version>
<apache.curator.version>2.6.0</apache.curator.version>
<druid.api.version>0.2.7</druid.api.version>
<druid.api.version>0.2.8</druid.api.version>
</properties>
<modules>
@ -568,6 +568,9 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-release-plugin</artifactId>
<version>2.5</version>
<configuration>
<autoVersionSubmodules>true</autoVersionSubmodules>
</configuration>
</plugin>
</plugins>
</pluginManagement>

View File

@ -70,10 +70,14 @@ public class DurationGranularity extends BaseQueryGranularity
}
@Override
public long truncate(long t)
public long truncate(final long t)
{
final long duration = getDurationMillis();
return t - t % duration + origin;
long offset = t % duration - origin % duration;
if(offset < 0) {
offset += duration;
}
return t - offset;
}
@Override

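Pulled out as a standalone sketch (not the Druid class itself), the corrected origin-aware truncation above can be exercised in isolation; the values in `main` are arbitrary illustrations:
```java
public class TruncateSketch
{
  // Same arithmetic as the fixed DurationGranularity.truncate: bucket t into
  // fixed-duration windows aligned on `origin`, correcting for Java's signed %
  // so times before an aligned boundary still round down.
  static long truncate(long t, long duration, long origin)
  {
    long offset = t % duration - origin % duration;
    if (offset < 0) {
      offset += duration; // shift into [0, duration)
    }
    return t - offset;
  }

  public static void main(String[] args)
  {
    long hour = 3600000L;
    long origin = 900000L; // buckets anchored at hh:15
    System.out.println(truncate(37200000L, hour, origin)); // 10:20 -> 36900000 (10:15)
    System.out.println(truncate(36300000L, hour, origin)); // 10:05 -> 33300000 (09:15)
  }
}
```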
View File

@ -301,14 +301,17 @@ public class PeriodGranularity extends BaseQueryGranularity
return current;
}
private long truncateMillisPeriod(long t)
private long truncateMillisPeriod(final long t)
{
// toStandardDuration assumes days are always 24h, and hours are always 60 minutes,
// which may not always be the case, e.g. if there are daylight saving changes.
if(chronology.days().isPrecise() && chronology.hours().isPrecise()) {
if (chronology.days().isPrecise() && chronology.hours().isPrecise()) {
final long millis = period.toStandardDuration().getMillis();
t -= t % millis + origin % millis;
return t;
long offset = t % millis - origin % millis;
if(offset < 0) {
offset += millis;
}
return t - offset;
}
else
{

View File

@ -0,0 +1,32 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2014 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "timewarp", value = TimewarpOperator.class)
})
public interface PostProcessingOperator<T>
{
public QueryRunner<T> postProcess(QueryRunner<T> baseQueryRunner);
}

View File

@ -0,0 +1,158 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2014 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Function;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import io.druid.data.input.MapBasedRow;
import io.druid.query.spec.MultipleIntervalSegmentSpec;
import io.druid.query.timeboundary.TimeBoundaryQuery;
import io.druid.query.timeboundary.TimeBoundaryResultValue;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.Period;
import java.util.Arrays;
/**
* TimewarpOperator is an example post-processing operator that maps the current time
* to the latest period ending within the specified data interval and truncates
* the query interval to discard data that would be mapped to the future.
*
*/
public class TimewarpOperator<T> implements PostProcessingOperator<T>
{
private final Interval dataInterval;
private final long periodMillis;
private final long originMillis;
/**
*
* @param dataInterval interval containing the actual data
* @param period time will be offset by a multiple of the given period
* until there is at least a full period ending within the data interval
* @param origin origin to be used to align time periods
* (e.g. to determine on what day of the week a weekly period starts)
*/
@JsonCreator
public TimewarpOperator(
@JsonProperty("dataInterval") Interval dataInterval,
@JsonProperty("period") Period period,
@JsonProperty("origin") DateTime origin
)
{
this.originMillis = origin.getMillis();
this.dataInterval = dataInterval;
// this will fail for periods that do not map to millis (e.g. P1M)
this.periodMillis = period.toStandardDuration().getMillis();
}
@Override
public QueryRunner<T> postProcess(QueryRunner<T> baseQueryRunner)
{
return postProcess(baseQueryRunner, DateTime.now().getMillis());
}
public QueryRunner<T> postProcess(final QueryRunner<T> baseRunner, final long now)
{
return new QueryRunner<T>()
{
@Override
public Sequence<T> run(final Query<T> query)
{
final long offset = computeOffset(now);
final Interval interval = query.getIntervals().get(0);
final Interval modifiedInterval = new Interval(
interval.getStartMillis() + offset,
Math.min(interval.getEndMillis() + offset, now + offset)
);
return Sequences.map(
baseRunner.run(
query.withQuerySegmentSpec(new MultipleIntervalSegmentSpec(Arrays.asList(modifiedInterval)))
),
new Function<T, T>()
{
@Override
public T apply(T input)
{
if (input instanceof Result) {
Result res = (Result) input;
Object value = res.getValue();
if (value instanceof TimeBoundaryResultValue) {
TimeBoundaryResultValue boundary = (TimeBoundaryResultValue) value;
DateTime minTime = null;
try {
minTime = boundary.getMinTime();
}
catch (IllegalArgumentException e) {
// the result may not carry a min time (e.g. a maxTime-only bound); leave it null
}
final DateTime maxTime = boundary.getMaxTime();
return (T) ((TimeBoundaryQuery) query).buildResult(
new DateTime(Math.min(res.getTimestamp().getMillis() - offset, now)),
minTime != null ? minTime.minus(offset) : null,
maxTime != null ? new DateTime(Math.min(maxTime.getMillis() - offset, now)) : null
).iterator().next();
}
return (T) new Result(res.getTimestamp().minus(offset), value);
} else if (input instanceof MapBasedRow) {
MapBasedRow row = (MapBasedRow) input;
return (T) new MapBasedRow(row.getTimestamp().minus(offset), row.getEvent());
}
// default to noop for unknown result types
return input;
}
}
);
}
};
}
/**
* Map time t into the last `period` ending within `dataInterval`
*
* @param t the time to map, in milliseconds
* @return the offset between the mapped time and time t
*/
protected long computeOffset(final long t)
{
// start is the beginning of the last period ending within dataInterval
long start = dataInterval.getEndMillis() - periodMillis;
long startOffset = start % periodMillis - originMillis % periodMillis;
if(startOffset < 0) {
startOffset += periodMillis;
}
start -= startOffset;
// tOffset is the offset time t within the last period
long tOffset = t % periodMillis - originMillis % periodMillis;
if(tOffset < 0) {
tOffset += periodMillis;
}
tOffset += start;
return tOffset - t;
}
}
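To make the offset computation concrete, here is a small same-package snippet (`computeOffset` is protected, as in the test class further down) using the same values as `TimewarpOperatorTest`: a weekly period aligned on Monday 2014-01-06 and a data interval of 2014-01-01/2014-01-15.
```java
// Assumes joda-time Interval/Period/DateTime imports, as in the class above.
TimewarpOperator<Object> op = new TimewarpOperator<>(
    new Interval(new DateTime("2014-01-01"), new DateTime("2014-01-15")),
    new Period("P1W"),
    new DateTime("2014-01-06") // align periods on Monday
);
DateTime t = new DateTime("2014-08-02");
// The last Monday-aligned week ending within the data interval is
// 2014-01-06/2014-01-13, so Saturday 2014-08-02 maps to Saturday 2014-01-11.
DateTime warped = t.plus(op.computeOffset(t.getMillis()));
// warped == new DateTime("2014-01-11")
```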

View File

@ -39,7 +39,6 @@ import org.apache.commons.codec.binary.Base64;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
@ -233,6 +232,40 @@ public class CardinalityAggregatorFactory implements AggregatorFactory
return HyperLogLogCollector.makeLatestCollector();
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
CardinalityAggregatorFactory that = (CardinalityAggregatorFactory) o;
if (byRow != that.byRow) {
return false;
}
if (fieldNames != null ? !fieldNames.equals(that.fieldNames) : that.fieldNames != null) {
return false;
}
if (name != null ? !name.equals(that.name) : that.name != null) {
return false;
}
return true;
}
@Override
public int hashCode()
{
int result = name != null ? name.hashCode() : 0;
result = 31 * result + (fieldNames != null ? fieldNames.hashCode() : 0);
result = 31 * result + (byRow ? 1 : 0);
return result;
}
@Override
public String toString()
{

View File

@ -195,6 +195,13 @@ public abstract class HyperLogLogCollector implements Comparable<HyperLogLogColl
return applyCorrection(e, zeroCount);
}
/**
* Checks if the payload for the given ByteBuffer is sparse or not.
* The given buffer must be positioned at getPayloadBytePosition() prior to calling isSparse
*
* @param buffer buffer positioned at the start of the payload to inspect
* @return true if the payload is stored in sparse form, false if dense
*/
private static boolean isSparse(ByteBuffer buffer)
{
return buffer.remaining() != NUM_BYTES_FOR_BUCKETS;
@ -495,13 +502,32 @@ public abstract class HyperLogLogCollector implements Comparable<HyperLogLogColl
return false;
}
HyperLogLogCollector collector = (HyperLogLogCollector) o;
ByteBuffer otherBuffer = ((HyperLogLogCollector) o).storageBuffer;
if (storageBuffer != null ? !storageBuffer.equals(collector.storageBuffer) : collector.storageBuffer != null) {
if (storageBuffer != null ? false : otherBuffer != null) {
return false;
}
return true;
if(storageBuffer == null && otherBuffer == null) {
return true;
}
final ByteBuffer denseStorageBuffer;
if(storageBuffer.remaining() != getNumBytesForDenseStorage()) {
HyperLogLogCollector denseCollector = HyperLogLogCollector.makeCollector(storageBuffer);
denseCollector.convertToDenseStorage();
denseStorageBuffer = denseCollector.storageBuffer;
} else {
denseStorageBuffer = storageBuffer;
}
if(otherBuffer.remaining() != getNumBytesForDenseStorage()) {
HyperLogLogCollector otherCollector = HyperLogLogCollector.makeCollector(otherBuffer);
otherCollector.convertToDenseStorage();
otherBuffer = otherCollector.storageBuffer;
}
return denseStorageBuffer.equals(otherBuffer);
}
@Override

View File

@ -257,6 +257,8 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
{
return new CacheStrategy<Row, Object, GroupByQuery>()
{
private final List<AggregatorFactory> aggs = query.getAggregatorSpecs();
@Override
public byte[] computeCacheKey(GroupByQuery query)
{
@ -342,14 +344,26 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
DateTime timestamp = granularity.toDateTime(((Number) results.next()).longValue());
Iterator<AggregatorFactory> aggsIter = aggs.iterator();
Map<String, Object> event = jsonMapper.convertValue(
results.next(),
new TypeReference<Map<String, Object>>()
{
}
);
while (aggsIter.hasNext()) {
final AggregatorFactory factory = aggsIter.next();
Object agg = event.get(factory.getName());
if (agg != null) {
event.put(factory.getName(), factory.deserialize(agg));
}
}
return new MapBasedRow(
timestamp,
(Map<String, Object>) jsonMapper.convertValue(
results.next(),
new TypeReference<Map<String, Object>>()
{
}
)
event
);
}
};

View File

@ -146,9 +146,10 @@ public class TimeBoundaryQueryQueryToolChest
@Override
public byte[] computeCacheKey(TimeBoundaryQuery query)
{
return ByteBuffer.allocate(2)
final byte[] cacheKey = query.getCacheKey();
return ByteBuffer.allocate(1 + cacheKey.length)
.put(TIMEBOUNDARY_QUERY)
.put(query.getCacheKey())
.put(cacheKey)
.array();
}

View File

@ -138,7 +138,10 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
{
Interval actualInterval = interval;
final Interval dataInterval = new Interval(getMinTime().getMillis(), gran.next(getMaxTime().getMillis()));
final Interval dataInterval = new Interval(
getMinTime().getMillis(),
gran.next(gran.truncate(getMaxTime().getMillis()))
);
if (!actualInterval.overlaps(dataInterval)) {
return Sequences.empty();

View File

@ -133,8 +133,11 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
Interval actualIntervalTmp = interval;
final Interval dataInterval = new Interval(
getMinTime().getMillis(),
gran.next(gran.truncate(getMaxTime().getMillis()))
);
final Interval dataInterval = new Interval(getMinTime().getMillis(), gran.next(getMaxTime().getMillis()));
if (!actualIntervalTmp.overlaps(dataInterval)) {
return Sequences.empty();
}

View File

@ -347,68 +347,128 @@ public class QueryGranularityTest
@Test
public void testCompoundPeriodTruncate() throws Exception
{
final DateTime origin = new DateTime("2012-01-02T05:00:00.000-08:00");
QueryGranularity periodOrigin = new PeriodGranularity(new Period("P1M2D"),
origin,
DateTimeZone.forID("America/Los_Angeles"));
assertSame(
Lists.newArrayList(
new DateTime("2011-11-30T05:00:00.000-08:00"),
new DateTime("2012-01-02T05:00:00.000-08:00"),
new DateTime("2012-02-04T05:00:00.000-08:00"),
new DateTime("2012-02-04T05:00:00.000-08:00")
),
Lists.newArrayList(
periodOrigin.truncate(new DateTime("2012-01-01T05:00:04.123-08:00").getMillis()),
periodOrigin.truncate(new DateTime("2012-01-02T07:00:04.123-08:00").getMillis()),
periodOrigin.truncate(new DateTime("2012-03-01T07:20:04.123-08:00").getMillis()),
periodOrigin.truncate(new DateTime("2012-02-04T05:00:00.000-08:00").getMillis())
)
);
{
final DateTime origin = new DateTime("2012-01-02T05:00:00.000-08:00");
QueryGranularity periodOrigin = new PeriodGranularity(
new Period("P1M2D"),
origin,
DateTimeZone.forID("America/Los_Angeles")
);
assertSame(
Lists.newArrayList(
new DateTime("2011-11-30T05:00:00.000-08:00"),
new DateTime("2012-01-02T05:00:00.000-08:00"),
new DateTime("2012-02-04T05:00:00.000-08:00"),
new DateTime("2012-02-04T05:00:00.000-08:00")
),
Lists.newArrayList(
periodOrigin.truncate(new DateTime("2012-01-01T05:00:04.123-08:00").getMillis()),
periodOrigin.truncate(new DateTime("2012-01-02T07:00:04.123-08:00").getMillis()),
periodOrigin.truncate(new DateTime("2012-03-01T07:20:04.123-08:00").getMillis()),
periodOrigin.truncate(new DateTime("2012-02-04T05:00:00.000-08:00").getMillis())
)
);
QueryGranularity periodNoOrigin = new PeriodGranularity(new Period("P1M2D"),
null,
DateTimeZone.forID("America/Los_Angeles"));
assertSame(
Lists.newArrayList(
new DateTime("1970-01-01T00:00:00.000-08:00"),
new DateTime("2011-12-12T00:00:00.000-08:00"),
new DateTime("2012-01-14T00:00:00.000-08:00"),
new DateTime("2012-02-16T00:00:00.000-08:00")
),
Lists.newArrayList(
periodNoOrigin.truncate(new DateTime("1970-01-01T05:02:04.123-08:00").getMillis()),
periodNoOrigin.truncate(new DateTime("2012-01-01T05:02:04.123-08:00").getMillis()),
periodNoOrigin.truncate(new DateTime("2012-01-15T07:01:04.123-08:00").getMillis()),
periodNoOrigin.truncate(new DateTime("2012-02-16T00:00:00.000-08:00").getMillis())
QueryGranularity periodNoOrigin = new PeriodGranularity(
new Period("P1M2D"),
null,
DateTimeZone.forID("America/Los_Angeles")
);
assertSame(
Lists.newArrayList(
new DateTime("1970-01-01T00:00:00.000-08:00"),
new DateTime("2011-12-12T00:00:00.000-08:00"),
new DateTime("2012-01-14T00:00:00.000-08:00"),
new DateTime("2012-02-16T00:00:00.000-08:00")
),
Lists.newArrayList(
periodNoOrigin.truncate(new DateTime("1970-01-01T05:02:04.123-08:00").getMillis()),
periodNoOrigin.truncate(new DateTime("2012-01-01T05:02:04.123-08:00").getMillis()),
periodNoOrigin.truncate(new DateTime("2012-01-15T07:01:04.123-08:00").getMillis()),
periodNoOrigin.truncate(new DateTime("2012-02-16T00:00:00.000-08:00").getMillis())
)
);
)
);
}
{
final DateTime origin = new DateTime("2012-01-02T05:00:00.000-08:00");
QueryGranularity periodOrigin = new PeriodGranularity(
new Period("PT12H5M"),
origin,
DateTimeZone.forID("America/Los_Angeles")
);
assertSame(
Lists.newArrayList(
new DateTime("2012-01-01T04:50:00.000-08:00"),
new DateTime("2012-01-02T05:00:00.000-08:00"),
new DateTime("2012-01-02T17:05:00.000-08:00"),
new DateTime("2012-02-03T22:25:00.000-08:00")
),
Lists.newArrayList(
periodOrigin.truncate(new DateTime("2012-01-01T05:00:04.123-08:00").getMillis()),
periodOrigin.truncate(new DateTime("2012-01-02T07:00:04.123-08:00").getMillis()),
periodOrigin.truncate(new DateTime("2012-01-03T00:20:04.123-08:00").getMillis()),
periodOrigin.truncate(new DateTime("2012-02-03T22:25:00.000-08:00").getMillis())
)
);
}
}
@Test
public void testCompoundPeriodMillisTruncate() throws Exception
{
final DateTime origin = new DateTime("2012-01-02T05:00:00.000-08:00");
QueryGranularity periodOrigin = new PeriodGranularity(new Period("PT12H5M"),
origin,
DateTimeZone.forID("America/Los_Angeles"));
assertSame(
Lists.newArrayList(
new DateTime("2012-01-01T04:50:00.000-08:00"),
new DateTime("2012-01-02T05:00:00.000-08:00"),
new DateTime("2012-01-02T17:05:00.000-08:00"),
new DateTime("2012-02-03T22:25:00.000-08:00")
),
Lists.newArrayList(
periodOrigin.truncate(new DateTime("2012-01-01T05:00:04.123-08:00").getMillis()),
periodOrigin.truncate(new DateTime("2012-01-02T07:00:04.123-08:00").getMillis()),
periodOrigin.truncate(new DateTime("2012-01-03T00:20:04.123-08:00").getMillis()),
periodOrigin.truncate(new DateTime("2012-02-03T22:25:00.000-08:00").getMillis())
)
);
{
final DateTime origin = new DateTime("2012-01-02T05:00:00.000-08:00");
QueryGranularity periodOrigin = new PeriodGranularity(
new Period("PT12H5M"),
origin,
DateTimeZone.UTC
);
assertSame(
Lists.newArrayList(
new DateTime("2012-01-01T04:50:00.000-08:00"),
new DateTime("2012-01-02T05:00:00.000-08:00"),
new DateTime("2012-01-02T17:05:00.000-08:00"),
new DateTime("2012-02-03T22:25:00.000-08:00")
),
Lists.newArrayList(
periodOrigin.truncate(new DateTime("2012-01-01T05:00:04.123-08:00").getMillis()),
periodOrigin.truncate(new DateTime("2012-01-02T07:00:04.123-08:00").getMillis()),
periodOrigin.truncate(new DateTime("2012-01-03T00:20:04.123-08:00").getMillis()),
periodOrigin.truncate(new DateTime("2012-02-03T22:25:00.000-08:00").getMillis())
)
);
}
}
@Test
public void testDurationTruncate() throws Exception
{
{
final DateTime origin = new DateTime("2012-01-02T05:00:00.000-08:00");
QueryGranularity gran = new DurationGranularity(
new Period("PT12H5M").toStandardDuration().getMillis(),
origin
);
assertSame(
Lists.newArrayList(
new DateTime("2012-01-01T04:50:00.000-08:00"),
new DateTime("2012-01-02T05:00:00.000-08:00"),
new DateTime("2012-01-02T17:05:00.000-08:00"),
new DateTime("2012-02-03T22:25:00.000-08:00")
),
Lists.newArrayList(
gran.truncate(new DateTime("2012-01-01T05:00:04.123-08:00").getMillis()),
gran.truncate(new DateTime("2012-01-02T07:00:04.123-08:00").getMillis()),
gran.truncate(new DateTime("2012-01-03T00:20:04.123-08:00").getMillis()),
gran.truncate(new DateTime("2012-02-03T22:25:00.000-08:00").getMillis())
)
);
}
}
@Test
public void testIterableAllSimple() throws Exception
{
@ -524,7 +584,7 @@ public class QueryGranularityTest
while (actualIter.hasNext() && expectedIter.hasNext()) {
long a = actualIter.next().longValue();
Assert.assertEquals(expectedIter.next().getMillis(), a);
Assert.assertEquals(expectedIter.next(), new DateTime(a));
}
Assert.assertFalse("actualIter not exhausted!?", actualIter.hasNext());
Assert.assertFalse("expectedIter not exhausted!?", expectedIter.hasNext());

View File

@ -0,0 +1,168 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.timeboundary.TimeBoundaryResultValue;
import io.druid.query.timeseries.TimeseriesResultValue;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.junit.Assert;
import org.junit.Test;
import java.util.Arrays;
public class TimewarpOperatorTest
{
TimewarpOperator<Result<TimeseriesResultValue>> testOperator = new TimewarpOperator<>(
new Interval(new DateTime("2014-01-01"), new DateTime("2014-01-15")),
new Period("P1W"),
new DateTime("2014-01-06") // align on Monday
);
@Test
public void testComputeOffset() throws Exception
{
{
final DateTime t = new DateTime("2014-01-23");
final DateTime tOffset = new DateTime("2014-01-09");
Assert.assertEquals(
new DateTime(tOffset),
t.plus(testOperator.computeOffset(t.getMillis()))
);
}
{
final DateTime t = new DateTime("2014-08-02");
final DateTime tOffset = new DateTime("2014-01-11");
Assert.assertEquals(
new DateTime(tOffset),
t.plus(testOperator.computeOffset(t.getMillis()))
);
}
}
@Test
public void testPostProcess() throws Exception
{
QueryRunner<Result<TimeseriesResultValue>> queryRunner = testOperator.postProcess(
new QueryRunner<Result<TimeseriesResultValue>>()
{
@Override
public Sequence<Result<TimeseriesResultValue>> run(Query<Result<TimeseriesResultValue>> query)
{
return Sequences.simple(
ImmutableList.of(
new Result<>(
new DateTime(new DateTime("2014-01-09")),
new TimeseriesResultValue(ImmutableMap.<String, Object>of("metric", 2))
),
new Result<>(
new DateTime(new DateTime("2014-01-11")),
new TimeseriesResultValue(ImmutableMap.<String, Object>of("metric", 3))
),
new Result<>(
query.getIntervals().get(0).getEnd(),
new TimeseriesResultValue(ImmutableMap.<String, Object>of("metric", 5))
)
)
);
}
},
new DateTime("2014-08-02").getMillis()
);
final Query<Result<TimeseriesResultValue>> query =
Druids.newTimeseriesQueryBuilder()
.dataSource("dummy")
.intervals("2014-07-31/2014-08-05")
.aggregators(Arrays.<AggregatorFactory>asList(new CountAggregatorFactory("count")))
.build();
Assert.assertEquals(
Lists.newArrayList(
new Result<>(
new DateTime("2014-07-31"),
new TimeseriesResultValue(ImmutableMap.<String, Object>of("metric", 2))
),
new Result<>(
new DateTime("2014-08-02"),
new TimeseriesResultValue(ImmutableMap.<String, Object>of("metric", 3))
),
new Result<>(
new DateTime("2014-08-02"),
new TimeseriesResultValue(ImmutableMap.<String, Object>of("metric", 5))
)
),
Sequences.toList(queryRunner.run(query), Lists.<Result<TimeseriesResultValue>>newArrayList())
);
TimewarpOperator<Result<TimeBoundaryResultValue>> timeBoundaryOperator = new TimewarpOperator<>(
new Interval(new DateTime("2014-01-01"), new DateTime("2014-01-15")),
new Period("P1W"),
new DateTime("2014-01-06") // align on Monday
);
QueryRunner<Result<TimeBoundaryResultValue>> timeBoundaryRunner = timeBoundaryOperator.postProcess(
new QueryRunner<Result<TimeBoundaryResultValue>>()
{
@Override
public Sequence<Result<TimeBoundaryResultValue>> run(Query<Result<TimeBoundaryResultValue>> query)
{
return Sequences.simple(
ImmutableList.of(
new Result<>(
new DateTime("2014-01-12"),
new TimeBoundaryResultValue(ImmutableMap.<String, Object>of("maxTime", new DateTime("2014-01-12")))
)
)
);
}
},
new DateTime("2014-08-02").getMillis()
);
final Query<Result<TimeBoundaryResultValue>> timeBoundaryQuery =
Druids.newTimeBoundaryQueryBuilder()
.dataSource("dummy")
.build();
Assert.assertEquals(
Lists.newArrayList(
new Result<>(
new DateTime("2014-08-02"),
new TimeBoundaryResultValue(ImmutableMap.<String, Object>of("maxTime", new DateTime("2014-08-02")))
)
),
Sequences.toList(timeBoundaryRunner.run(timeBoundaryQuery), Lists.<Result<TimeBoundaryResultValue>>newArrayList())
);
}
}

View File

@ -152,6 +152,13 @@ public class GroupByTimeseriesQueryRunnerTest extends TimeseriesQueryRunnerTest
// fill anything in.
}
@Override
public void testTimeseriesQueryZeroFilling()
{
// Skip this test because the timeseries test expects skipped hours to be filled in, but group by doesn't
// fill anything in.
}
@Override
public void testTimeseriesWithNonExistentFilter()
{

View File

@ -21,7 +21,9 @@ package io.druid.query.timeseries;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.metamx.common.Granularity;
import com.metamx.common.guava.Sequences;
import io.druid.granularity.PeriodGranularity;
import io.druid.granularity.QueryGranularity;
@ -468,6 +470,121 @@ public class TimeseriesQueryRunnerTest
TestHelper.assertExpectedResults(expectedResults1, results1);
}
@Test
public void testTimeseriesQueryZeroFilling()
{
TimeseriesQuery query1 = Druids.newTimeseriesQueryBuilder()
.dataSource(QueryRunnerTestHelper.dataSource)
.filters(QueryRunnerTestHelper.providerDimension, "spot", "upfront", "total_market")
.granularity(QueryGranularity.HOUR)
.intervals(
Arrays.asList(
new Interval(
"2011-04-14T00:00:00.000Z/2011-05-01T00:00:00.000Z"
)
)
)
.aggregators(
Arrays.<AggregatorFactory>asList(
QueryRunnerTestHelper.rowsCount,
new LongSumAggregatorFactory(
"idx",
"index"
)
)
)
.build();
List<Result<TimeseriesResultValue>> lotsOfZeroes = Lists.newArrayList();
for (final Long millis : QueryGranularity.HOUR.iterable(
new DateTime("2011-04-14T01").getMillis(),
new DateTime("2011-04-15").getMillis()
)) {
lotsOfZeroes.add(
new Result<>(
new DateTime(millis),
new TimeseriesResultValue(
ImmutableMap.<String, Object>of("rows", 0L, "idx", 0L)
)
)
);
}
List<Result<TimeseriesResultValue>> expectedResults1 = Lists.newArrayList(
Iterables.concat(
Arrays.asList(
new Result<>(
new DateTime("2011-04-14T00"),
new TimeseriesResultValue(
ImmutableMap.<String, Object>of("rows", 13L, "idx", 4907L)
)
)
),
lotsOfZeroes,
Arrays.asList(
new Result<>(
new DateTime("2011-04-15T00"),
new TimeseriesResultValue(
ImmutableMap.<String, Object>of("rows", 13L, "idx", 4717L)
)
)
)
)
);
Iterable<Result<TimeseriesResultValue>> results1 = Sequences.toList(
runner.run(query1),
Lists.<Result<TimeseriesResultValue>>newArrayList()
);
TestHelper.assertExpectedResults(expectedResults1, results1);
}
@Test
public void testTimeseriesQueryGranularityNotAlignedWithRollupGranularity()
{
TimeseriesQuery query1 = Druids.newTimeseriesQueryBuilder()
.dataSource(QueryRunnerTestHelper.dataSource)
.filters(QueryRunnerTestHelper.providerDimension, "spot", "upfront", "total_market")
.granularity(
new PeriodGranularity(
new Period("PT1H"),
new DateTime(60000),
DateTimeZone.UTC
)
)
.intervals(
Arrays.asList(
new Interval(
"2011-04-15T00:00:00.000Z/2012"
)
)
)
.aggregators(
Arrays.<AggregatorFactory>asList(
QueryRunnerTestHelper.rowsCount,
new LongSumAggregatorFactory(
"idx",
"index"
)
)
)
.build();
List<Result<TimeseriesResultValue>> expectedResults1 = Arrays.asList(
new Result<TimeseriesResultValue>(
new DateTime("2011-04-14T23:01Z"),
new TimeseriesResultValue(
ImmutableMap.<String, Object>of("rows", 13L, "idx", 4717L)
)
)
);
Iterable<Result<TimeseriesResultValue>> results1 = Sequences.toList(
runner.run(query1),
Lists.<Result<TimeseriesResultValue>>newArrayList()
);
TestHelper.assertExpectedResults(expectedResults1, results1);
}
@Test
public void testTimeseriesWithVaryingGranWithFilter()
{

View File

@ -35,7 +35,6 @@ import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.DoubleSumAggregatorFactory;
import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import io.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
import io.druid.segment.column.ColumnConfig;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.serde.ComplexMetrics;
import org.joda.time.DateTime;
@ -69,7 +68,7 @@ public class TestIndex
};
public static final String[] DIMENSIONS = new String[]{"provider", "quALIty", "plAcEmEnT", "pLacementish"};
public static final String[] METRICS = new String[]{"iNdEx"};
private static final Interval DATA_INTERVAL = new Interval("2011-01-12T00:00:00.000Z/2011-04-16T00:00:00.000Z");
private static final Interval DATA_INTERVAL = new Interval("2011-01-12T00:00:00.000Z/2011-05-01T00:00:00.000Z");
private static final AggregatorFactory[] METRIC_AGGS = new AggregatorFactory[]{
new DoubleSumAggregatorFactory(METRICS[0], METRICS[0]),
new HyperUniquesAggregatorFactory("quality_uniques", "quality")

View File

@ -72,10 +72,8 @@ import java.util.concurrent.ScheduledExecutorService;
public class RealtimePlumber implements Plumber
{
private static final EmittingLogger log = new EmittingLogger(RealtimePlumber.class);
private final DataSchema schema;
private final RealtimeTuningConfig config;
private final RejectionPolicy rejectionPolicy;
private final FireDepartmentMetrics metrics;
private final ServiceEmitter emitter;
@ -312,6 +310,22 @@ public class RealtimePlumber implements Plumber
{
final Interval interval = sink.getInterval();
// use a file to indicate that pushing has completed
final File persistDir = computePersistDir(schema, interval);
final File mergedTarget = new File(persistDir, "merged");
final File isPushedMarker = new File(persistDir, "isPushedMarker");
if (!isPushedMarker.exists()) {
removeSegment(sink, mergedTarget);
if (mergedTarget.exists()) {
log.wtf("Merged target[%s] exists?!", mergedTarget);
return;
}
} else {
log.info("Already pushed sink[%s]", sink);
return;
}
for (FireHydrant hydrant : sink) {
synchronized (hydrant) {
if (!hydrant.hasSwapped()) {
@ -322,12 +336,6 @@ public class RealtimePlumber implements Plumber
}
}
final File mergedTarget = new File(computePersistDir(schema, interval), "merged");
if (mergedTarget.exists()) {
log.info("Skipping already-merged sink: %s", sink);
return;
}
try {
List<QueryableIndex> indexes = Lists.newArrayList();
for (FireHydrant fireHydrant : sink) {
@ -351,6 +359,14 @@ public class RealtimePlumber implements Plumber
);
segmentPublisher.publishSegment(segment);
if (!isPushedMarker.createNewFile()) {
log.makeAlert("Failed to create marker file for [%s]", schema.getDataSource())
.addData("interval", sink.getInterval())
.addData("partitionNum", segment.getShardSpec().getPartitionNum())
.addData("marker", isPushedMarker)
.emit();
}
}
catch (Exception e) {
log.makeAlert(e, "Failed to persist merged index[%s]", schema.getDataSource())
@ -360,9 +376,6 @@ public class RealtimePlumber implements Plumber
// We're trying to shut down, and this segment failed to push. Let's just get rid of it.
// This call will also delete possibly-partially-written files, so we don't need to do it explicitly.
abandonSegment(truncatedTime, sink);
} else {
// Delete any possibly-partially-written files, so we can try again on the next push cycle.
removeMergedSegment(sink);
}
}
}
@ -610,9 +623,15 @@ public class RealtimePlumber implements Plumber
final long windowMillis = windowPeriod.toStandardDuration().getMillis();
log.info("Starting merge and push.");
DateTime minTimestampAsDate = segmentGranularity.truncate(
rejectionPolicy.getCurrMaxTime().minus(windowMillis)
new DateTime(
Math.max(
windowMillis,
rejectionPolicy.getCurrMaxTime()
.getMillis()
)
- windowMillis
)
);
long minTimestamp = minTimestampAsDate.getMillis();
@ -649,7 +668,7 @@ public class RealtimePlumber implements Plumber
{
try {
segmentAnnouncer.unannounceSegment(sink.getSegment());
removeMergedSegment(sink);
removeSegment(sink, computePersistDir(schema, sink.getInterval()));
log.info("Removing sinkKey %d for segment %s", truncatedTime, sink.getSegment().getIdentifier());
sinks.remove(truncatedTime);
sinkTimeline.remove(
@ -785,29 +804,29 @@ public class RealtimePlumber implements Plumber
&& config.getShardSpec().getPartitionNum() == segment.getShardSpec().getPartitionNum()
&& Iterables.any(
sinks.keySet(), new Predicate<Long>()
{
@Override
public boolean apply(Long sinkKey)
{
return segment.getInterval().contains(sinkKey);
}
}
{
@Override
public boolean apply(Long sinkKey)
{
return segment.getInterval().contains(sinkKey);
}
}
);
}
}
);
}
private void removeMergedSegment(final Sink sink)
private void removeSegment(final Sink sink, final File target)
{
final File mergedTarget = new File(computePersistDir(schema, sink.getInterval()), "merged");
if (mergedTarget.exists()) {
if (target.exists()) {
try {
log.info("Deleting Index File[%s]", mergedTarget);
FileUtils.deleteDirectory(mergedTarget);
log.info("Deleting Index File[%s]", target);
FileUtils.deleteDirectory(target);
}
catch (Exception e) {
log.makeAlert(e, "Unable to remove merged segment for dataSource[%s]", schema.getDataSource())
log.makeAlert(e, "Unable to remove file for dataSource[%s]", schema.getDataSource())
.addData("file", target)
.addData("interval", sink.getInterval())
.emit();
}

View File

@ -19,6 +19,8 @@
package io.druid.server;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.inject.Inject;
import com.metamx.emitter.service.ServiceEmitter;
@ -26,6 +28,7 @@ import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.client.CachingClusteredClient;
import io.druid.query.FinalizeResultsQueryRunner;
import io.druid.query.MetricsEmittingQueryRunner;
import io.druid.query.PostProcessingOperator;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QuerySegmentWalker;
@ -47,19 +50,22 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
private final CachingClusteredClient baseClient;
private final QueryToolChestWarehouse warehouse;
private final RetryQueryRunnerConfig retryConfig;
private final ObjectMapper objectMapper;
@Inject
public ClientQuerySegmentWalker(
ServiceEmitter emitter,
CachingClusteredClient baseClient,
QueryToolChestWarehouse warehouse,
RetryQueryRunnerConfig retryConfig
RetryQueryRunnerConfig retryConfig,
ObjectMapper objectMapper
)
{
this.emitter = emitter;
this.baseClient = baseClient;
this.warehouse = warehouse;
this.retryConfig = retryConfig;
this.objectMapper = objectMapper;
}
@Override
@ -74,10 +80,10 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
return makeRunner(query);
}
private <T> FinalizeResultsQueryRunner<T> makeRunner(final Query<T> query)
private <T> QueryRunner<T> makeRunner(final Query<T> query)
{
final QueryToolChest<T, Query<T>> toolChest = warehouse.getToolChest(query);
return new FinalizeResultsQueryRunner<T>(
final FinalizeResultsQueryRunner<T> baseRunner = new FinalizeResultsQueryRunner<T>(
toolChest.postMergeQueryDecoration(
toolChest.mergeResults(
new UnionQueryRunner<T>(
@ -100,5 +106,15 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
),
toolChest
);
final PostProcessingOperator<T> postProcessing = objectMapper.convertValue(
query.getContext().get("postProcessing"),
new TypeReference<PostProcessingOperator<T>>() {
}
);
return postProcessing != null ?
postProcessing.postProcess(baseRunner) : baseRunner;
}
}
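Given the `timewarp` subtype registered on `PostProcessingOperator` above, a client could opt in per query through the context. A hypothetical sketch of what that context might carry (the JSON shape follows the operator's `@JsonProperty` names; the exact string forms accepted for interval, period, and origin depend on the configured ObjectMapper):
```java
import com.google.common.collect.ImmutableMap;
import java.util.Map;

// Hypothetical per-query wiring: the ObjectMapper above converts this map
// into a TimewarpOperator via the registered "timewarp" subtype.
Map<String, Object> context = ImmutableMap.<String, Object>of(
    "postProcessing", ImmutableMap.of(
        "type", "timewarp",
        "dataInterval", "2014-01-01/2014-01-15",
        "period", "P1W",
        "origin", "2014-01-06"
    )
);
```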

View File

@ -22,6 +22,7 @@ package io.druid.client;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.google.common.base.Charsets;
import com.google.common.base.Function;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
@ -31,6 +32,8 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import com.metamx.common.ISE;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.common.guava.MergeIterable;
@ -65,6 +68,8 @@ import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.aggregation.hyperloglog.HyperLogLogCollector;
import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import io.druid.query.aggregation.post.ArithmeticPostAggregator;
import io.druid.query.aggregation.post.ConstantPostAggregator;
import io.druid.query.aggregation.post.FieldAccessPostAggregator;
@ -817,41 +822,50 @@ public class CachingClusteredClientTest
@Test
public void testGroupByCaching() throws Exception
{
List<AggregatorFactory> aggsWithUniques = ImmutableList.<AggregatorFactory>builder().addAll(AGGS)
.add(new HyperUniquesAggregatorFactory("uniques", "uniques")).build();
final HashFunction hashFn = Hashing.murmur3_128();
GroupByQuery.Builder builder = new GroupByQuery.Builder()
.setDataSource(DATA_SOURCE)
.setQuerySegmentSpec(SEG_SPEC)
.setDimFilter(DIM_FILTER)
.setGranularity(GRANULARITY)
.setDimensions(Arrays.<DimensionSpec>asList(new DefaultDimensionSpec("a", "a")))
.setAggregatorSpecs(AGGS)
.setAggregatorSpecs(aggsWithUniques)
.setPostAggregatorSpecs(POST_AGGS)
.setContext(CONTEXT);
final HyperLogLogCollector collector = HyperLogLogCollector.makeLatestCollector();
collector.add(hashFn.hashString("abc123", Charsets.UTF_8).asBytes());
collector.add(hashFn.hashString("123abc", Charsets.UTF_8).asBytes());
testQueryCaching(
client,
builder.build(),
new Interval("2011-01-01/2011-01-02"),
makeGroupByResults(new DateTime("2011-01-01"), ImmutableMap.of("a", "a", "rows", 1, "imps", 1, "impers", 1)),
makeGroupByResults(new DateTime("2011-01-01"), ImmutableMap.of("a", "a", "rows", 1, "imps", 1, "impers", 1, "uniques", collector)),
new Interval("2011-01-02/2011-01-03"),
makeGroupByResults(new DateTime("2011-01-02"), ImmutableMap.of("a", "b", "rows", 2, "imps", 2, "impers", 2)),
makeGroupByResults(new DateTime("2011-01-02"), ImmutableMap.of("a", "b", "rows", 2, "imps", 2, "impers", 2, "uniques", collector)),
new Interval("2011-01-05/2011-01-10"),
makeGroupByResults(
new DateTime("2011-01-05"), ImmutableMap.of("a", "c", "rows", 3, "imps", 3, "impers", 3),
new DateTime("2011-01-06"), ImmutableMap.of("a", "d", "rows", 4, "imps", 4, "impers", 4),
new DateTime("2011-01-07"), ImmutableMap.of("a", "e", "rows", 5, "imps", 5, "impers", 5),
new DateTime("2011-01-08"), ImmutableMap.of("a", "f", "rows", 6, "imps", 6, "impers", 6),
new DateTime("2011-01-09"), ImmutableMap.of("a", "g", "rows", 7, "imps", 7, "impers", 7)
new DateTime("2011-01-05"), ImmutableMap.of("a", "c", "rows", 3, "imps", 3, "impers", 3, "uniques", collector),
new DateTime("2011-01-06"), ImmutableMap.of("a", "d", "rows", 4, "imps", 4, "impers", 4, "uniques", collector),
new DateTime("2011-01-07"), ImmutableMap.of("a", "e", "rows", 5, "imps", 5, "impers", 5, "uniques", collector),
new DateTime("2011-01-08"), ImmutableMap.of("a", "f", "rows", 6, "imps", 6, "impers", 6, "uniques", collector),
new DateTime("2011-01-09"), ImmutableMap.of("a", "g", "rows", 7, "imps", 7, "impers", 7, "uniques", collector)
),
new Interval("2011-01-05/2011-01-10"),
makeGroupByResults(
new DateTime("2011-01-05T01"), ImmutableMap.of("a", "c", "rows", 3, "imps", 3, "impers", 3),
new DateTime("2011-01-06T01"), ImmutableMap.of("a", "d", "rows", 4, "imps", 4, "impers", 4),
new DateTime("2011-01-07T01"), ImmutableMap.of("a", "e", "rows", 5, "imps", 5, "impers", 5),
new DateTime("2011-01-08T01"), ImmutableMap.of("a", "f", "rows", 6, "imps", 6, "impers", 6),
new DateTime("2011-01-09T01"), ImmutableMap.of("a", "g", "rows", 7, "imps", 7, "impers", 7)
new DateTime("2011-01-05T01"), ImmutableMap.of("a", "c", "rows", 3, "imps", 3, "impers", 3, "uniques", collector),
new DateTime("2011-01-06T01"), ImmutableMap.of("a", "d", "rows", 4, "imps", 4, "impers", 4, "uniques", collector),
new DateTime("2011-01-07T01"), ImmutableMap.of("a", "e", "rows", 5, "imps", 5, "impers", 5, "uniques", collector),
new DateTime("2011-01-08T01"), ImmutableMap.of("a", "f", "rows", 6, "imps", 6, "impers", 6, "uniques", collector),
new DateTime("2011-01-09T01"), ImmutableMap.of("a", "g", "rows", 7, "imps", 7, "impers", 7, "uniques", collector)
)
);
@@ -886,16 +900,16 @@ public class CachingClusteredClientTest
HashMap<String, Object> context = new HashMap<String, Object>();
TestHelper.assertExpectedObjects(
makeGroupByResults(
new DateTime("2011-01-05T"), ImmutableMap.of("a", "c", "rows", 3, "imps", 3, "impers", 3),
new DateTime("2011-01-05T01"), ImmutableMap.of("a", "c", "rows", 3, "imps", 3, "impers", 3),
new DateTime("2011-01-06T"), ImmutableMap.of("a", "d", "rows", 4, "imps", 4, "impers", 4),
new DateTime("2011-01-06T01"), ImmutableMap.of("a", "d", "rows", 4, "imps", 4, "impers", 4),
new DateTime("2011-01-07T"), ImmutableMap.of("a", "e", "rows", 5, "imps", 5, "impers", 5),
new DateTime("2011-01-07T01"), ImmutableMap.of("a", "e", "rows", 5, "imps", 5, "impers", 5),
new DateTime("2011-01-08T"), ImmutableMap.of("a", "f", "rows", 6, "imps", 6, "impers", 6),
new DateTime("2011-01-08T01"), ImmutableMap.of("a", "f", "rows", 6, "imps", 6, "impers", 6),
new DateTime("2011-01-09T"), ImmutableMap.of("a", "g", "rows", 7, "imps", 7, "impers", 7),
new DateTime("2011-01-09T01"), ImmutableMap.of("a", "g", "rows", 7, "imps", 7, "impers", 7)
new DateTime("2011-01-05T"), ImmutableMap.of("a", "c", "rows", 3, "imps", 3, "impers", 3, "uniques", collector),
new DateTime("2011-01-05T01"), ImmutableMap.of("a", "c", "rows", 3, "imps", 3, "impers", 3, "uniques", collector),
new DateTime("2011-01-06T"), ImmutableMap.of("a", "d", "rows", 4, "imps", 4, "impers", 4, "uniques", collector),
new DateTime("2011-01-06T01"), ImmutableMap.of("a", "d", "rows", 4, "imps", 4, "impers", 4, "uniques", collector),
new DateTime("2011-01-07T"), ImmutableMap.of("a", "e", "rows", 5, "imps", 5, "impers", 5, "uniques", collector),
new DateTime("2011-01-07T01"), ImmutableMap.of("a", "e", "rows", 5, "imps", 5, "impers", 5, "uniques", collector),
new DateTime("2011-01-08T"), ImmutableMap.of("a", "f", "rows", 6, "imps", 6, "impers", 6, "uniques", collector),
new DateTime("2011-01-08T01"), ImmutableMap.of("a", "f", "rows", 6, "imps", 6, "impers", 6, "uniques", collector),
new DateTime("2011-01-09T"), ImmutableMap.of("a", "g", "rows", 7, "imps", 7, "impers", 7, "uniques", collector),
new DateTime("2011-01-09T01"), ImmutableMap.of("a", "g", "rows", 7, "imps", 7, "impers", 7, "uniques", collector)
),
runner.run(
builder.setInterval("2011-01-05/2011-01-10")
@@ -928,6 +942,48 @@ public class CachingClusteredClientTest
new Interval("2011-01-01/2011-01-10"),
makeTimeBoundaryResult(new DateTime("2011-01-05T01"), new DateTime("2011-01-05T01"), new DateTime("2011-01-10"))
);
testQueryCaching(
client,
Druids.newTimeBoundaryQueryBuilder()
.dataSource(CachingClusteredClientTest.DATA_SOURCE)
.intervals(CachingClusteredClientTest.SEG_SPEC)
.context(CachingClusteredClientTest.CONTEXT)
.bound(TimeBoundaryQuery.MAX_TIME)
.build(),
new Interval("2011-01-01/2011-01-02"),
makeTimeBoundaryResult(new DateTime("2011-01-01"), null, new DateTime("2011-01-02")),
new Interval("2011-01-01/2011-01-03"),
makeTimeBoundaryResult(new DateTime("2011-01-02"), null, new DateTime("2011-01-03")),
new Interval("2011-01-01/2011-01-10"),
makeTimeBoundaryResult(new DateTime("2011-01-05"), null, new DateTime("2011-01-10")),
new Interval("2011-01-01/2011-01-10"),
makeTimeBoundaryResult(new DateTime("2011-01-05T01"), null, new DateTime("2011-01-10"))
);
testQueryCaching(
client,
Druids.newTimeBoundaryQueryBuilder()
.dataSource(CachingClusteredClientTest.DATA_SOURCE)
.intervals(CachingClusteredClientTest.SEG_SPEC)
.context(CachingClusteredClientTest.CONTEXT)
.bound(TimeBoundaryQuery.MIN_TIME)
.build(),
new Interval("2011-01-01/2011-01-02"),
makeTimeBoundaryResult(new DateTime("2011-01-01"), new DateTime("2011-01-01"), null),
new Interval("2011-01-01/2011-01-03"),
makeTimeBoundaryResult(new DateTime("2011-01-02"), new DateTime("2011-01-02"), null),
new Interval("2011-01-01/2011-01-10"),
makeTimeBoundaryResult(new DateTime("2011-01-05"), new DateTime("2011-01-05"), null),
new Interval("2011-01-01/2011-01-10"),
makeTimeBoundaryResult(new DateTime("2011-01-05T01"), new DateTime("2011-01-05T01"), null)
);
}
private Iterable<Result<TimeBoundaryResultValue>> makeTimeBoundaryResult(
@@ -936,17 +992,30 @@ public class CachingClusteredClientTest
DateTime maxTime
)
{
final Object value;
if (minTime != null && maxTime != null) {
value = ImmutableMap.of(
TimeBoundaryQuery.MIN_TIME,
minTime.toString(),
TimeBoundaryQuery.MAX_TIME,
maxTime.toString()
);
} else if (maxTime != null) {
value = ImmutableMap.of(
TimeBoundaryQuery.MAX_TIME,
maxTime.toString()
);
} else {
value = ImmutableMap.of(
TimeBoundaryQuery.MIN_TIME,
minTime.toString()
);
}
return Arrays.asList(
new Result<>(
timestamp,
new TimeBoundaryResultValue(
ImmutableMap.of(
TimeBoundaryQuery.MIN_TIME,
minTime.toString(),
TimeBoundaryQuery.MAX_TIME,
maxTime.toString()
)
)
new TimeBoundaryResultValue(value)
)
);
}
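With the null-tolerant helper above, the two new MIN_TIME/MAX_TIME test cases can assert results that carry only one bound. For reference, a sketch of a bounded query whose result value holds just `maxTime`; builder names are taken from the test code, the datasource string is hypothetical, and the interval spec is omitted for brevity:

```java
// Sketch only: MAX_TIME-bounded queries return {"maxTime": ...}, MIN_TIME-bounded
// queries return {"minTime": ...}, and unbounded queries return both fields.
TimeBoundaryQuery maxOnly = Druids.newTimeBoundaryQueryBuilder()
                                  .dataSource("example") // hypothetical datasource
                                  .bound(TimeBoundaryQuery.MAX_TIME)
                                  .build();
```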

View File: RealtimePlumberSchoolTest.java

@@ -30,10 +30,10 @@ import com.metamx.emitter.service.ServiceEmitter;
import io.druid.client.FilteredServerView;
import io.druid.client.ServerView;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.ParseSpec;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.InputRowParser;
import io.druid.data.input.impl.JSONParseSpec;
import io.druid.data.input.impl.ParseSpec;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.granularity.QueryGranularity;
import io.druid.query.DefaultQueryRunnerFactoryConglomerate;
@@ -41,7 +41,6 @@ import io.druid.query.Query;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.segment.column.ColumnConfig;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
@@ -52,26 +51,57 @@ import io.druid.server.coordination.DataSegmentAnnouncer;
import io.druid.timeline.DataSegment;
import org.apache.commons.lang.mutable.MutableBoolean;
import org.easymock.EasyMock;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
/**
*/
@RunWith(Parameterized.class)
public class RealtimePlumberSchoolTest
{
private Plumber plumber;
private final RejectionPolicyFactory rejectionPolicy;
private RealtimePlumber plumber;
private DataSegmentAnnouncer announcer;
private SegmentPublisher segmentPublisher;
private DataSegmentPusher dataSegmentPusher;
private FilteredServerView serverView;
private ServiceEmitter emitter;
private RealtimeTuningConfig tuningConfig;
private DataSchema schema;
public RealtimePlumberSchoolTest(RejectionPolicyFactory rejectionPolicy)
{
this.rejectionPolicy = rejectionPolicy;
}
@Parameterized.Parameters
public static Collection<?> constructorFeeder() throws IOException
{
return Arrays.asList(
new Object[][]{
{
new NoopRejectionPolicyFactory()
},
{
new MessageTimeRejectionPolicyFactory()
}
}
);
}
@Before
public void setUp() throws Exception
@@ -80,7 +110,7 @@ public class RealtimePlumberSchoolTest
final File tmpDir = Files.createTempDir();
tmpDir.deleteOnExit();
final DataSchema schema = new DataSchema(
schema = new DataSchema(
"test",
new InputRowParser()
{
@@ -110,9 +140,8 @@ public class RealtimePlumberSchoolTest
announcer.announceSegment(EasyMock.<DataSegment>anyObject());
EasyMock.expectLastCall().anyTimes();
segmentPublisher = EasyMock.createMock(SegmentPublisher.class);
dataSegmentPusher = EasyMock.createMock(DataSegmentPusher.class);
segmentPublisher = EasyMock.createNiceMock(SegmentPublisher.class);
dataSegmentPusher = EasyMock.createNiceMock(DataSegmentPusher.class);
serverView = EasyMock.createMock(FilteredServerView.class);
serverView.registerSegmentCallback(
EasyMock.<Executor>anyObject(),
@@ -125,13 +154,13 @@ public class RealtimePlumberSchoolTest
EasyMock.replay(announcer, segmentPublisher, dataSegmentPusher, serverView, emitter);
RealtimeTuningConfig tuningConfig = new RealtimeTuningConfig(
tuningConfig = new RealtimeTuningConfig(
1,
null,
null,
null,
new IntervalStartVersioningPolicy(),
new NoopRejectionPolicyFactory(),
rejectionPolicy,
null,
null
);
@@ -148,12 +177,12 @@ public class RealtimePlumberSchoolTest
tmpDir,
Granularity.HOUR,
new IntervalStartVersioningPolicy(),
new NoopRejectionPolicyFactory(),
rejectionPolicy,
null,
0
);
plumber = realtimePlumberSchool.findPlumber(schema, tuningConfig, new FireDepartmentMetrics());
plumber = (RealtimePlumber) realtimePlumberSchool.findPlumber(schema, tuningConfig, new FireDepartmentMetrics());
}
@After
@@ -166,7 +195,13 @@ public class RealtimePlumberSchoolTest
public void testPersist() throws Exception
{
final MutableBoolean committed = new MutableBoolean(false);
plumber.getSinks().put(0L, new Sink(new Interval(0, TimeUnit.HOURS.toMillis(1)), schema, tuningConfig, DateTime.now().toString()));
plumber.startJob();
final InputRow row = EasyMock.createNiceMock(InputRow.class);
EasyMock.expect(row.getTimestampFromEpoch()).andReturn(0L);
EasyMock.expect(row.getDimensions()).andReturn(new ArrayList<String>());
EasyMock.replay(row);
plumber.add(row);
plumber.persist(
new Runnable()
{
@@ -185,6 +220,7 @@ public class RealtimePlumberSchoolTest
throw new ISE("Taking too long to set persist value");
}
}
plumber.getSinks().clear();
plumber.finishJob();
}
}
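The test class now runs once per rejection policy thanks to JUnit's `Parameterized` runner, which re-instantiates the class for every `Object[]` returned by the `@Parameters` method. A minimal standalone sketch of the pattern (unrelated to Druid; all names here are illustrative):

```java
import java.util.Arrays;
import java.util.Collection;

import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

// Each Object[] from data() becomes one constructor call, so testSquare()
// executes twice: once with (2, 4) and once with (3, 9).
@RunWith(Parameterized.class)
public class SquareTest
{
  private final int input;
  private final int expected;

  public SquareTest(int input, int expected)
  {
    this.input = input;
    this.expected = expected;
  }

  @Parameterized.Parameters
  public static Collection<Object[]> data()
  {
    return Arrays.asList(new Object[][]{{2, 4}, {3, 9}});
  }

  @Test
  public void testSquare()
  {
    Assert.assertEquals(expected, input * input);
  }
}
```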

View File: CliHadoopIndexer.java

@@ -46,20 +46,21 @@ import java.util.List;
public class CliHadoopIndexer implements Runnable
{
private static String defaultHadoopCoordinates = "org.apache.hadoop:hadoop-client:2.3.0";
private static final String DEFAULT_HADOOP_COORDINATES = "org.apache.hadoop:hadoop-client:2.3.0";
private static final Logger log = new Logger(CliHadoopIndexer.class);
@Arguments(description = "A JSON object or the path to a file that contains a JSON object", required = true)
private String argumentSpec;
@Option(name = "hadoop",
description = "The maven coordinates to the version of hadoop to run with. Defaults to org.apache.hadoop:hadoop-client:2.3.0")
private String hadoopCoordinates = defaultHadoopCoordinates;
@Option(name = {"-c", "--coordinate", "hadoopDependencies"},
description = "extra dependencies to pull down (e.g. non-default hadoop coordinates or extra hadoop jars)")
private List<String> coordinates;
@Option(name = "hadoopDependencies",
description = "The maven coordinates to the version of hadoop and all dependencies to run with. Defaults to using org.apache.hadoop:hadoop-client:2.3.0")
private List<String> hadoopDependencyCoordinates = Arrays.<String>asList(defaultHadoopCoordinates);
@Option(name = "--no-default-hadoop",
description = "don't pull down the default hadoop version (currently " + DEFAULT_HADOOP_COORDINATES + ")",
required = false)
public boolean noDefaultHadoop;
@Inject
private ExtensionsConfig extensionsConfig = null;
@@ -69,6 +70,14 @@ public class CliHadoopIndexer implements Runnable
public void run()
{
try {
final List<String> allCoordinates = Lists.newArrayList();
if (coordinates != null) {
allCoordinates.addAll(coordinates);
}
if (!noDefaultHadoop) {
allCoordinates.add(DEFAULT_HADOOP_COORDINATES);
}
final DefaultTeslaAether aetherClient = Initialization.getAetherClient(extensionsConfig);
final List<URL> extensionURLs = Lists.newArrayList();
@@ -85,7 +94,7 @@ public class CliHadoopIndexer implements Runnable
final List<URL> driverURLs = Lists.newArrayList();
driverURLs.addAll(nonHadoopURLs);
// put hadoop dependencies last to avoid jets3t & apache.httpcore version conflicts
for (String coordinate : hadoopDependencyCoordinates) {
for (String coordinate : allCoordinates) {
final ClassLoader hadoopLoader = Initialization.getClassLoaderForCoordinates(
aetherClient, coordinate
);
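Together, the new flags let users stack extra coordinates with `-c`/`--coordinate` and drop the bundled hadoop-client entirely with `--no-default-hadoop`. A hypothetical invocation (classpath and entry point vary by installation; the spec file name is made up):

```bash
java -cp "config/_common:lib/*" io.druid.cli.Main index hadoop \
  --coordinate org.apache.hadoop:hadoop-client:2.4.0 \
  --no-default-hadoop \
  my_hadoop_spec.json
```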

View File: PullDependencies.java

@@ -39,7 +39,7 @@
)
public class PullDependencies implements Runnable
{
@Option(name = "-c",
@Option(name = {"-c", "--coordinate"},
title = "coordinate",
description = "extra dependencies to pull down (e.g. hadoop coordinates)",
required = false)
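The long-form alias makes the option consistent with CliHadoopIndexer above. A hypothetical pull-deps run that pre-fetches an extra coordinate (again, classpath and entry point vary by installation):

```bash
java -cp "config/_common:lib/*" io.druid.cli.Main tools pull-deps \
  --coordinate org.apache.hadoop:hadoop-client:2.4.0
```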