Merge branch 'master' into new-schema

Conflicts:
	indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java
	indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java
	indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java
	processing/src/test/java/io/druid/segment/TestIndex.java
	server/src/main/java/io/druid/segment/realtime/RealtimeManager.java
	server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java
This commit is contained in:
fjy 2014-03-17 10:59:31 -07:00
commit 2adcf07f5f
174 changed files with 8240 additions and 1162 deletions

View File

@ -30,4 +30,4 @@ echo "For examples, see: "
echo " "
ls -1 examples/*/*sh
echo " "
echo "See also http://druid.io/docs/0.6.61"
echo "See also http://druid.io/docs/0.6.69"

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.63-SNAPSHOT</version>
<version>0.6.70-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.63-SNAPSHOT</version>
<version>0.6.70-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -22,11 +22,13 @@ package io.druid.concurrent;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@ -60,25 +62,29 @@ public class Execs
* @param capacity maximum capacity after which the executorService will block on accepting new tasks
* @return ExecutorService which blocks accepting new tasks when the capacity reached
*/
public static ExecutorService newBlockingSingleThreaded(String nameFormat, int capacity)
public static ExecutorService newBlockingSingleThreaded(final String nameFormat, final int capacity)
{
return new ThreadPoolExecutor(
1, 1,
0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(capacity), makeThreadFactory(nameFormat)
, new RejectedExecutionHandler()
{
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor)
{
try {
((ArrayBlockingQueue) executor.getQueue()).put(r);
}
catch (InterruptedException e) {
throw new RejectedExecutionException("Got Interrupted while adding to the Queue");
}
}
final BlockingQueue<Runnable> queue;
if (capacity > 0) {
queue = new ArrayBlockingQueue<>(capacity);
} else {
queue = new SynchronousQueue<>();
}
return new ThreadPoolExecutor(
1, 1, 0L, TimeUnit.MILLISECONDS, queue, makeThreadFactory(nameFormat),
new RejectedExecutionHandler()
{
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor)
{
try {
executor.getQueue().put(r);
}
catch (InterruptedException e) {
throw new RejectedExecutionException("Got Interrupted while adding to the Queue");
}
}
}
);
}
}

View File

@ -20,6 +20,8 @@
package io.druid.concurrent;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.metamx.common.logger.Logger;
import org.junit.Assert;
import org.junit.Test;
@ -30,23 +32,46 @@ import java.util.concurrent.atomic.AtomicInteger;
public class ExecsTest
{
private static final Logger log = new Logger(ExecsTest.class);
@Test
public void testBlockingExecutorService() throws Exception
public void testBlockingExecutorServiceZeroCapacity() throws Exception
{
final int capacity = 3;
final ExecutorService blockingExecutor = Execs.newBlockingSingleThreaded("test%d", capacity);
final CountDownLatch queueFullSignal = new CountDownLatch(capacity + 1);
final CountDownLatch taskCompletedSignal = new CountDownLatch(2 * capacity);
runTest(0);
}
@Test
public void testBlockingExecutorServiceOneCapacity() throws Exception
{
runTest(1);
}
@Test
public void testBlockingExecutorServiceThreeCapacity() throws Exception
{
runTest(3);
}
private static void runTest(final int capacity) throws Exception
{
final int nTasks = (capacity + 1) * 3;
final ExecutorService blockingExecutor = Execs.newBlockingSingleThreaded("ExecsTest-Blocking-%d", capacity);
final CountDownLatch queueShouldBeFullSignal = new CountDownLatch(capacity + 1);
final CountDownLatch taskCompletedSignal = new CountDownLatch(nTasks);
final CountDownLatch taskStartSignal = new CountDownLatch(1);
final AtomicInteger producedCount = new AtomicInteger();
final AtomicInteger consumedCount = new AtomicInteger();
ExecutorService producer = Executors.newSingleThreadExecutor();
final ExecutorService producer = Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder().setNameFormat(
"ExecsTest-Producer-%d"
).build()
);
producer.submit(
new Runnable()
{
public void run()
{
for (int i = 0; i < 2 * capacity; i++) {
for (int i = 0; i < nTasks; i++) {
final int taskID = i;
System.out.println("Produced task" + taskID);
blockingExecutor.submit(
@ -55,7 +80,7 @@ public class ExecsTest
@Override
public void run()
{
System.out.println("Starting task" + taskID);
log.info("Starting task: %s", taskID);
try {
taskStartSignal.await();
consumedCount.incrementAndGet();
@ -64,29 +89,31 @@ public class ExecsTest
catch (Exception e) {
throw Throwables.propagate(e);
}
System.out.println("Completed task" + taskID);
log.info("Completed task: %s", taskID);
}
}
);
producedCount.incrementAndGet();
queueFullSignal.countDown();
queueShouldBeFullSignal.countDown();
}
}
}
);
queueFullSignal.await();
// verify that the producer blocks
queueShouldBeFullSignal.await();
// Verify that the producer blocks. I don't think it's possible to be sure that the producer is blocking (since
// it could be doing nothing for any reason). But waiting a short period of time and checking that it hasn't done
// anything should hopefully be sufficient.
Thread.sleep(500);
Assert.assertEquals(capacity + 1, producedCount.get());
// let the tasks run
taskStartSignal.countDown();
// wait until all tasks complete
taskCompletedSignal.await();
// verify all tasks consumed
Assert.assertEquals(2 * capacity, consumedCount.get());
Assert.assertEquals(nTasks, consumedCount.get());
// cleanup
blockingExecutor.shutdown();
producer.shutdown();
}
}

View File

@ -82,3 +82,13 @@ All JavaScript functions must return numerical values.
"fnReset" : "function() { return 10; }"
}
```
### Complex aggregators
#### `hyperUnique` aggregator
`hyperUnique` uses [Hyperloglog](http://algo.inria.fr/flajolet/Publications/FlFuGaMe07.pdf) to compute the estimated cardinality of a dimension.
```json
{ "type" : "hyperUnique", "name" : <output_name>, "fieldName" : <metric_name> }
```

View File

@ -26,6 +26,11 @@ druid.service=broker
druid.port=8080
druid.zk.service.host=localhost
# Change these to make Druid faster
druid.processing.buffer.sizeBytes=100000000
druid.processing.numThreads=1
```
Production Configs

View File

@ -0,0 +1,25 @@
---
layout: doc_page
---
A data source is the Druid equivalent of a database table. However, a query can also masquerade as a data source, providing subquery-like functionality. Query data sources are currently only supported by [GroupBy](GroupByQuery.html) queries.
### Table Data Source
The table data source the most common type. It's represented by a string, or by the full structure:
```json
{
"type": "table",
"name": <string_value>
}
```
### Query Data Source
```json
{
"type": "query",
"query": {
"type": "groupBy",
...
}
}
```

View File

@ -19,13 +19,13 @@ Clone Druid and build it:
git clone https://github.com/metamx/druid.git druid
cd druid
git fetch --tags
git checkout druid-0.6.61
git checkout druid-0.6.69
./build.sh
```
### Downloading the DSK (Druid Standalone Kit)
[Download](http://static.druid.io/artifacts/releases/druid-services-0.6.61-bin.tar.gz) a stand-alone tarball and run it:
[Download](http://static.druid.io/artifacts/releases/druid-services-0.6.69-bin.tar.gz) a stand-alone tarball and run it:
``` bash
tar -xzf druid-services-0.X.X-bin.tar.gz

View File

@ -48,7 +48,7 @@ There are 9 main parts to a groupBy query:
|property|description|required?|
|--------|-----------|---------|
|queryType|This String should always be "groupBy"; this is the first thing Druid looks at to figure out how to interpret the query|yes|
|dataSource|A String defining the data source to query, very similar to a table in a relational database|yes|
|dataSource|A String defining the data source to query, very similar to a table in a relational database, or a [DataSource](DataSource.html) structure.|yes|
|dimensions|A JSON list of dimensions to do the groupBy over|yes|
|orderBy|See [OrderBy](OrderBy.html).|no|
|having|See [Having](Having.html).|no|

View File

@ -66,7 +66,7 @@ druid.host=#{IP_ADDR}:8080
druid.port=8080
druid.service=druid/prod/indexer
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.61"]
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.69"]
druid.zk.service.host=#{ZK_IPs}
druid.zk.paths.base=/druid/prod
@ -115,7 +115,7 @@ druid.host=#{IP_ADDR}:8080
druid.port=8080
druid.service=druid/prod/worker
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.61","io.druid.extensions:druid-kafka-seven:0.6.61"]
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.69","io.druid.extensions:druid-kafka-seven:0.6.69"]
druid.zk.service.host=#{ZK_IPs}
druid.zk.paths.base=/druid/prod

View File

@ -64,6 +64,31 @@ Example JavaScript aggregator:
"function": "function(delta, total) { return 100 * Math.abs(delta) / total; }"
}
```
### `hyperUniqueCardinality` post-aggregator
The hyperUniqueCardinality post aggregator is used to wrap a hyperUnique object such that it can be used in post aggregations.
```json
{ "type" : "hyperUniqueCardinality", "fieldName" : <the name field value of the hyperUnique aggregator>}
```
It can be used in a sample calculation as so:
```json
"aggregations" : [{
{"type" : "count", "name" : "rows"},
{"type" : "hyperUnique", "name" : "unique_users", "fieldName" : "uniques"}
}],
"postAggregations" : {
"type" : "arithmetic",
"name" : "average_users_per_row",
"fn" : "/",
"fields" : [
{ "type" : "hyperUniqueCardinality", "fieldName" : "unique_users" },
{ "type" : "fieldAccess", "name" : "rows", "fieldName" : "rows" }
]
}
```
### Example Usage
@ -98,4 +123,4 @@ The format of the query JSON is as follows:
...
}
```
```

View File

@ -27,7 +27,7 @@ druid.host=localhost
druid.service=realtime
druid.port=8083
druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.61"]
druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.69"]
druid.zk.service.host=localhost
@ -76,7 +76,7 @@ druid.host=#{IP_ADDR}:8080
druid.port=8080
druid.service=druid/prod/realtime
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.61","io.druid.extensions:druid-kafka-seven:0.6.61"]
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.69","io.druid.extensions:druid-kafka-seven:0.6.69"]
druid.zk.service.host=#{ZK_IPs}
druid.zk.paths.base=/druid/prod

View File

@ -152,10 +152,7 @@ The indexing service can also run real-time tasks. These tasks effectively trans
"intermediatePersistPeriod": "PT10m"
},
"windowPeriod": "PT10m",
"segmentGranularity": "hour",
"rejectionPolicy": {
"type": "messageTime"
}
"segmentGranularity": "hour"
}
```

View File

@ -49,7 +49,7 @@ There are two ways to setup Druid: download a tarball, or [Build From Source](Bu
### Download a Tarball
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.61-bin.tar.gz). Download this file to a directory of your choosing.
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.69-bin.tar.gz). Download this file to a directory of your choosing.
You can extract the awesomeness within by issuing:
@ -60,7 +60,7 @@ tar -zxvf druid-services-*-bin.tar.gz
Not too lost so far right? That's great! If you cd into the directory:
```
cd druid-services-0.6.61
cd druid-services-0.6.69
```
You should see a bunch of files:

View File

@ -160,13 +160,15 @@ You should be comfortable starting Druid nodes at this point. If not, it may be
"segmentGranularity": "hour",
"basePersistDirectory": "\/tmp\/realtime\/basePersist",
"rejectionPolicy": {
"type": "none"
"type": "test"
}
}
}
]
```
Note: This config uses a "test" rejection policy which will accept all events and timely hand off, however, we strongly recommend you do not use this in production. Using this rejection policy, segments for events for the same time range will be overridden.
3. Let's copy and paste some data into the Kafka console producer
```json

View File

@ -13,7 +13,7 @@ In this tutorial, we will set up other types of Druid nodes and external depende
If you followed the first tutorial, you should already have Druid downloaded. If not, let's go back and do that first.
You can download the latest version of druid [here](http://static.druid.io/artifacts/releases/druid-services-0.6.61-bin.tar.gz)
You can download the latest version of druid [here](http://static.druid.io/artifacts/releases/druid-services-0.6.69-bin.tar.gz)
and untar the contents within by issuing:
@ -149,7 +149,7 @@ druid.port=8081
druid.zk.service.host=localhost
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.61"]
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.69"]
# Dummy read only AWS account (used to download example data)
druid.s3.secretKey=QyyfVZ7llSiRg6Qcrql1eEUG7buFpAK6T6engr1b
@ -240,7 +240,7 @@ druid.port=8083
druid.zk.service.host=localhost
druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.61","io.druid.extensions:druid-kafka-seven:0.6.61"]
druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.69","io.druid.extensions:druid-kafka-seven:0.6.69"]
# Change this config to db to hand off to the rest of the Druid cluster
druid.publish.type=noop
@ -251,6 +251,9 @@ druid.publish.type=noop
# druid.db.connector.password=diurd
druid.processing.buffer.sizeBytes=100000000
druid.processing.numThreads=1
druid.monitoring.monitors=["io.druid.segment.realtime.RealtimeMetricsMonitor"]
```
Next Steps

View File

@ -37,7 +37,7 @@ There are two ways to setup Druid: download a tarball, or [Build From Source](Bu
h3. Download a Tarball
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.61-bin.tar.gz)
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.69-bin.tar.gz)
Download this file to a directory of your choosing.
You can extract the awesomeness within by issuing:
@ -48,7 +48,7 @@ tar zxvf druid-services-*-bin.tar.gz
Not too lost so far right? That's great! If you cd into the directory:
```
cd druid-services-0.6.61
cd druid-services-0.6.69
```
You should see a bunch of files:

View File

@ -9,7 +9,7 @@ There are two ways to setup Druid: download a tarball, or build it from source.
h3. Download a Tarball
We've built a tarball that contains everything you'll need. You'll find it "here":http://static.druid.io/artifacts/releases/druid-services-0.6.61-bin.tar.gz.
We've built a tarball that contains everything you'll need. You'll find it "here":http://static.druid.io/artifacts/releases/druid-services-0.6.69-bin.tar.gz.
Download this bad boy to a directory of your choosing.
You can extract the awesomeness within by issuing:

View File

@ -53,7 +53,7 @@
"segmentGranularity": "hour",
"basePersistDirectory": "\/tmp\/realtime\/basePersist",
"rejectionPolicy": {
"type": "messageTime"
"type": "test"
}
}
}

View File

@ -43,6 +43,6 @@
"windowPeriod" : "PT5m",
"segmentGranularity":"hour",
"basePersistDirectory" : "/tmp/realtime/basePersist",
"rejectionPolicy": { "type": "messageTime" }
"rejectionPolicy": { "type": "test" }
}
}]

View File

@ -55,8 +55,6 @@ JAVA_ARGS="${JAVA_ARGS} -Ddruid.realtime.specFile=${SPEC_FILE}"
DRUID_CP=${EXAMPLE_LOC}
#For a pull
DRUID_CP=${DRUID_CP}:`ls ${SCRIPT_DIR}/../target/druid-examples-*-selfcontained.jar`
DRUID_CP=${DRUID_CP}:`ls ${SCRIPT_DIR}/../../services/target/druid-services-*-selfcontained.jar`
DRUID_CP=${DRUID_CP}:${SCRIPT_DIR}/../config/realtime
#For the kit
DRUID_CP=${DRUID_CP}:${SCRIPT_DIR}/lib/*

View File

@ -2,4 +2,8 @@ druid.host=localhost
druid.service=broker
druid.port=8080
druid.zk.service.host=localhost
druid.zk.service.host=localhost
# Change these to make Druid faster
druid.processing.buffer.sizeBytes=100000000
druid.processing.numThreads=1

View File

@ -4,7 +4,7 @@ druid.port=8081
druid.zk.service.host=localhost
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.61"]
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.69"]
# Dummy read only AWS account (used to download example data)
druid.s3.secretKey=QyyfVZ7llSiRg6Qcrql1eEUG7buFpAK6T6engr1b

View File

@ -4,7 +4,7 @@ druid.port=8083
druid.zk.service.host=localhost
druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.61","io.druid.extensions:druid-kafka-seven:0.6.61","io.druid.extensions:druid-rabbitmq:0.6.61"]
druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.69","io.druid.extensions:druid-kafka-seven:0.6.69","io.druid.extensions:druid-rabbitmq:0.6.69"]
# Change this config to db to hand off to the rest of the Druid cluster
druid.publish.type=noop
@ -16,3 +16,5 @@ druid.publish.type=noop
druid.processing.buffer.sizeBytes=100000000
druid.processing.numThreads=1
druid.monitoring.monitors=["io.druid.segment.realtime.RealtimeMetricsMonitor"]

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.63-SNAPSHOT</version>
<version>0.6.70-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.63-SNAPSHOT</version>
<version>0.6.70-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.63-SNAPSHOT</version>
<version>0.6.70-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.63-SNAPSHOT</version>
<version>0.6.70-SNAPSHOT</version>
</parent>
<dependencies>
@ -97,11 +97,6 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.clearspring.analytics</groupId>
<artifactId>stream</artifactId>
<version>2.5.2</version>
</dependency>
</dependencies>
<build>

View File

@ -19,8 +19,6 @@
package io.druid.indexer;
import com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
import com.clearspring.analytics.stream.cardinality.HyperLogLog;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Optional;
import com.google.common.base.Throwables;
@ -36,6 +34,7 @@ import io.druid.data.input.InputRow;
import io.druid.data.input.Rows;
import io.druid.granularity.QueryGranularity;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import io.druid.query.aggregation.hyperloglog.HyperLogLogCollector;
import io.druid.timeline.partition.HashBasedNumberedShardSpec;
import io.druid.timeline.partition.NoneShardSpec;
import org.apache.hadoop.conf.Configuration;
@ -56,6 +55,7 @@ import org.joda.time.Interval;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -67,7 +67,6 @@ public class DetermineHashedPartitionsJob implements Jobby
{
private static final int MAX_SHARDS = 128;
private static final Logger log = new Logger(DetermineHashedPartitionsJob.class);
private static final int HYPER_LOG_LOG_BIT_SIZE = 20;
private final HadoopDruidIndexerConfig config;
public DetermineHashedPartitionsJob(
@ -99,8 +98,8 @@ public class DetermineHashedPartitionsJob implements Jobby
groupByJob.setOutputKeyClass(NullWritable.class);
groupByJob.setOutputValueClass(NullWritable.class);
groupByJob.setOutputFormatClass(SequenceFileOutputFormat.class);
if(!config.getSegmentGranularIntervals().isPresent()){
groupByJob.setNumReduceTasks(1);
if (!config.getSegmentGranularIntervals().isPresent()) {
groupByJob.setNumReduceTasks(1);
}
JobHelper.setupClasspath(config, groupByJob);
@ -201,7 +200,7 @@ public class DetermineHashedPartitionsJob implements Jobby
{
private static HashFunction hashFunction = Hashing.murmur3_128();
private QueryGranularity rollupGranularity = null;
private Map<Interval, HyperLogLog> hyperLogLogs;
private Map<Interval, HyperLogLogCollector> hyperLogLogs;
private HadoopDruidIndexerConfig config;
private boolean determineIntervals;
@ -215,9 +214,9 @@ public class DetermineHashedPartitionsJob implements Jobby
Optional<Set<Interval>> intervals = config.getSegmentGranularIntervals();
if (intervals.isPresent()) {
determineIntervals = false;
final ImmutableMap.Builder<Interval, HyperLogLog> builder = ImmutableMap.builder();
final ImmutableMap.Builder<Interval, HyperLogLogCollector> builder = ImmutableMap.builder();
for (final Interval bucketInterval : intervals.get()) {
builder.put(bucketInterval, new HyperLogLog(HYPER_LOG_LOG_BIT_SIZE));
builder.put(bucketInterval, HyperLogLogCollector.makeLatestCollector());
}
hyperLogLogs = builder.build();
} else {
@ -245,7 +244,7 @@ public class DetermineHashedPartitionsJob implements Jobby
.bucket(new DateTime(inputRow.getTimestampFromEpoch()));
if (!hyperLogLogs.containsKey(interval)) {
hyperLogLogs.put(interval, new HyperLogLog(HYPER_LOG_LOG_BIT_SIZE));
hyperLogLogs.put(interval, HyperLogLogCollector.makeLatestCollector());
}
} else {
final Optional<Interval> maybeInterval = config.getGranularitySpec()
@ -257,9 +256,9 @@ public class DetermineHashedPartitionsJob implements Jobby
interval = maybeInterval.get();
}
hyperLogLogs.get(interval)
.offerHashed(
.add(
hashFunction.hashBytes(HadoopDruidIndexerConfig.jsonMapper.writeValueAsBytes(groupKey))
.asLong()
.asBytes()
);
}
@ -272,10 +271,10 @@ public class DetermineHashedPartitionsJob implements Jobby
map(context.getCurrentKey(), context.getCurrentValue(), context);
}
for (Map.Entry<Interval, HyperLogLog> entry : hyperLogLogs.entrySet()) {
for (Map.Entry<Interval, HyperLogLogCollector> entry : hyperLogLogs.entrySet()) {
context.write(
new LongWritable(entry.getKey().getStartMillis()),
new BytesWritable(entry.getValue().getBytes())
new BytesWritable(entry.getValue().toByteArray())
);
}
cleanup(context);
@ -303,15 +302,9 @@ public class DetermineHashedPartitionsJob implements Jobby
Context context
) throws IOException, InterruptedException
{
HyperLogLog aggregate = new HyperLogLog(HYPER_LOG_LOG_BIT_SIZE);
HyperLogLogCollector aggregate = HyperLogLogCollector.makeLatestCollector();
for (BytesWritable value : values) {
HyperLogLog logValue = HyperLogLog.Builder.build(getDataBytes(value));
try {
aggregate.addAll(logValue);
}
catch (CardinalityMergeException e) {
e.printStackTrace(); // TODO: check for better handling
}
aggregate.fold(ByteBuffer.wrap(value.getBytes(), 0, value.getLength()));
}
Interval interval = config.getGranularitySpec().getSegmentGranularity().bucket(new DateTime(key.get()));
intervals.add(interval);
@ -327,7 +320,7 @@ public class DetermineHashedPartitionsJob implements Jobby
}
).writeValue(
out,
aggregate.cardinality()
new Double(aggregate.estimateCardinality()).longValue()
);
}
finally {

View File

@ -23,7 +23,6 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.Files;
import io.druid.indexer.partitions.HashedPartitionsSpec;
import io.druid.indexer.partitions.PartitionsSpec;
import io.druid.segment.indexing.DriverConfig;
@ -37,7 +36,6 @@ import java.util.Map;
@JsonTypeName("hadoop")
public class HadoopDriverConfig implements DriverConfig
{
private static final String defaultWorkingPath = Files.createTempDir().getPath();
private static final PartitionsSpec defaultPartitionsSpec = HashedPartitionsSpec.makeDefaultHashedPartitionsSpec();
private static final Map<DateTime, List<HadoopyShardSpec>> defaultShardSpecs = ImmutableMap.<DateTime, List<HadoopyShardSpec>>of();
private static final int defaultRowFlushBoundary = 80000;
@ -45,7 +43,7 @@ public class HadoopDriverConfig implements DriverConfig
public static HadoopDriverConfig makeDefaultDriverConfig()
{
return new HadoopDriverConfig(
defaultWorkingPath,
null,
new DateTime().toString(),
defaultPartitionsSpec,
defaultShardSpecs,
@ -80,7 +78,7 @@ public class HadoopDriverConfig implements DriverConfig
final @JsonProperty("ignoreInvalidRows") boolean ignoreInvalidRows
)
{
this.workingPath = workingPath == null ? defaultWorkingPath : workingPath;
this.workingPath = workingPath == null ? null : workingPath;
this.version = version == null ? new DateTime().toString() : version;
this.partitionsSpec = partitionsSpec == null ? defaultPartitionsSpec : partitionsSpec;
this.shardSpecs = shardSpecs == null ? defaultShardSpecs : shardSpecs;

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.63-SNAPSHOT</version>
<version>0.6.70-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -283,7 +283,7 @@ public class HadoopIndexTask extends AbstractTask
Jobby job = new HadoopDruidDetermineConfigurationJob(config);
log.info("Starting a hadoop index generator job...");
log.info("Starting a hadoop determine configuration job...");
if (job.run()) {
return HadoopDruidIndexerConfig.jsonMapper.writeValueAsString(config.getSchema());
}

View File

@ -90,7 +90,7 @@ public class IndexerDBCoordinator
final ResultIterator<Map<String, Object>> dbSegments =
handle.createQuery(
String.format(
"SELECT payload FROM %s WHERE used = 1 AND dataSource = :dataSource",
"SELECT payload FROM %s WHERE used = true AND dataSource = :dataSource",
dbTables.getSegmentsTable()
)
)
@ -304,8 +304,8 @@ public class IndexerDBCoordinator
return handle.createQuery(
String.format(
DbConnector.isPostgreSQL(handle)?
"SELECT payload FROM %s WHERE dataSource = :dataSource and start >= :start and \"end\" <= :end and used = 0":
"SELECT payload FROM %s WHERE dataSource = :dataSource and start >= :start and end <= :end and used = 0",
"SELECT payload FROM %s WHERE dataSource = :dataSource and start >= :start and \"end\" <= :end and used = false":
"SELECT payload FROM %s WHERE dataSource = :dataSource and start >= :start and end <= :end and used = false",
dbTables.getSegmentsTable()
)
)

View File

@ -38,6 +38,7 @@ import io.druid.indexing.common.TaskToolboxFactory;
import io.druid.indexing.common.task.Task;
import io.druid.query.NoopQueryRunner;
import io.druid.query.Query;
import io.druid.query.TableDataSource;
import io.druid.query.QueryRunner;
import io.druid.query.QuerySegmentWalker;
import io.druid.query.SegmentDescriptor;
@ -152,10 +153,17 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker
private <T> QueryRunner<T> getQueryRunnerImpl(Query<T> query)
{
QueryRunner<T> queryRunner = null;
String queryDataSource;
try {
queryDataSource = ((TableDataSource)query.getDataSource()).getName();
}
catch (ClassCastException e) {
throw new IllegalArgumentException("Subqueries are not welcome here");
}
for (final ThreadPoolTaskRunnerWorkItem taskRunnerWorkItem : ImmutableList.copyOf(runningItems)) {
final Task task = taskRunnerWorkItem.getTask();
if (task.getDataSource().equals(query.getDataSource())) {
if (task.getDataSource().equals(queryDataSource)) {
final QueryRunner<T> taskQueryRunner = task.getQueryRunner(query);
if (taskQueryRunner != null) {
@ -163,7 +171,7 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker
queryRunner = taskQueryRunner;
} else {
log.makeAlert("Found too many query runners for datasource")
.addData("dataSource", query.getDataSource())
.addData("dataSource", queryDataSource)
.emit();
}
}

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.63-SNAPSHOT</version>
<version>0.6.70-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.63-SNAPSHOT</version>
<version>0.6.70-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -23,14 +23,14 @@
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<packaging>pom</packaging>
<version>0.6.63-SNAPSHOT</version>
<version>0.6.70-SNAPSHOT</version>
<name>druid</name>
<description>druid</description>
<scm>
<connection>scm:git:ssh://git@github.com/metamx/druid.git</connection>
<developerConnection>scm:git:ssh://git@github.com/metamx/druid.git</developerConnection>
<url>http://www.github.com/metamx/druid</url>
<tag>${project.artifactId}-${project.version}</tag>
<tag>druid-0.6.69-SNAPSHOT</tag>
</scm>
<prerequisites>

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.63-SNAPSHOT</version>
<version>0.6.70-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -22,6 +22,7 @@ package io.druid.jackson;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.hash.Hashing;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.aggregation.DoubleSumAggregatorFactory;
@ -31,10 +32,14 @@ import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.aggregation.MaxAggregatorFactory;
import io.druid.query.aggregation.MinAggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.aggregation.hyperloglog.HyperUniqueFinalizingPostAggregator;
import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import io.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
import io.druid.query.aggregation.post.ArithmeticPostAggregator;
import io.druid.query.aggregation.post.ConstantPostAggregator;
import io.druid.query.aggregation.post.FieldAccessPostAggregator;
import io.druid.query.aggregation.post.JavaScriptPostAggregator;
import io.druid.segment.serde.ComplexMetrics;
/**
*/
@ -44,28 +49,38 @@ public class AggregatorsModule extends SimpleModule
{
super("AggregatorFactories");
if (ComplexMetrics.getSerdeForType("hyperUnique") == null) {
ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde(Hashing.murmur3_128()));
}
setMixInAnnotation(AggregatorFactory.class, AggregatorFactoryMixin.class);
setMixInAnnotation(PostAggregator.class, PostAggregatorMixin.class);
}
@JsonTypeInfo(use= JsonTypeInfo.Id.NAME, property="type")
@JsonSubTypes(value={
@JsonSubTypes.Type(name="count", value=CountAggregatorFactory.class),
@JsonSubTypes.Type(name="longSum", value=LongSumAggregatorFactory.class),
@JsonSubTypes.Type(name="doubleSum", value=DoubleSumAggregatorFactory.class),
@JsonSubTypes.Type(name="max", value=MaxAggregatorFactory.class),
@JsonSubTypes.Type(name="min", value=MinAggregatorFactory.class),
@JsonSubTypes.Type(name="javascript", value=JavaScriptAggregatorFactory.class),
@JsonSubTypes.Type(name="histogram", value=HistogramAggregatorFactory.class)
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "count", value = CountAggregatorFactory.class),
@JsonSubTypes.Type(name = "longSum", value = LongSumAggregatorFactory.class),
@JsonSubTypes.Type(name = "doubleSum", value = DoubleSumAggregatorFactory.class),
@JsonSubTypes.Type(name = "max", value = MaxAggregatorFactory.class),
@JsonSubTypes.Type(name = "min", value = MinAggregatorFactory.class),
@JsonSubTypes.Type(name = "javascript", value = JavaScriptAggregatorFactory.class),
@JsonSubTypes.Type(name = "histogram", value = HistogramAggregatorFactory.class),
@JsonSubTypes.Type(name = "hyperUnique", value = HyperUniquesAggregatorFactory.class)
})
public static interface AggregatorFactoryMixin {}
public static interface AggregatorFactoryMixin
{
}
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "arithmetic", value = ArithmeticPostAggregator.class),
@JsonSubTypes.Type(name = "fieldAccess", value = FieldAccessPostAggregator.class),
@JsonSubTypes.Type(name = "constant", value = ConstantPostAggregator.class),
@JsonSubTypes.Type(name = "javascript", value = JavaScriptPostAggregator.class)
@JsonSubTypes.Type(name = "javascript", value = JavaScriptPostAggregator.class),
@JsonSubTypes.Type(name = "hyperUniqueCardinality", value = HyperUniqueFinalizingPostAggregator.class)
})
public static interface PostAggregatorMixin {}
public static interface PostAggregatorMixin
{
}
}

View File

@ -36,13 +36,13 @@ import java.util.Map;
public abstract class BaseQuery<T> implements Query<T>
{
public static String QUERYID = "queryId";
private final String dataSource;
private final DataSource dataSource;
private final Map<String, String> context;
private final QuerySegmentSpec querySegmentSpec;
private volatile Duration duration;
public BaseQuery(
String dataSource,
DataSource dataSource,
QuerySegmentSpec querySegmentSpec,
Map<String, String> context
)
@ -50,14 +50,14 @@ public abstract class BaseQuery<T> implements Query<T>
Preconditions.checkNotNull(dataSource, "dataSource can't be null");
Preconditions.checkNotNull(querySegmentSpec, "querySegmentSpec can't be null");
this.dataSource = dataSource.toLowerCase();
this.dataSource = dataSource;
this.context = context;
this.querySegmentSpec = querySegmentSpec;
}
@JsonProperty
@Override
public String getDataSource()
public DataSource getDataSource()
{
return dataSource;
}
@ -143,4 +143,31 @@ public abstract class BaseQuery<T> implements Query<T>
{
return withOverriddenContext(ImmutableMap.of(QUERYID, id));
}
@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
BaseQuery baseQuery = (BaseQuery) o;
if (context != null ? !context.equals(baseQuery.context) : baseQuery.context != null) return false;
if (dataSource != null ? !dataSource.equals(baseQuery.dataSource) : baseQuery.dataSource != null) return false;
if (duration != null ? !duration.equals(baseQuery.duration) : baseQuery.duration != null) return false;
if (querySegmentSpec != null ? !querySegmentSpec.equals(baseQuery.querySegmentSpec) : baseQuery.querySegmentSpec != null)
return false;
return true;
}
@Override
public int hashCode()
{
int result = dataSource != null ? dataSource.hashCode() : 0;
result = 31 * result + (context != null ? context.hashCode() : 0);
result = 31 * result + (querySegmentSpec != null ? querySegmentSpec.hashCode() : 0);
result = 31 * result + (duration != null ? duration.hashCode() : 0);
return result;
}
}

View File

@ -25,6 +25,7 @@ import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.metamx.common.ISE;
import com.metamx.common.guava.BaseSequence;
import com.metamx.common.guava.MergeIterable;
import com.metamx.common.guava.Sequence;
@ -84,11 +85,6 @@ public class ChainedExecutionQueryRunner<T> implements QueryRunner<T>
{
final int priority = Integer.parseInt(query.getContextValue("priority", "0"));
if (Iterables.isEmpty(queryables)) {
log.warn("No queryables found.");
return Sequences.empty();
}
return new BaseSequence<T, Iterator<T>>(
new BaseSequence.IteratorMaker<T, Iterator<T>>()
{
@ -111,6 +107,9 @@ public class ChainedExecutionQueryRunner<T> implements QueryRunner<T>
public List<T> call() throws Exception
{
try {
if (input == null) {
throw new ISE("Input is null?! How is this possible?!");
}
return Sequences.toList(input.run(query), Lists.<T>newArrayList());
}
catch (Exception e) {

View File

@ -0,0 +1,38 @@
/*
* Druid - a distributed column store.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*
* This file Copyright (C) 2014 N3TWORK, Inc. and contributed to the Druid project
* under the Druid Corporate Contributor License Agreement.
*/
package io.druid.query;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME,
include = JsonTypeInfo.As.PROPERTY,
property = "type",
defaultImpl = LegacyDataSource.class)
@JsonSubTypes({
@JsonSubTypes.Type(value = TableDataSource.class, name = "table"),
@JsonSubTypes.Type(value = QueryDataSource.class, name = "query")
})
public interface DataSource
{
public String getName();
}

View File

@ -298,7 +298,7 @@ public class Druids
*/
public static class TimeseriesQueryBuilder
{
private String dataSource;
private DataSource dataSource;
private QuerySegmentSpec querySegmentSpec;
private DimFilter dimFilter;
private QueryGranularity granularity;
@ -308,7 +308,7 @@ public class Druids
private TimeseriesQueryBuilder()
{
dataSource = "";
dataSource = null;
querySegmentSpec = null;
dimFilter = null;
granularity = QueryGranularity.ALL;
@ -354,7 +354,7 @@ public class Druids
.context(builder.context);
}
public String getDataSource()
public DataSource getDataSource()
{
return dataSource;
}
@ -390,6 +390,12 @@ public class Druids
}
public TimeseriesQueryBuilder dataSource(String ds)
{
dataSource = new TableDataSource(ds);
return this;
}
public TimeseriesQueryBuilder dataSource(DataSource ds)
{
dataSource = ds;
return this;
@ -492,7 +498,7 @@ public class Druids
*/
public static class SearchQueryBuilder
{
private String dataSource;
private DataSource dataSource;
private DimFilter dimFilter;
private QueryGranularity granularity;
private int limit;
@ -503,7 +509,7 @@ public class Druids
public SearchQueryBuilder()
{
dataSource = "";
dataSource = null;
dimFilter = null;
granularity = QueryGranularity.ALL;
limit = 0;
@ -531,7 +537,7 @@ public class Druids
public SearchQueryBuilder copy(SearchQuery query)
{
return new SearchQueryBuilder()
.dataSource(query.getDataSource())
.dataSource(((TableDataSource)query.getDataSource()).getName())
.intervals(query.getQuerySegmentSpec())
.filters(query.getDimensionsFilter())
.granularity(query.getGranularity())
@ -555,6 +561,12 @@ public class Druids
}
public SearchQueryBuilder dataSource(String d)
{
dataSource = new TableDataSource(d);
return this;
}
public SearchQueryBuilder dataSource(DataSource d)
{
dataSource = d;
return this;
@ -676,13 +688,13 @@ public class Druids
*/
public static class TimeBoundaryQueryBuilder
{
private String dataSource;
private DataSource dataSource;
private QuerySegmentSpec querySegmentSpec;
private Map<String, String> context;
public TimeBoundaryQueryBuilder()
{
dataSource = "";
dataSource = null;
querySegmentSpec = null;
context = null;
}
@ -704,9 +716,15 @@ public class Druids
.context(builder.context);
}
public TimeBoundaryQueryBuilder dataSource(String d)
public TimeBoundaryQueryBuilder dataSource(String ds)
{
dataSource = d;
dataSource = new TableDataSource(ds);
return this;
}
public TimeBoundaryQueryBuilder dataSource(DataSource ds)
{
dataSource = ds;
return this;
}

View File

@ -0,0 +1,35 @@
/*
* Druid - a distributed column store.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*
* This file Copyright (C) 2014 N3TWORK, Inc. and contributed to the Druid project
* under the Druid Corporate Contributor License Agreement.
*/
package io.druid.query;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
@JsonTypeName("table")
public class LegacyDataSource extends TableDataSource
{
@JsonCreator
public LegacyDataSource(String name)
{
super(name);
}
}

View File

@ -56,7 +56,7 @@ public interface Query<T>
public static final String SELECT = "select";
public static final String TOPN = "topN";
public String getDataSource();
public DataSource getDataSource();
public boolean hasFilters();

View File

@ -0,0 +1,78 @@
/*
* Druid - a distributed column store.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*
* This file Copyright (C) 2014 N3TWORK, Inc. and contributed to the Druid project
* under the Druid Corporate Contributor License Agreement.
*/
package io.druid.query;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
@JsonTypeName("query")
public class QueryDataSource implements DataSource
{
@JsonProperty
private final Query query;
@JsonCreator
public QueryDataSource(@JsonProperty("query") Query query)
{
this.query = query;
}
@Override
public String getName()
{
return query.getDataSource().getName();
}
@JsonProperty
public Query getQuery()
{
return query;
}
public String toString() { return query.toString(); }
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
QueryDataSource that = (QueryDataSource) o;
if (!query.equals(that.query)) {
return false;
}
return true;
}
@Override
public int hashCode()
{
return query.hashCode();
}
}

View File

@ -0,0 +1,48 @@
/*
* Druid - a distributed column store.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*
* This file Copyright (C) 2014 N3TWORK, Inc. and contributed to the Druid project
* under the Druid Corporate Contributor License Agreement.
*/
package io.druid.query;
import com.metamx.common.guava.Sequence;
/**
* If there's a subquery, run it instead of the outer query
*/
public class SubqueryQueryRunner<T> implements QueryRunner<T>
{
private final QueryRunner<T> baseRunner;
public SubqueryQueryRunner(QueryRunner<T> baseRunner)
{
this.baseRunner = baseRunner;
}
@Override
public Sequence<T> run(final Query<T> query)
{
DataSource dataSource = query.getDataSource();
if (dataSource instanceof QueryDataSource) {
return run((Query<T>) ((QueryDataSource) dataSource).getQuery());
} else {
return baseRunner.run(query);
}
}
}

View File

@ -0,0 +1,72 @@
/*
* Druid - a distributed column store.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*
* This file Copyright (C) 2014 N3TWORK, Inc. and contributed to the Druid project
* under the Druid Corporate Contributor License Agreement.
*/
package io.druid.query;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
@JsonTypeName("table")
public class TableDataSource implements DataSource
{
@JsonProperty
private final String name;
@JsonCreator
public TableDataSource(@JsonProperty("name") String name)
{
this.name = (name == null ? null : name.toLowerCase());
}
@JsonProperty
@Override
public String getName()
{
return name;
}
public String toString() { return name; }
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (!(o instanceof TableDataSource)) {
return false;
}
TableDataSource that = (TableDataSource) o;
if (!name.equals(that.name)) {
return false;
}
return true;
}
@Override
public int hashCode()
{
return name.hashCode();
}
}

View File

@ -132,4 +132,23 @@ public class CountAggregatorFactory implements AggregatorFactory
"name='" + name + '\'' +
'}';
}
@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
CountAggregatorFactory that = (CountAggregatorFactory) o;
if (name != null ? !name.equals(that.name) : that.name != null) return false;
return true;
}
@Override
public int hashCode()
{
return name != null ? name.hashCode() : 0;
}
}

View File

@ -150,4 +150,26 @@ public class DoubleSumAggregatorFactory implements AggregatorFactory
", name='" + name + '\'' +
'}';
}
@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
DoubleSumAggregatorFactory that = (DoubleSumAggregatorFactory) o;
if (fieldName != null ? !fieldName.equals(that.fieldName) : that.fieldName != null) return false;
if (name != null ? !name.equals(that.name) : that.name != null) return false;
return true;
}
@Override
public int hashCode()
{
int result = fieldName != null ? fieldName.hashCode() : 0;
result = 31 * result + (name != null ? name.hashCode() : 0);
return result;
}
}

View File

@ -179,4 +179,30 @@ public class HistogramAggregatorFactory implements AggregatorFactory
", breaks=" + Arrays.toString(breaks) +
'}';
}
@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
HistogramAggregatorFactory that = (HistogramAggregatorFactory) o;
if (!Arrays.equals(breaks, that.breaks)) return false;
if (breaksList != null ? !breaksList.equals(that.breaksList) : that.breaksList != null) return false;
if (fieldName != null ? !fieldName.equals(that.fieldName) : that.fieldName != null) return false;
if (name != null ? !name.equals(that.name) : that.name != null) return false;
return true;
}
@Override
public int hashCode()
{
int result = name != null ? name.hashCode() : 0;
result = 31 * result + (fieldName != null ? fieldName.hashCode() : 0);
result = 31 * result + (breaksList != null ? breaksList.hashCode() : 0);
result = 31 * result + (breaks != null ? Arrays.hashCode(breaks) : 0);
return result;
}
}

View File

@ -317,4 +317,35 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory
}
};
}
@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
JavaScriptAggregatorFactory that = (JavaScriptAggregatorFactory) o;
if (compiledScript != null ? !compiledScript.equals(that.compiledScript) : that.compiledScript != null)
return false;
if (fieldNames != null ? !fieldNames.equals(that.fieldNames) : that.fieldNames != null) return false;
if (fnAggregate != null ? !fnAggregate.equals(that.fnAggregate) : that.fnAggregate != null) return false;
if (fnCombine != null ? !fnCombine.equals(that.fnCombine) : that.fnCombine != null) return false;
if (fnReset != null ? !fnReset.equals(that.fnReset) : that.fnReset != null) return false;
if (name != null ? !name.equals(that.name) : that.name != null) return false;
return true;
}
@Override
public int hashCode()
{
int result = name != null ? name.hashCode() : 0;
result = 31 * result + (fieldNames != null ? fieldNames.hashCode() : 0);
result = 31 * result + (fnAggregate != null ? fnAggregate.hashCode() : 0);
result = 31 * result + (fnReset != null ? fnReset.hashCode() : 0);
result = 31 * result + (fnCombine != null ? fnCombine.hashCode() : 0);
result = 31 * result + (compiledScript != null ? compiledScript.hashCode() : 0);
return result;
}
}

View File

@ -150,4 +150,26 @@ public class LongSumAggregatorFactory implements AggregatorFactory
", name='" + name + '\'' +
'}';
}
@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
LongSumAggregatorFactory that = (LongSumAggregatorFactory) o;
if (fieldName != null ? !fieldName.equals(that.fieldName) : that.fieldName != null) return false;
if (name != null ? !name.equals(that.name) : that.name != null) return false;
return true;
}
@Override
public int hashCode()
{
int result = fieldName != null ? fieldName.hashCode() : 0;
result = 31 * result + (name != null ? name.hashCode() : 0);
return result;
}
}

View File

@ -150,4 +150,26 @@ public class MaxAggregatorFactory implements AggregatorFactory
", name='" + name + '\'' +
'}';
}
@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
MaxAggregatorFactory that = (MaxAggregatorFactory) o;
if (fieldName != null ? !fieldName.equals(that.fieldName) : that.fieldName != null) return false;
if (name != null ? !name.equals(that.name) : that.name != null) return false;
return true;
}
@Override
public int hashCode()
{
int result = fieldName != null ? fieldName.hashCode() : 0;
result = 31 * result + (name != null ? name.hashCode() : 0);
return result;
}
}

View File

@ -150,4 +150,26 @@ public class MinAggregatorFactory implements AggregatorFactory
", name='" + name + '\'' +
'}';
}
@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
MinAggregatorFactory that = (MinAggregatorFactory) o;
if (fieldName != null ? !fieldName.equals(that.fieldName) : that.fieldName != null) return false;
if (name != null ? !name.equals(that.name) : that.name != null) return false;
return true;
}
@Override
public int hashCode()
{
int result = fieldName != null ? fieldName.hashCode() : 0;
result = 31 * result + (name != null ? name.hashCode() : 0);
return result;
}
}

View File

@ -112,4 +112,24 @@ public class ToLowerCaseAggregatorFactory implements AggregatorFactory
{
return baseAggregatorFactory.getAggregatorStartValue();
}
@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ToLowerCaseAggregatorFactory that = (ToLowerCaseAggregatorFactory) o;
if (baseAggregatorFactory != null ? !baseAggregatorFactory.equals(that.baseAggregatorFactory) : that.baseAggregatorFactory != null)
return false;
return true;
}
@Override
public int hashCode()
{
return baseAggregatorFactory != null ? baseAggregatorFactory.hashCode() : 0;
}
}

View File

@ -0,0 +1,288 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query.aggregation.hyperloglog;
/**
*/
public class ByteBitLookup
{
public static final byte[] lookup;
static {
lookup = new byte[256];
lookup[0] = 0;
lookup[1] = 1;
lookup[2] = 2;
lookup[3] = 1;
lookup[4] = 3;
lookup[5] = 1;
lookup[6] = 2;
lookup[7] = 1;
lookup[8] = 4;
lookup[9] = 1;
lookup[10] = 2;
lookup[11] = 1;
lookup[12] = 3;
lookup[13] = 1;
lookup[14] = 2;
lookup[15] = 1;
lookup[16] = 5;
lookup[17] = 1;
lookup[18] = 2;
lookup[19] = 1;
lookup[20] = 3;
lookup[21] = 1;
lookup[22] = 2;
lookup[23] = 1;
lookup[24] = 4;
lookup[25] = 1;
lookup[26] = 2;
lookup[27] = 1;
lookup[28] = 3;
lookup[29] = 1;
lookup[30] = 2;
lookup[31] = 1;
lookup[32] = 6;
lookup[33] = 1;
lookup[34] = 2;
lookup[35] = 1;
lookup[36] = 3;
lookup[37] = 1;
lookup[38] = 2;
lookup[39] = 1;
lookup[40] = 4;
lookup[41] = 1;
lookup[42] = 2;
lookup[43] = 1;
lookup[44] = 3;
lookup[45] = 1;
lookup[46] = 2;
lookup[47] = 1;
lookup[48] = 5;
lookup[49] = 1;
lookup[50] = 2;
lookup[51] = 1;
lookup[52] = 3;
lookup[53] = 1;
lookup[54] = 2;
lookup[55] = 1;
lookup[56] = 4;
lookup[57] = 1;
lookup[58] = 2;
lookup[59] = 1;
lookup[60] = 3;
lookup[61] = 1;
lookup[62] = 2;
lookup[63] = 1;
lookup[64] = 7;
lookup[65] = 1;
lookup[66] = 2;
lookup[67] = 1;
lookup[68] = 3;
lookup[69] = 1;
lookup[70] = 2;
lookup[71] = 1;
lookup[72] = 4;
lookup[73] = 1;
lookup[74] = 2;
lookup[75] = 1;
lookup[76] = 3;
lookup[77] = 1;
lookup[78] = 2;
lookup[79] = 1;
lookup[80] = 5;
lookup[81] = 1;
lookup[82] = 2;
lookup[83] = 1;
lookup[84] = 3;
lookup[85] = 1;
lookup[86] = 2;
lookup[87] = 1;
lookup[88] = 4;
lookup[89] = 1;
lookup[90] = 2;
lookup[91] = 1;
lookup[92] = 3;
lookup[93] = 1;
lookup[94] = 2;
lookup[95] = 1;
lookup[96] = 6;
lookup[97] = 1;
lookup[98] = 2;
lookup[99] = 1;
lookup[100] = 3;
lookup[101] = 1;
lookup[102] = 2;
lookup[103] = 1;
lookup[104] = 4;
lookup[105] = 1;
lookup[106] = 2;
lookup[107] = 1;
lookup[108] = 3;
lookup[109] = 1;
lookup[110] = 2;
lookup[111] = 1;
lookup[112] = 5;
lookup[113] = 1;
lookup[114] = 2;
lookup[115] = 1;
lookup[116] = 3;
lookup[117] = 1;
lookup[118] = 2;
lookup[119] = 1;
lookup[120] = 4;
lookup[121] = 1;
lookup[122] = 2;
lookup[123] = 1;
lookup[124] = 3;
lookup[125] = 1;
lookup[126] = 2;
lookup[127] = 1;
lookup[128] = 8;
lookup[129] = 1;
lookup[130] = 2;
lookup[131] = 1;
lookup[132] = 3;
lookup[133] = 1;
lookup[134] = 2;
lookup[135] = 1;
lookup[136] = 4;
lookup[137] = 1;
lookup[138] = 2;
lookup[139] = 1;
lookup[140] = 3;
lookup[141] = 1;
lookup[142] = 2;
lookup[143] = 1;
lookup[144] = 5;
lookup[145] = 1;
lookup[146] = 2;
lookup[147] = 1;
lookup[148] = 3;
lookup[149] = 1;
lookup[150] = 2;
lookup[151] = 1;
lookup[152] = 4;
lookup[153] = 1;
lookup[154] = 2;
lookup[155] = 1;
lookup[156] = 3;
lookup[157] = 1;
lookup[158] = 2;
lookup[159] = 1;
lookup[160] = 6;
lookup[161] = 1;
lookup[162] = 2;
lookup[163] = 1;
lookup[164] = 3;
lookup[165] = 1;
lookup[166] = 2;
lookup[167] = 1;
lookup[168] = 4;
lookup[169] = 1;
lookup[170] = 2;
lookup[171] = 1;
lookup[172] = 3;
lookup[173] = 1;
lookup[174] = 2;
lookup[175] = 1;
lookup[176] = 5;
lookup[177] = 1;
lookup[178] = 2;
lookup[179] = 1;
lookup[180] = 3;
lookup[181] = 1;
lookup[182] = 2;
lookup[183] = 1;
lookup[184] = 4;
lookup[185] = 1;
lookup[186] = 2;
lookup[187] = 1;
lookup[188] = 3;
lookup[189] = 1;
lookup[190] = 2;
lookup[191] = 1;
lookup[192] = 7;
lookup[193] = 1;
lookup[194] = 2;
lookup[195] = 1;
lookup[196] = 3;
lookup[197] = 1;
lookup[198] = 2;
lookup[199] = 1;
lookup[200] = 4;
lookup[201] = 1;
lookup[202] = 2;
lookup[203] = 1;
lookup[204] = 3;
lookup[205] = 1;
lookup[206] = 2;
lookup[207] = 1;
lookup[208] = 5;
lookup[209] = 1;
lookup[210] = 2;
lookup[211] = 1;
lookup[212] = 3;
lookup[213] = 1;
lookup[214] = 2;
lookup[215] = 1;
lookup[216] = 4;
lookup[217] = 1;
lookup[218] = 2;
lookup[219] = 1;
lookup[220] = 3;
lookup[221] = 1;
lookup[222] = 2;
lookup[223] = 1;
lookup[224] = 6;
lookup[225] = 1;
lookup[226] = 2;
lookup[227] = 1;
lookup[228] = 3;
lookup[229] = 1;
lookup[230] = 2;
lookup[231] = 1;
lookup[232] = 4;
lookup[233] = 1;
lookup[234] = 2;
lookup[235] = 1;
lookup[236] = 3;
lookup[237] = 1;
lookup[238] = 2;
lookup[239] = 1;
lookup[240] = 5;
lookup[241] = 1;
lookup[242] = 2;
lookup[243] = 1;
lookup[244] = 3;
lookup[245] = 1;
lookup[246] = 2;
lookup[247] = 1;
lookup[248] = 4;
lookup[249] = 1;
lookup[250] = 2;
lookup[251] = 1;
lookup[252] = 3;
lookup[253] = 1;
lookup[254] = 2;
lookup[255] = 1;
}
}

View File

@ -0,0 +1,152 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query.aggregation.hyperloglog;
import java.nio.ByteBuffer;
/**
*/
@Deprecated
public class HLLCV0 extends HyperLogLogCollector
{
/**
* Header:
* Byte 0: registerOffset
* Byte 1-2: numNonZeroRegisters
*/
public static final int NUM_NON_ZERO_REGISTERS_BYTE = 1;
public static final int HEADER_NUM_BYTES = 3;
public static final int NUM_BYTES_FOR_DENSE_STORAGE = NUM_BYTES_FOR_BUCKETS + HEADER_NUM_BYTES;
private static final ByteBuffer defaultStorageBuffer = ByteBuffer.wrap(new byte[]{0, 0, 0}).asReadOnlyBuffer();
public HLLCV0()
{
super(defaultStorageBuffer);
}
public HLLCV0(ByteBuffer buffer)
{
super(buffer);
}
@Override
public byte getVersion()
{
return 0;
}
@Override
public void setVersion(ByteBuffer buffer)
{
}
@Override
public byte getRegisterOffset()
{
return getStorageBuffer().get(getInitPosition());
}
@Override
public void setRegisterOffset(byte registerOffset)
{
getStorageBuffer().put(getInitPosition(), registerOffset);
}
@Override
public void setRegisterOffset(ByteBuffer buffer, byte registerOffset)
{
buffer.put(buffer.position(), registerOffset);
}
@Override
public short getNumNonZeroRegisters()
{
return getStorageBuffer().getShort(getInitPosition() + NUM_NON_ZERO_REGISTERS_BYTE);
}
@Override
public void setNumNonZeroRegisters(short numNonZeroRegisters)
{
getStorageBuffer().putShort(getInitPosition() + NUM_NON_ZERO_REGISTERS_BYTE, numNonZeroRegisters);
}
@Override
public void setNumNonZeroRegisters(ByteBuffer buffer, short numNonZeroRegisters)
{
buffer.putShort(buffer.position() + NUM_NON_ZERO_REGISTERS_BYTE, numNonZeroRegisters);
}
@Override
public byte getMaxOverflowValue()
{
return 0;
}
@Override
public void setMaxOverflowValue(byte value)
{
}
@Override
public void setMaxOverflowValue(ByteBuffer buffer, byte value)
{
}
@Override
public short getMaxOverflowRegister()
{
return 0;
}
@Override
public void setMaxOverflowRegister(short register)
{
}
@Override
public void setMaxOverflowRegister(ByteBuffer buffer, short register)
{
}
@Override
public int getNumHeaderBytes()
{
return HEADER_NUM_BYTES;
}
@Override
public int getNumBytesForDenseStorage()
{
return NUM_BYTES_FOR_DENSE_STORAGE;
}
@Override
public int getPayloadBytePosition()
{
return getInitPosition() + HEADER_NUM_BYTES;
}
@Override
public int getPayloadBytePosition(ByteBuffer buffer)
{
return buffer.position() + HEADER_NUM_BYTES;
}
}

View File

@ -0,0 +1,164 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query.aggregation.hyperloglog;
import java.nio.ByteBuffer;
/**
*/
public class HLLCV1 extends HyperLogLogCollector
{
/**
* Header:
* Byte 0: version
* Byte 1: registerOffset
* Byte 2-3: numNonZeroRegisters
* Byte 4: maxOverflowValue
* Byte 5-6: maxOverflowRegister
*/
public static final byte VERSION = 0x1;
public static final int REGISTER_OFFSET_BYTE = 1;
public static final int NUM_NON_ZERO_REGISTERS_BYTE = 2;
public static final int MAX_OVERFLOW_VALUE_BYTE = 4;
public static final int MAX_OVERFLOW_REGISTER_BYTE = 5;
public static final int HEADER_NUM_BYTES = 7;
public static final int NUM_BYTES_FOR_DENSE_STORAGE = NUM_BYTES_FOR_BUCKETS + HEADER_NUM_BYTES;
private static final ByteBuffer defaultStorageBuffer = ByteBuffer.wrap(new byte[]{VERSION, 0, 0, 0, 0, 0, 0})
.asReadOnlyBuffer();
public HLLCV1()
{
super(defaultStorageBuffer);
}
public HLLCV1(ByteBuffer buffer)
{
super(buffer);
}
@Override
public byte getVersion()
{
return VERSION;
}
@Override
public void setVersion(ByteBuffer buffer)
{
buffer.put(buffer.position(), VERSION);
}
@Override
public byte getRegisterOffset()
{
return getStorageBuffer().get(getInitPosition() + REGISTER_OFFSET_BYTE);
}
@Override
public void setRegisterOffset(byte registerOffset)
{
getStorageBuffer().put(getInitPosition() + REGISTER_OFFSET_BYTE, registerOffset);
}
@Override
public void setRegisterOffset(ByteBuffer buffer, byte registerOffset)
{
buffer.put(buffer.position() + REGISTER_OFFSET_BYTE, registerOffset);
}
@Override
public short getNumNonZeroRegisters()
{
return getStorageBuffer().getShort(getInitPosition() + NUM_NON_ZERO_REGISTERS_BYTE);
}
@Override
public void setNumNonZeroRegisters(short numNonZeroRegisters)
{
getStorageBuffer().putShort(getInitPosition() + NUM_NON_ZERO_REGISTERS_BYTE, numNonZeroRegisters);
}
@Override
public void setNumNonZeroRegisters(ByteBuffer buffer, short numNonZeroRegisters)
{
buffer.putShort(buffer.position() + NUM_NON_ZERO_REGISTERS_BYTE, numNonZeroRegisters);
}
@Override
public byte getMaxOverflowValue()
{
return getStorageBuffer().get(getInitPosition() + MAX_OVERFLOW_VALUE_BYTE);
}
@Override
public void setMaxOverflowValue(byte value)
{
getStorageBuffer().put(getInitPosition() + MAX_OVERFLOW_VALUE_BYTE, value);
}
@Override
public void setMaxOverflowValue(ByteBuffer buffer, byte value)
{
buffer.put(buffer.position() + MAX_OVERFLOW_VALUE_BYTE, value);
}
@Override
public short getMaxOverflowRegister()
{
return getStorageBuffer().getShort(getInitPosition() + MAX_OVERFLOW_REGISTER_BYTE);
}
@Override
public void setMaxOverflowRegister(short register)
{
getStorageBuffer().putShort(getInitPosition() + MAX_OVERFLOW_REGISTER_BYTE, register);
}
@Override
public void setMaxOverflowRegister(ByteBuffer buffer, short register)
{
buffer.putShort(buffer.position() + MAX_OVERFLOW_REGISTER_BYTE, register);
}
@Override
public int getNumHeaderBytes()
{
return HEADER_NUM_BYTES;
}
@Override
public int getNumBytesForDenseStorage()
{
return NUM_BYTES_FOR_DENSE_STORAGE;
}
@Override
public int getPayloadBytePosition()
{
return getInitPosition() + HEADER_NUM_BYTES;
}
@Override
public int getPayloadBytePosition(ByteBuffer buffer)
{
return buffer.position() + HEADER_NUM_BYTES;
}
}

View File

@ -0,0 +1,673 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query.aggregation.hyperloglog;
import com.fasterxml.jackson.annotation.JsonValue;
import com.google.common.primitives.UnsignedBytes;
import com.metamx.common.IAE;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import java.nio.ByteBuffer;
/**
* Implements the HyperLogLog cardinality estimator described in:
* <p/>
* http://algo.inria.fr/flajolet/Publications/FlFuGaMe07.pdf
* <p/>
* Run this code to see a simple indication of expected errors based on different m values:
* <p/>
* for (int i = 1; i < 20; ++i) {
* System.out.printf("i[%,d], val[%,d] => error[%f%%]%n", i, 2 << i, 104 / Math.sqrt(2 << i));
* }
* <p/>
* This class is *not* multi-threaded. It can be passed among threads, but it is written with the assumption that
* only one thread is ever calling methods on it.
* <p/>
* If you have multiple threads calling methods on this concurrently, I hope you manage to get correct behavior
*/
public abstract class HyperLogLogCollector implements Comparable<HyperLogLogCollector>
{
public static final int DENSE_THRESHOLD = 128;
public static final int BITS_FOR_BUCKETS = 11;
public static final int NUM_BUCKETS = 1 << BITS_FOR_BUCKETS;
public static final int NUM_BYTES_FOR_BUCKETS = NUM_BUCKETS / 2;
private static final double TWO_TO_THE_SIXTY_FOUR = Math.pow(2, 64);
private static final double ALPHA = 0.7213 / (1 + 1.079 / NUM_BUCKETS);
public static final double LOW_CORRECTION_THRESHOLD = (5 * NUM_BUCKETS) / 2.0d;
public static final double HIGH_CORRECTION_THRESHOLD = TWO_TO_THE_SIXTY_FOUR / 30.0d;
public static final double CORRECTION_PARAMETER = ALPHA * NUM_BUCKETS * NUM_BUCKETS;
private static final Logger log = new Logger(HyperLogLogCollector.class);
private static final int bucketMask = 0x7ff;
private static final int minBytesRequired = 10;
private static final int bitsPerBucket = 4;
private static final int range = (int) Math.pow(2, bitsPerBucket) - 1;
private final static double[][] minNumRegisterLookup = new double[64][256];
static {
for (int registerOffset = 0; registerOffset < 64; ++registerOffset) {
for (int register = 0; register < 256; ++register) {
final int upper = ((register & 0xf0) >> 4) + registerOffset;
final int lower = (register & 0x0f) + registerOffset;
minNumRegisterLookup[registerOffset][register] = 1.0d / Math.pow(2, upper) + 1.0d / Math.pow(2, lower);
}
}
}
// we have to keep track of the number of zeroes in each of the two halves of the byte register (0, 1, or 2)
private final static int[] numZeroLookup = new int[256];
static {
for (int i = 0; i < numZeroLookup.length; ++i) {
numZeroLookup[i] = (((i & 0xf0) == 0) ? 1 : 0) + (((i & 0x0f) == 0) ? 1 : 0);
}
}
// Methods to build the latest HLLC
public static HyperLogLogCollector makeLatestCollector()
{
return new HLLCV1();
}
public static HyperLogLogCollector makeCollector(ByteBuffer buffer)
{
int remaining = buffer.remaining();
return (remaining % 3 == 0 || remaining == 1027) ? new HLLCV0(buffer) : new HLLCV1(buffer);
}
public static int getLatestNumBytesForDenseStorage()
{
return HLLCV1.NUM_BYTES_FOR_DENSE_STORAGE;
}
public static byte[] makeEmptyVersionedByteArray()
{
byte[] arr = new byte[getLatestNumBytesForDenseStorage()];
arr[0] = HLLCV1.VERSION;
return arr;
}
public static double applyCorrection(double e, int zeroCount)
{
e = CORRECTION_PARAMETER / e;
if (e <= LOW_CORRECTION_THRESHOLD) {
return zeroCount == 0 ? e : NUM_BUCKETS * Math.log(NUM_BUCKETS / (double) zeroCount);
}
if (e > HIGH_CORRECTION_THRESHOLD) {
final double ratio = e / TWO_TO_THE_SIXTY_FOUR;
if (ratio >= 1) {
// handle very unlikely case that value is > 2^64
return Double.MAX_VALUE;
} else {
return -TWO_TO_THE_SIXTY_FOUR * Math.log(1 - ratio);
}
}
return e;
}
private static double estimateSparse(
final ByteBuffer buf,
final byte minNum,
final byte overflowValue,
final short overflowPosition,
final boolean isUpperNibble
)
{
final ByteBuffer copy = buf.asReadOnlyBuffer();
double e = 0.0d;
int zeroCount = NUM_BUCKETS - 2 * (buf.remaining() / 3);
while (copy.hasRemaining()) {
short position = copy.getShort();
final int register = (int) copy.get() & 0xff;
if (overflowValue != 0 && position == overflowPosition) {
int upperNibble = ((register & 0xf0) >>> bitsPerBucket) + minNum;
int lowerNibble = (register & 0x0f) + minNum;
if (isUpperNibble) {
upperNibble = Math.max(upperNibble, overflowValue);
} else {
lowerNibble = Math.max(lowerNibble, overflowValue);
}
e += 1.0d / Math.pow(2, upperNibble) + 1.0d / Math.pow(2, lowerNibble);
zeroCount += (((upperNibble & 0xf0) == 0) ? 1 : 0) + (((lowerNibble & 0x0f) == 0) ? 1 : 0);
} else {
e += minNumRegisterLookup[minNum][register];
zeroCount += numZeroLookup[register];
}
}
e += zeroCount;
return applyCorrection(e, zeroCount);
}
private static double estimateDense(
final ByteBuffer buf,
final byte minNum,
final byte overflowValue,
final short overflowPosition,
final boolean isUpperNibble
)
{
final ByteBuffer copy = buf.asReadOnlyBuffer();
double e = 0.0d;
int zeroCount = 0;
int position = 0;
while (copy.hasRemaining()) {
final int register = (int) copy.get() & 0xff;
if (overflowValue != 0 && position == overflowPosition) {
int upperNibble = ((register & 0xf0) >>> bitsPerBucket) + minNum;
int lowerNibble = (register & 0x0f) + minNum;
if (isUpperNibble) {
upperNibble = Math.max(upperNibble, overflowValue);
} else {
lowerNibble = Math.max(lowerNibble, overflowValue);
}
e += 1.0d / Math.pow(2, upperNibble) + 1.0d / Math.pow(2, lowerNibble);
zeroCount += (((upperNibble & 0xf0) == 0) ? 1 : 0) + (((lowerNibble & 0x0f) == 0) ? 1 : 0);
} else {
e += minNumRegisterLookup[minNum][register];
zeroCount += numZeroLookup[register];
}
position++;
}
return applyCorrection(e, zeroCount);
}
private static boolean isSparse(ByteBuffer buffer)
{
return buffer.remaining() != NUM_BYTES_FOR_BUCKETS;
}
private volatile ByteBuffer storageBuffer;
private volatile int initPosition;
private volatile Double estimatedCardinality;
public HyperLogLogCollector(ByteBuffer byteBuffer)
{
storageBuffer = byteBuffer.duplicate();
initPosition = byteBuffer.position();
estimatedCardinality = null;
}
public abstract byte getVersion();
public abstract void setVersion(ByteBuffer buffer);
public abstract byte getRegisterOffset();
public abstract void setRegisterOffset(byte registerOffset);
public abstract void setRegisterOffset(ByteBuffer buffer, byte registerOffset);
public abstract short getNumNonZeroRegisters();
public abstract void setNumNonZeroRegisters(short numNonZeroRegisters);
public abstract void setNumNonZeroRegisters(ByteBuffer buffer, short numNonZeroRegisters);
public abstract byte getMaxOverflowValue();
public abstract void setMaxOverflowValue(byte value);
public abstract void setMaxOverflowValue(ByteBuffer buffer, byte value);
public abstract short getMaxOverflowRegister();
public abstract void setMaxOverflowRegister(short register);
public abstract void setMaxOverflowRegister(ByteBuffer buffer, short register);
public abstract int getNumHeaderBytes();
public abstract int getNumBytesForDenseStorage();
public abstract int getPayloadBytePosition();
public abstract int getPayloadBytePosition(ByteBuffer buffer);
protected int getInitPosition()
{
return initPosition;
}
protected ByteBuffer getStorageBuffer()
{
return storageBuffer;
}
public void add(byte[] hashedValue)
{
if (hashedValue.length < minBytesRequired) {
throw new IAE("Insufficient bytes, need[%d] got [%d]", minBytesRequired, hashedValue.length);
}
estimatedCardinality = null;
final ByteBuffer buffer = ByteBuffer.wrap(hashedValue);
short bucket = (short) (buffer.getShort(hashedValue.length - 2) & bucketMask);
byte positionOf1 = 0;
for (int i = 0; i < 8; ++i) {
byte lookupVal = ByteBitLookup.lookup[UnsignedBytes.toInt(hashedValue[i])];
switch (lookupVal) {
case 0:
positionOf1 += 8;
continue;
default:
positionOf1 += lookupVal;
i = 8;
break;
}
}
add(bucket, positionOf1);
}
public void add(short bucket, byte positionOf1)
{
if (storageBuffer.isReadOnly()) {
convertToMutableByteBuffer();
}
byte registerOffset = getRegisterOffset();
// discard everything outside of the range we care about
if (positionOf1 <= registerOffset) {
return;
} else if (positionOf1 > (registerOffset + range)) {
byte currMax = getMaxOverflowValue();
if (positionOf1 > currMax) {
setMaxOverflowValue(positionOf1);
setMaxOverflowRegister(bucket);
}
return;
}
// whatever value we add must be stored in 4 bits
short numNonZeroRegisters = addNibbleRegister(bucket, (byte) ((0xff & positionOf1) - registerOffset));
setNumNonZeroRegisters(numNonZeroRegisters);
if (numNonZeroRegisters == NUM_BUCKETS) {
setRegisterOffset(++registerOffset);
setNumNonZeroRegisters(decrementBuckets());
}
}
public HyperLogLogCollector fold(HyperLogLogCollector other)
{
if (other == null || other.storageBuffer.remaining() == 0) {
return this;
}
if (storageBuffer.isReadOnly()) {
convertToMutableByteBuffer();
}
estimatedCardinality = null;
if (getRegisterOffset() < other.getRegisterOffset()) {
// "Swap" the buffers so that we are folding into the one with the higher offset
ByteBuffer newStorage = ByteBuffer.allocate(other.storageBuffer.remaining());
newStorage.put(other.storageBuffer.asReadOnlyBuffer());
newStorage.clear();
other.storageBuffer = storageBuffer;
other.initPosition = initPosition;
storageBuffer = newStorage;
initPosition = 0;
}
ByteBuffer otherBuffer = other.storageBuffer.asReadOnlyBuffer();
byte otherOffset = other.getRegisterOffset();
if (storageBuffer.remaining() != getNumBytesForDenseStorage()) {
convertToDenseStorage();
}
byte myOffset = getRegisterOffset();
short numNonZero = getNumNonZeroRegisters();
int offsetDiff = myOffset - otherOffset;
if (offsetDiff < 0) {
throw new ISE("offsetDiff[%d] < 0, shouldn't happen because of swap.", offsetDiff);
}
byte otherOverflowValue = other.getMaxOverflowValue();
short otherOverflowRegister = other.getMaxOverflowRegister();
add(otherOverflowRegister, otherOverflowValue);
int myPayloadStart = getPayloadBytePosition();
otherBuffer.position(other.getPayloadBytePosition());
if (isSparse(otherBuffer)) {
while (otherBuffer.hasRemaining()) {
short position = otherBuffer.getShort();
int payloadStartPosition = position - other.getNumHeaderBytes();
numNonZero += mergeAndStoreByteRegister(
myPayloadStart + payloadStartPosition,
offsetDiff,
otherBuffer.get()
);
if (numNonZero == NUM_BUCKETS) {
myOffset += 1;
numNonZero = decrementBuckets();
setRegisterOffset(myOffset);
setNumNonZeroRegisters(numNonZero);
offsetDiff = myOffset - otherOffset;
}
}
} else { // dense
int position = getPayloadBytePosition();
while (otherBuffer.hasRemaining()) {
numNonZero += mergeAndStoreByteRegister(
position,
offsetDiff,
otherBuffer.get()
);
if (numNonZero == NUM_BUCKETS) {
myOffset += 1;
numNonZero = decrementBuckets();
setRegisterOffset(myOffset);
setNumNonZeroRegisters(numNonZero);
offsetDiff = myOffset - otherOffset;
}
position++;
}
}
setRegisterOffset(myOffset);
setNumNonZeroRegisters(numNonZero);
return this;
}
public HyperLogLogCollector fold(ByteBuffer buffer)
{
return fold(makeCollector(buffer));
}
public ByteBuffer toByteBuffer()
{
short numNonZeroRegisters = getNumNonZeroRegisters();
// store sparsely
if (storageBuffer.remaining() == getNumBytesForDenseStorage() && numNonZeroRegisters < DENSE_THRESHOLD) {
ByteBuffer retVal = ByteBuffer.wrap(new byte[numNonZeroRegisters * 3 + getNumHeaderBytes()]);
setVersion(retVal);
setRegisterOffset(retVal, getRegisterOffset());
setNumNonZeroRegisters(retVal, numNonZeroRegisters);
setMaxOverflowValue(retVal, getMaxOverflowValue());
setMaxOverflowRegister(retVal, getMaxOverflowRegister());
int startPosition = getPayloadBytePosition();
retVal.position(getPayloadBytePosition(retVal));
for (int i = startPosition; i < startPosition + NUM_BYTES_FOR_BUCKETS; i++) {
if (storageBuffer.get(i) != 0) {
retVal.putShort((short) (0xffff & (i - initPosition)));
retVal.put(storageBuffer.get(i));
}
}
retVal.rewind();
return retVal.asReadOnlyBuffer();
}
return storageBuffer.asReadOnlyBuffer();
}
@JsonValue
public byte[] toByteArray()
{
final ByteBuffer buffer = toByteBuffer();
byte[] theBytes = new byte[buffer.remaining()];
buffer.get(theBytes);
return theBytes;
}
public double estimateCardinality()
{
if (estimatedCardinality == null) {
byte registerOffset = getRegisterOffset();
byte overflowValue = getMaxOverflowValue();
short overflowRegister = getMaxOverflowRegister();
short overflowPosition = (short) (overflowRegister >>> 1);
boolean isUpperNibble = ((overflowRegister & 0x1) == 0);
storageBuffer.position(getPayloadBytePosition());
if (isSparse(storageBuffer)) {
estimatedCardinality = estimateSparse(
storageBuffer,
registerOffset,
overflowValue,
overflowPosition,
isUpperNibble
);
} else {
estimatedCardinality = estimateDense(
storageBuffer,
registerOffset,
overflowValue,
overflowPosition,
isUpperNibble
);
}
storageBuffer.position(initPosition);
}
return estimatedCardinality;
}
public double estimateByteBuffer(ByteBuffer buf)
{
return makeCollector(buf).estimateCardinality();
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
HyperLogLogCollector collector = (HyperLogLogCollector) o;
if (storageBuffer != null ? !storageBuffer.equals(collector.storageBuffer) : collector.storageBuffer != null) {
return false;
}
return true;
}
@Override
public int hashCode()
{
int result = storageBuffer != null ? storageBuffer.hashCode() : 0;
result = 31 * result + initPosition;
return result;
}
@Override
public String toString()
{
return "HyperLogLogCollector{" +
"initPosition=" + initPosition +
", version=" + getVersion() +
", registerOffset=" + getRegisterOffset() +
", numNonZeroRegisters=" + getNumNonZeroRegisters() +
", maxOverflowValue=" + getMaxOverflowValue() +
", maxOverflowRegister=" + getMaxOverflowRegister() +
'}';
}
private short decrementBuckets()
{
short count = 0;
int startPosition = getPayloadBytePosition();
for (int i = startPosition; i < startPosition + NUM_BYTES_FOR_BUCKETS; i++) {
byte val = (byte) (storageBuffer.get(i) - 0x11);
if ((val & 0xf0) != 0) {
count++;
}
if ((val & 0x0f) != 0) {
count++;
}
storageBuffer.put(i, val);
}
return count;
}
private void convertToMutableByteBuffer()
{
ByteBuffer tmpBuffer = ByteBuffer.allocate(storageBuffer.remaining());
tmpBuffer.put(storageBuffer.asReadOnlyBuffer());
tmpBuffer.position(0);
storageBuffer = tmpBuffer;
initPosition = 0;
}
private void convertToDenseStorage()
{
ByteBuffer tmpBuffer = ByteBuffer.wrap(new byte[getNumBytesForDenseStorage()]);
// put header
setVersion(tmpBuffer);
setRegisterOffset(tmpBuffer, getRegisterOffset());
setNumNonZeroRegisters(tmpBuffer, getNumNonZeroRegisters());
setMaxOverflowValue(tmpBuffer, getMaxOverflowValue());
setMaxOverflowRegister(tmpBuffer, getMaxOverflowRegister());
storageBuffer.position(getPayloadBytePosition());
tmpBuffer.position(getPayloadBytePosition(tmpBuffer));
// put payload
while (storageBuffer.hasRemaining()) {
tmpBuffer.put(storageBuffer.getShort(), storageBuffer.get());
}
tmpBuffer.rewind();
storageBuffer = tmpBuffer;
initPosition = 0;
}
private short addNibbleRegister(short bucket, byte positionOf1)
{
short numNonZeroRegs = getNumNonZeroRegisters();
final short position = (short) (bucket >> 1);
final boolean isUpperNibble = ((bucket & 0x1) == 0);
byte shiftedPositionOf1 = (isUpperNibble) ? (byte) (positionOf1 << bitsPerBucket) : positionOf1;
if (storageBuffer.remaining() != getNumBytesForDenseStorage()) {
convertToDenseStorage();
}
byte origVal = storageBuffer.get(getPayloadBytePosition() + position);
byte newValueMask = (isUpperNibble) ? (byte) 0xf0 : (byte) 0x0f;
byte originalValueMask = (byte) (newValueMask ^ 0xff);
// if something was at zero, we have to increase the numNonZeroRegisters
if ((origVal & newValueMask) == 0 && shiftedPositionOf1 != 0) {
numNonZeroRegs++;
}
storageBuffer.put(
getPayloadBytePosition() + position,
(byte) (UnsignedBytes.max((byte) (origVal & newValueMask), shiftedPositionOf1) | (origVal & originalValueMask))
);
return numNonZeroRegs;
}
/**
* Returns the number of registers that are no longer zero after the value was added
*
* @param position The position into the byte buffer, this position represents two "registers"
* @param offsetDiff The difference in offset between the byteToAdd and the current HyperLogLogCollector
* @param byteToAdd The byte to merge into the current HyperLogLogCollector
*
* @return
*/
private int mergeAndStoreByteRegister(
int position,
int offsetDiff,
byte byteToAdd
)
{
if (byteToAdd == 0) {
return 0;
}
byte currVal = storageBuffer.get(position);
int upperNibble = currVal & 0xf0;
int lowerNibble = currVal & 0x0f;
// subtract the differences so that the nibbles align
int otherUpper = (byteToAdd & 0xf0) - (offsetDiff << bitsPerBucket);
int otherLower = (byteToAdd & 0x0f) - offsetDiff;
final int newUpper = Math.max(upperNibble, otherUpper);
final int newLower = Math.max(lowerNibble, otherLower);
int numNoLongerZero = 0;
if (upperNibble == 0 && newUpper > 0) {
++numNoLongerZero;
}
if (lowerNibble == 0 && newLower > 0) {
++numNoLongerZero;
}
storageBuffer.put(position, (byte) ((newUpper | newLower) & 0xff));
return numNoLongerZero;
}
@Override
public int compareTo(HyperLogLogCollector other)
{
final int lhsOffset = (int) this.getRegisterOffset() & 0xffff;
final int rhsOffset = (int) other.getRegisterOffset() & 0xffff;
if (lhsOffset == rhsOffset) {
final int lhsNumNonZero = (int) this.getNumNonZeroRegisters() & 0xff;
final int rhsNumNonZero = (int) this.getNumNonZeroRegisters() & 0xff;
int retVal = Double.compare(lhsNumNonZero, rhsNumNonZero);
if (retVal == 0) {
retVal = Double.compare(this.estimateCardinality(), other.estimateCardinality());
}
return retVal;
} else {
return Double.compare(lhsOffset, rhsOffset);
}
}
}

View File

@ -0,0 +1,69 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query.aggregation.hyperloglog;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.Sets;
import io.druid.query.aggregation.PostAggregator;
import java.util.Comparator;
import java.util.Map;
import java.util.Set;
/**
*/
public class HyperUniqueFinalizingPostAggregator implements PostAggregator
{
private final String fieldName;
@JsonCreator
public HyperUniqueFinalizingPostAggregator(
@JsonProperty("fieldName") String fieldName
)
{
this.fieldName = fieldName;
}
@Override
public Set<String> getDependentFields()
{
return Sets.newHashSet(fieldName);
}
@Override
public Comparator getComparator()
{
throw new UnsupportedOperationException();
}
@Override
public Object compute(Map<String, Object> combinedAggregators)
{
return HyperUniquesAggregatorFactory.estimateCardinality(combinedAggregators.get(fieldName));
}
@Override
@JsonProperty("fieldName")
public String getName()
{
return fieldName;
}
}

View File

@ -0,0 +1,86 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query.aggregation.hyperloglog;
import io.druid.query.aggregation.Aggregator;
import io.druid.segment.ObjectColumnSelector;
/**
*/
public class HyperUniquesAggregator implements Aggregator
{
private final String name;
private final ObjectColumnSelector selector;
private HyperLogLogCollector collector;
public HyperUniquesAggregator(
String name,
ObjectColumnSelector selector
)
{
this.name = name;
this.selector = selector;
this.collector = HyperLogLogCollector.makeLatestCollector();
}
@Override
public void aggregate()
{
collector.fold((HyperLogLogCollector) selector.get());
}
@Override
public void reset()
{
collector = HyperLogLogCollector.makeLatestCollector();
}
@Override
public Object get()
{
return collector;
}
@Override
public float getFloat()
{
throw new UnsupportedOperationException();
}
@Override
public String getName()
{
return name;
}
@Override
public Aggregator clone()
{
return new HyperUniquesAggregator(name, selector);
}
@Override
public void close()
{
// no resources to cleanup
}
}

View File

@ -0,0 +1,232 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query.aggregation.hyperloglog;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Charsets;
import com.metamx.common.IAE;
import io.druid.query.aggregation.Aggregator;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.query.aggregation.NoopAggregator;
import io.druid.query.aggregation.NoopBufferAggregator;
import io.druid.segment.ColumnSelectorFactory;
import io.druid.segment.ObjectColumnSelector;
import org.apache.commons.codec.binary.Base64;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
/**
*/
public class HyperUniquesAggregatorFactory implements AggregatorFactory
{
public static Object estimateCardinality(Object object)
{
if (object == null) {
return 0;
}
return ((HyperLogLogCollector) object).estimateCardinality();
}
private static final byte CACHE_TYPE_ID = 0x5;
private final String name;
private final String fieldName;
@JsonCreator
public HyperUniquesAggregatorFactory(
@JsonProperty("name") String name,
@JsonProperty("fieldName") String fieldName
)
{
this.name = name;
this.fieldName = fieldName.toLowerCase();
}
@Override
public Aggregator factorize(ColumnSelectorFactory metricFactory)
{
ObjectColumnSelector selector = metricFactory.makeObjectColumnSelector(fieldName);
if (selector == null) {
return new NoopAggregator(name);
}
if (HyperLogLogCollector.class.isAssignableFrom(selector.classOfObject())) {
return new HyperUniquesAggregator(name, selector);
}
throw new IAE(
"Incompatible type for metric[%s], expected a HyperUnique, got a %s", fieldName, selector.classOfObject()
);
}
@Override
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
{
ObjectColumnSelector selector = metricFactory.makeObjectColumnSelector(fieldName);
if (selector == null) {
return new NoopBufferAggregator();
}
if (HyperLogLogCollector.class.isAssignableFrom(selector.classOfObject())) {
return new HyperUniquesBufferAggregator(selector);
}
throw new IAE(
"Incompatible type for metric[%s], expected a HyperUnique, got a %s", fieldName, selector.classOfObject()
);
}
@Override
public Comparator getComparator()
{
return new Comparator<HyperLogLogCollector>()
{
@Override
public int compare(HyperLogLogCollector lhs, HyperLogLogCollector rhs)
{
return lhs.compareTo(rhs);
}
};
}
@Override
public Object combine(Object lhs, Object rhs)
{
if (rhs == null) {
return lhs;
}
if (lhs == null) {
return rhs;
}
return ((HyperLogLogCollector) lhs).fold((HyperLogLogCollector) rhs);
}
@Override
public AggregatorFactory getCombiningFactory()
{
return new HyperUniquesAggregatorFactory(name, name);
}
@Override
public Object deserialize(Object object)
{
if (object instanceof byte[]) {
return HyperLogLogCollector.makeCollector(ByteBuffer.wrap((byte[]) object));
} else if (object instanceof ByteBuffer) {
return HyperLogLogCollector.makeCollector((ByteBuffer) object);
} else if (object instanceof String) {
return HyperLogLogCollector.makeCollector(
ByteBuffer.wrap(Base64.decodeBase64(((String) object).getBytes(Charsets.UTF_8)))
);
}
return object;
}
@Override
public Object finalizeComputation(Object object)
{
return estimateCardinality(object);
}
@Override
@JsonProperty
public String getName()
{
return name;
}
@Override
public List<String> requiredFields()
{
return Arrays.asList(fieldName);
}
@JsonProperty
public String getFieldName()
{
return fieldName;
}
@Override
public byte[] getCacheKey()
{
byte[] fieldNameBytes = fieldName.getBytes(Charsets.UTF_8);
return ByteBuffer.allocate(1 + fieldNameBytes.length).put(CACHE_TYPE_ID).put(fieldNameBytes).array();
}
@Override
public String getTypeName()
{
return "hyperUnique";
}
@Override
public int getMaxIntermediateSize()
{
return HyperLogLogCollector.getLatestNumBytesForDenseStorage();
}
@Override
public Object getAggregatorStartValue()
{
return HyperLogLogCollector.makeLatestCollector();
}
@Override
public String toString()
{
return "HyperUniquesAggregatorFactory{" +
"name='" + name + '\'' +
", fieldName='" + fieldName + '\'' +
'}';
}
@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
HyperUniquesAggregatorFactory that = (HyperUniquesAggregatorFactory) o;
if (!fieldName.equals(that.fieldName)) return false;
if (!name.equals(that.name)) return false;
return true;
}
@Override
public int hashCode()
{
int result = name.hashCode();
result = 31 * result + fieldName.hashCode();
return result;
}
}

View File

@ -0,0 +1,87 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query.aggregation.hyperloglog;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.segment.ObjectColumnSelector;
import java.nio.ByteBuffer;
/**
*/
public class HyperUniquesBufferAggregator implements BufferAggregator
{
private static final byte[] EMPTY_BYTES = HyperLogLogCollector.makeEmptyVersionedByteArray();
private final ObjectColumnSelector selector;
public HyperUniquesBufferAggregator(
ObjectColumnSelector selector
)
{
this.selector = selector;
}
@Override
public void init(ByteBuffer buf, int position)
{
final ByteBuffer mutationBuffer = buf.duplicate();
mutationBuffer.position(position);
mutationBuffer.put(EMPTY_BYTES);
}
@Override
public void aggregate(ByteBuffer buf, int position)
{
HyperLogLogCollector collector = (HyperLogLogCollector) selector.get();
if (collector == null) {
return;
}
HyperLogLogCollector.makeCollector(
(ByteBuffer) buf.duplicate().position(position).limit(
position
+ HyperLogLogCollector.getLatestNumBytesForDenseStorage()
)
).fold(collector);
}
@Override
public Object get(ByteBuffer buf, int position)
{
ByteBuffer dataCopyBuffer = ByteBuffer.allocate(HyperLogLogCollector.getLatestNumBytesForDenseStorage());
ByteBuffer mutationBuffer = buf.duplicate();
mutationBuffer.position(position);
mutationBuffer.get(dataCopyBuffer.array());
return HyperLogLogCollector.makeCollector(dataCopyBuffer);
}
@Override
public float getFloat(ByteBuffer buf, int position)
{
throw new UnsupportedOperationException();
}
@Override
public void close()
{
// no resources to cleanup
}
}

View File

@ -0,0 +1,148 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query.aggregation.hyperloglog;
import com.google.common.base.Charsets;
import com.google.common.collect.Ordering;
import com.google.common.hash.HashFunction;
import io.druid.data.input.InputRow;
import io.druid.segment.column.ColumnBuilder;
import io.druid.segment.data.GenericIndexed;
import io.druid.segment.data.ObjectStrategy;
import io.druid.segment.serde.ColumnPartSerde;
import io.druid.segment.serde.ComplexColumnPartSerde;
import io.druid.segment.serde.ComplexColumnPartSupplier;
import io.druid.segment.serde.ComplexMetricExtractor;
import io.druid.segment.serde.ComplexMetricSerde;
import java.nio.ByteBuffer;
import java.util.List;
/**
*/
public class HyperUniquesSerde extends ComplexMetricSerde
{
private static Ordering<HyperLogLogCollector> comparator = new Ordering<HyperLogLogCollector>()
{
@Override
public int compare(
HyperLogLogCollector arg1, HyperLogLogCollector arg2
)
{
return arg1.toByteBuffer().compareTo(arg2.toByteBuffer());
}
}.nullsFirst();
private final HashFunction hashFn;
public HyperUniquesSerde(
HashFunction hashFn
)
{
this.hashFn = hashFn;
}
@Override
public String getTypeName()
{
return "hyperUnique";
}
@Override
public ComplexMetricExtractor getExtractor()
{
return new ComplexMetricExtractor()
{
@Override
public Class<HyperLogLogCollector> extractedClass()
{
return HyperLogLogCollector.class;
}
@Override
public HyperLogLogCollector extractValue(InputRow inputRow, String metricName)
{
HyperLogLogCollector collector = HyperLogLogCollector.makeLatestCollector();
List<String> dimValues = inputRow.getDimension(metricName);
if (dimValues == null) {
return collector;
}
for (String dimensionValue : dimValues) {
collector.add(hashFn.hashBytes(dimensionValue.getBytes(Charsets.UTF_8)).asBytes());
}
return collector;
}
};
}
@Override
public ColumnPartSerde deserializeColumn(
ByteBuffer byteBuffer, ColumnBuilder columnBuilder
)
{
final GenericIndexed column = GenericIndexed.read(byteBuffer, getObjectStrategy());
columnBuilder.setComplexColumn(new ComplexColumnPartSupplier(getTypeName(), column));
return new ComplexColumnPartSerde(column, getTypeName());
}
@Override
public ObjectStrategy getObjectStrategy()
{
return new ObjectStrategy<HyperLogLogCollector>()
{
@Override
public Class<? extends HyperLogLogCollector> getClazz()
{
return HyperLogLogCollector.class;
}
@Override
public HyperLogLogCollector fromByteBuffer(ByteBuffer buffer, int numBytes)
{
buffer.limit(buffer.position() + numBytes);
int remaining = buffer.remaining();
return (remaining % 3 == 0 || remaining == 1027) ? new HLLCV0(buffer) : new HLLCV1(buffer);
}
@Override
public byte[] toBytes(HyperLogLogCollector collector)
{
if (collector == null) {
return new byte[]{};
}
ByteBuffer val = collector.toByteBuffer();
byte[] retVal = new byte[val.remaining()];
val.asReadOnlyBuffer().get(retVal);
return retVal;
}
@Override
public int compare(HyperLogLogCollector o1, HyperLogLogCollector o2)
{
return comparator.compare(o1, o2);
}
};
}
}

View File

@ -193,4 +193,30 @@ public class ArithmeticPostAggregator implements PostAggregator
return lookupMap.keySet();
}
}
@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ArithmeticPostAggregator that = (ArithmeticPostAggregator) o;
if (fields != null ? !fields.equals(that.fields) : that.fields != null) return false;
if (fnName != null ? !fnName.equals(that.fnName) : that.fnName != null) return false;
if (name != null ? !name.equals(that.name) : that.name != null) return false;
if (op != that.op) return false;
return true;
}
@Override
public int hashCode()
{
int result = name != null ? name.hashCode() : 0;
result = 31 * result + (fnName != null ? fnName.hashCode() : 0);
result = 31 * result + (fields != null ? fields.hashCode() : 0);
result = 31 * result + (op != null ? op.hashCode() : 0);
return result;
}
}

View File

@ -77,7 +77,7 @@ public class ConstantPostAggregator implements PostAggregator
return name;
}
@JsonProperty
@JsonProperty("value")
public Number getConstantValue()
{
return constantValue;
@ -91,4 +91,39 @@ public class ConstantPostAggregator implements PostAggregator
", constantValue=" + constantValue +
'}';
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ConstantPostAggregator that = (ConstantPostAggregator) o;
if (constantValue != null && that.constantValue != null) {
if (constantValue.doubleValue() != that.constantValue.doubleValue()) {
return false;
}
} else if (constantValue != that.constantValue) {
return false;
}
if (name != null ? !name.equals(that.name) : that.name != null) {
return false;
}
return true;
}
@Override
public int hashCode()
{
int result = name != null ? name.hashCode() : 0;
result = 31 * result + (constantValue != null ? constantValue.hashCode() : 0);
return result;
}
}

View File

@ -84,4 +84,26 @@ public class FieldAccessPostAggregator implements PostAggregator
", fieldName='" + fieldName + '\'' +
'}';
}
@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
FieldAccessPostAggregator that = (FieldAccessPostAggregator) o;
if (fieldName != null ? !fieldName.equals(that.fieldName) : that.fieldName != null) return false;
if (name != null ? !name.equals(that.name) : that.name != null) return false;
return true;
}
@Override
public int hashCode()
{
int result = name != null ? name.hashCode() : 0;
result = 31 * result + (fieldName != null ? fieldName.hashCode() : 0);
return result;
}
}

View File

@ -142,4 +142,30 @@ public class JavaScriptPostAggregator implements PostAggregator
{
return function;
}
@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
JavaScriptPostAggregator that = (JavaScriptPostAggregator) o;
if (fieldNames != null ? !fieldNames.equals(that.fieldNames) : that.fieldNames != null) return false;
if (fn != null ? !fn.equals(that.fn) : that.fn != null) return false;
if (function != null ? !function.equals(that.function) : that.function != null) return false;
if (name != null ? !name.equals(that.name) : that.name != null) return false;
return true;
}
@Override
public int hashCode()
{
int result = name != null ? name.hashCode() : 0;
result = 31 * result + (fieldNames != null ? fieldNames.hashCode() : 0);
result = 31 * result + (function != null ? function.hashCode() : 0);
result = 31 * result + (fn != null ? fn.hashCode() : 0);
return result;
}
}

View File

@ -84,4 +84,26 @@ public class DefaultDimensionSpec implements DimensionSpec
", outputName='" + outputName + '\'' +
'}';
}
@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
DefaultDimensionSpec that = (DefaultDimensionSpec) o;
if (dimension != null ? !dimension.equals(that.dimension) : that.dimension != null) return false;
if (outputName != null ? !outputName.equals(that.outputName) : that.outputName != null) return false;
return true;
}
@Override
public int hashCode()
{
int result = dimension != null ? dimension.hashCode() : 0;
result = 31 * result + (outputName != null ? outputName.hashCode() : 0);
return result;
}
}

View File

@ -92,4 +92,29 @@ public class ExtractionDimensionSpec implements DimensionSpec
", outputName='" + outputName + '\'' +
'}';
}
@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ExtractionDimensionSpec that = (ExtractionDimensionSpec) o;
if (dimExtractionFn != null ? !dimExtractionFn.equals(that.dimExtractionFn) : that.dimExtractionFn != null)
return false;
if (dimension != null ? !dimension.equals(that.dimension) : that.dimension != null) return false;
if (outputName != null ? !outputName.equals(that.outputName) : that.outputName != null) return false;
return true;
}
@Override
public int hashCode()
{
int result = dimension != null ? dimension.hashCode() : 0;
result = 31 * result + (dimExtractionFn != null ? dimExtractionFn.hashCode() : 0);
result = 31 * result + (outputName != null ? outputName.hashCode() : 0);
return result;
}
}

View File

@ -33,7 +33,11 @@ import com.metamx.common.guava.Sequences;
import io.druid.data.input.Row;
import io.druid.granularity.QueryGranularity;
import io.druid.query.BaseQuery;
import io.druid.query.DataSource;
import io.druid.query.Queries;
import io.druid.query.Query;
import io.druid.query.QueryDataSource;
import io.druid.query.TableDataSource;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.dimension.DefaultDimensionSpec;
@ -72,7 +76,7 @@ public class GroupByQuery extends BaseQuery<Row>
@JsonCreator
public GroupByQuery(
@JsonProperty("dataSource") String dataSource,
@JsonProperty("dataSource") DataSource dataSource,
@JsonProperty("intervals") QuerySegmentSpec querySegmentSpec,
@JsonProperty("filter") DimFilter dimFilter,
@JsonProperty("granularity") QueryGranularity granularity,
@ -133,7 +137,7 @@ public class GroupByQuery extends BaseQuery<Row>
* have already passed in order for the object to exist.
*/
private GroupByQuery(
String dataSource,
DataSource dataSource,
QuerySegmentSpec querySegmentSpec,
DimFilter dimFilter,
QueryGranularity granularity,
@ -255,7 +259,7 @@ public class GroupByQuery extends BaseQuery<Row>
public static class Builder
{
private String dataSource;
private DataSource dataSource;
private QuerySegmentSpec querySegmentSpec;
private DimFilter dimFilter;
private QueryGranularity granularity;
@ -270,7 +274,9 @@ public class GroupByQuery extends BaseQuery<Row>
private List<OrderByColumnSpec> orderByColumnSpecs = Lists.newArrayList();
private int limit = Integer.MAX_VALUE;
private Builder() {}
private Builder()
{
}
private Builder(Builder builder)
{
@ -288,12 +294,24 @@ public class GroupByQuery extends BaseQuery<Row>
context = builder.context;
}
public Builder setDataSource(String dataSource)
public Builder setDataSource(DataSource dataSource)
{
this.dataSource = dataSource;
return this;
}
public Builder setDataSource(String dataSource)
{
this.dataSource = new TableDataSource(dataSource);
return this;
}
public Builder setDataSource(Query query)
{
this.dataSource = new QueryDataSource(query);
return this;
}
public Builder setInterval(Object interval)
{
return setQuerySegmentSpec(new LegacySegmentSpec(interval));
@ -479,13 +497,52 @@ public class GroupByQuery extends BaseQuery<Row>
public String toString()
{
return "GroupByQuery{" +
"limitSpec=" + limitSpec +
", dimFilter=" + dimFilter +
", granularity=" + granularity +
", dimensions=" + dimensions +
", aggregatorSpecs=" + aggregatorSpecs +
", postAggregatorSpecs=" + postAggregatorSpecs +
", orderByLimitFn=" + orderByLimitFn +
'}';
"limitSpec=" + limitSpec +
", dimFilter=" + dimFilter +
", granularity=" + granularity +
", dimensions=" + dimensions +
", aggregatorSpecs=" + aggregatorSpecs +
", postAggregatorSpecs=" + postAggregatorSpecs +
", orderByLimitFn=" + orderByLimitFn +
'}';
}
@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (!super.equals(o)) return false;
GroupByQuery that = (GroupByQuery) o;
if (aggregatorSpecs != null ? !aggregatorSpecs.equals(that.aggregatorSpecs) : that.aggregatorSpecs != null)
return false;
if (dimFilter != null ? !dimFilter.equals(that.dimFilter) : that.dimFilter != null) return false;
if (dimensions != null ? !dimensions.equals(that.dimensions) : that.dimensions != null) return false;
if (granularity != null ? !granularity.equals(that.granularity) : that.granularity != null) return false;
if (havingSpec != null ? !havingSpec.equals(that.havingSpec) : that.havingSpec != null) return false;
if (limitSpec != null ? !limitSpec.equals(that.limitSpec) : that.limitSpec != null) return false;
if (orderByLimitFn != null ? !orderByLimitFn.equals(that.orderByLimitFn) : that.orderByLimitFn != null)
return false;
if (postAggregatorSpecs != null ? !postAggregatorSpecs.equals(that.postAggregatorSpecs) : that.postAggregatorSpecs != null)
return false;
return true;
}
@Override
public int hashCode()
{
int result = super.hashCode();
result = 31 * result + (limitSpec != null ? limitSpec.hashCode() : 0);
result = 31 * result + (havingSpec != null ? havingSpec.hashCode() : 0);
result = 31 * result + (dimFilter != null ? dimFilter.hashCode() : 0);
result = 31 * result + (granularity != null ? granularity.hashCode() : 0);
result = 31 * result + (dimensions != null ? dimensions.hashCode() : 0);
result = 31 * result + (aggregatorSpecs != null ? aggregatorSpecs.hashCode() : 0);
result = 31 * result + (postAggregatorSpecs != null ? postAggregatorSpecs.hashCode() : 0);
result = 31 * result + (orderByLimitFn != null ? orderByLimitFn.hashCode() : 0);
return result;
}
}

View File

@ -24,9 +24,6 @@ import com.google.common.collect.Lists;
import com.metamx.common.ISE;
import com.metamx.common.Pair;
import com.metamx.common.guava.Accumulator;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import io.druid.data.input.MapBasedRow;
import io.druid.data.input.Row;
import io.druid.data.input.Rows;
import io.druid.granularity.QueryGranularity;

View File

@ -34,13 +34,17 @@ import com.metamx.common.guava.Sequences;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.data.input.MapBasedRow;
import io.druid.data.input.Row;
import io.druid.query.DataSource;
import io.druid.query.IntervalChunkingQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryDataSource;
import io.druid.query.QueryRunner;
import io.druid.query.QueryToolChest;
import io.druid.query.SubqueryQueryRunner;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.MetricManipulationFn;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexStorageAdapter;
import org.joda.time.Interval;
import org.joda.time.Minutes;
@ -56,13 +60,16 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
private static final String GROUP_BY_MERGE_KEY = "groupByMerge";
private static final Map<String, String> NO_MERGE_CONTEXT = ImmutableMap.of(GROUP_BY_MERGE_KEY, "false");
private final Supplier<GroupByQueryConfig> configSupplier;
private GroupByQueryEngine engine; // For running the outer query around a subquery
@Inject
public GroupByQueryQueryToolChest(
Supplier<GroupByQueryConfig> configSupplier
Supplier<GroupByQueryConfig> configSupplier,
GroupByQueryEngine engine
)
{
this.configSupplier = configSupplier;
this.engine = engine;
}
@Override
@ -84,13 +91,32 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
private Sequence<Row> mergeGroupByResults(final GroupByQuery query, QueryRunner<Row> runner)
{
final GroupByQueryConfig config = configSupplier.get();
Pair<IncrementalIndex, Accumulator<IncrementalIndex, Row>> indexAccumulatorPair = GroupByQueryHelper.createIndexAccumulatorPair(
query,
config
);
IncrementalIndex index = runner.run(query).accumulate(indexAccumulatorPair.lhs, indexAccumulatorPair.rhs);
Sequence<Row> result;
// If there's a subquery, merge subquery results and then apply the aggregator
DataSource dataSource = query.getDataSource();
if (dataSource instanceof QueryDataSource) {
GroupByQuery subquery;
try {
subquery = (GroupByQuery) ((QueryDataSource) dataSource).getQuery();
} catch (ClassCastException e) {
throw new UnsupportedOperationException("Subqueries must be of type 'group by'");
}
Sequence<Row> subqueryResult = mergeGroupByResults(subquery, runner);
IncrementalIndexStorageAdapter adapter
= new IncrementalIndexStorageAdapter(makeIncrementalIndex(subquery, subqueryResult));
result = engine.process(query, adapter);
} else {
result = runner.run(query);
}
return postAggregate(query, makeIncrementalIndex(query, result));
}
private Sequence<Row> postAggregate(final GroupByQuery query, IncrementalIndex index)
{
Sequence<Row> sequence = Sequences.map(
Sequences.simple(index.iterableWithPostAggregations(query.getPostAggregatorSpecs())),
new Function<Row, Row>()
@ -101,7 +127,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
final MapBasedRow row = (MapBasedRow) input;
return new MapBasedRow(
query.getGranularity()
.toDateTime(row.getTimestampFromEpoch()),
.toDateTime(row.getTimestampFromEpoch()),
row.getEvent()
);
}
@ -110,6 +136,18 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
return query.applyLimit(sequence);
}
private IncrementalIndex makeIncrementalIndex(GroupByQuery query, Sequence<Row> rows)
{
final GroupByQueryConfig config = configSupplier.get();
Pair<IncrementalIndex, Accumulator<IncrementalIndex, Row>> indexAccumulatorPair = GroupByQueryHelper.createIndexAccumulatorPair(
query,
config
);
return rows.accumulate(indexAccumulatorPair.lhs, indexAccumulatorPair.rhs);
}
@Override
public Sequence<Row> mergeSequences(Sequence<Sequence<Row>> seqOfSequences)
{
@ -125,7 +163,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
}
return new ServiceMetricEvent.Builder()
.setUser2(query.getDataSource())
.setUser2(query.getDataSource().toString())
.setUser3(String.format("%,d dims", query.getDimensions().size()))
.setUser4("groupBy")
.setUser5(Joiner.on(",").join(query.getIntervals()))
@ -165,6 +203,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
@Override
public QueryRunner<Row> preMergeQueryDecoration(QueryRunner<Row> runner)
{
return new IntervalChunkingQueryRunner<Row>(runner, configSupplier.get().getChunkPeriod());
return new SubqueryQueryRunner<Row>(
new IntervalChunkingQueryRunner<Row>(runner, configSupplier.get().getChunkPeriod()));
}
}

View File

@ -196,6 +196,25 @@ public class DefaultLimitSpec implements LimitSpec
{
return Sequences.limit(input, limit);
}
@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
LimitingFn that = (LimitingFn) o;
if (limit != that.limit) return false;
return true;
}
@Override
public int hashCode()
{
return limit;
}
}
private static class SortingFn implements Function<Sequence<Row>, Sequence<Row>>
@ -209,6 +228,25 @@ public class DefaultLimitSpec implements LimitSpec
{
return Sequences.sort(input, ordering);
}
@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
SortingFn sortingFn = (SortingFn) o;
if (ordering != null ? !ordering.equals(sortingFn.ordering) : sortingFn.ordering != null) return false;
return true;
}
@Override
public int hashCode()
{
return ordering != null ? ordering.hashCode() : 0;
}
}
private static class TopNFunction implements Function<Sequence<Row>, Sequence<Row>>
@ -231,5 +269,49 @@ public class DefaultLimitSpec implements LimitSpec
final ArrayList<Row> materializedList = Sequences.toList(input, Lists.<Row>newArrayList());
return Sequences.simple(sorter.toTopN(materializedList, limit));
}
@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
TopNFunction that = (TopNFunction) o;
if (limit != that.limit) return false;
if (sorter != null ? !sorter.equals(that.sorter) : that.sorter != null) return false;
return true;
}
@Override
public int hashCode()
{
int result = sorter != null ? sorter.hashCode() : 0;
result = 31 * result + limit;
return result;
}
}
@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
DefaultLimitSpec that = (DefaultLimitSpec) o;
if (limit != that.limit) return false;
if (columns != null ? !columns.equals(that.columns) : that.columns != null) return false;
return true;
}
@Override
public int hashCode()
{
int result = columns != null ? columns.hashCode() : 0;
result = 31 * result + limit;
return result;
}
}

View File

@ -46,4 +46,15 @@ public class NoopLimitSpec implements LimitSpec
{
return "NoopLimitSpec";
}
@Override
public boolean equals(Object other)
{
return (other instanceof NoopLimitSpec);
}
@Override
public int hashCode() {
return 0;
}
}

View File

@ -147,7 +147,7 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest<SegmentAn
}
return new ServiceMetricEvent.Builder()
.setUser2(query.getDataSource())
.setUser2(query.getDataSource().toString())
.setUser4(query.getType())
.setUser5(Joiner.on(",").join(query.getIntervals()))
.setUser6(String.valueOf(query.hasFilters()))
@ -179,9 +179,9 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest<SegmentAn
{
byte[] includerBytes = query.getToInclude().getCacheKey();
return ByteBuffer.allocate(1 + includerBytes.length)
.put(SEGMENT_METADATA_CACHE_PREFIX)
.put(includerBytes)
.array();
.put(SEGMENT_METADATA_CACHE_PREFIX)
.put(includerBytes)
.array();
}
@Override

View File

@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.druid.query.BaseQuery;
import io.druid.query.Query;
import io.druid.query.TableDataSource;
import io.druid.query.spec.QuerySegmentSpec;
import java.util.Map;
@ -42,7 +43,7 @@ public class SegmentMetadataQuery extends BaseQuery<SegmentAnalysis>
@JsonProperty("context") Map<String, String> context
)
{
super(dataSource, querySegmentSpec, context);
super(new TableDataSource(dataSource), querySegmentSpec, context);
this.toInclude = toInclude == null ? new AllColumnIncluderator() : toInclude;
this.merge = merge == null ? false : merge;
@ -76,13 +77,40 @@ public class SegmentMetadataQuery extends BaseQuery<SegmentAnalysis>
public Query<SegmentAnalysis> withOverriddenContext(Map<String, String> contextOverride)
{
return new SegmentMetadataQuery(
getDataSource(), getQuerySegmentSpec(), toInclude, merge, computeOverridenContext(contextOverride)
((TableDataSource)getDataSource()).getName(),
getQuerySegmentSpec(), toInclude, merge, computeOverridenContext(contextOverride)
);
}
@Override
public Query<SegmentAnalysis> withQuerySegmentSpec(QuerySegmentSpec spec)
{
return new SegmentMetadataQuery(getDataSource(), spec, toInclude, merge, getContext());
return new SegmentMetadataQuery(
((TableDataSource)getDataSource()).getName(),
spec, toInclude, merge, getContext());
}
@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (!super.equals(o)) return false;
SegmentMetadataQuery that = (SegmentMetadataQuery) o;
if (merge != that.merge) return false;
if (toInclude != null ? !toInclude.equals(that.toInclude) : that.toInclude != null) return false;
return true;
}
@Override
public int hashCode()
{
int result = super.hashCode();
result = 31 * result + (toInclude != null ? toInclude.hashCode() : 0);
result = 31 * result + (merge ? 1 : 0);
return result;
}
}

View File

@ -121,7 +121,7 @@ public class SearchQueryQueryToolChest extends QueryToolChest<Result<SearchResul
}
return new ServiceMetricEvent.Builder()
.setUser2(query.getDataSource())
.setUser2(query.getDataSource().toString())
.setUser4("search")
.setUser5(COMMA_JOIN.join(query.getIntervals()))
.setUser6(String.valueOf(query.hasFilters()))
@ -173,7 +173,7 @@ public class SearchQueryQueryToolChest extends QueryToolChest<Result<SearchResul
final ByteBuffer queryCacheKey = ByteBuffer
.allocate(
1 + 4 + granularityBytes.length + filterBytes.length +
querySpecBytes.length + dimensionsBytesSize
querySpecBytes.length + dimensionsBytesSize
)
.put(SEARCH_QUERY)
.put(Ints.toByteArray(query.getLimit()))

View File

@ -99,4 +99,23 @@ public class FragmentSearchQuerySpec implements SearchQuerySpec
"values=" + values +
"}";
}
@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
FragmentSearchQuerySpec that = (FragmentSearchQuerySpec) o;
if (values != null ? !values.equals(that.values) : that.values != null) return false;
return true;
}
@Override
public int hashCode()
{
return values != null ? values.hashCode() : 0;
}
}

View File

@ -73,4 +73,23 @@ public class InsensitiveContainsSearchQuerySpec implements SearchQuerySpec
"value=" + value +
"}";
}
@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
InsensitiveContainsSearchQuerySpec that = (InsensitiveContainsSearchQuerySpec) o;
if (value != null ? !value.equals(that.value) : that.value != null) return false;
return true;
}
@Override
public int hashCode()
{
return value != null ? value.hashCode() : 0;
}
}

View File

@ -50,4 +50,9 @@ public class LexicographicSearchSortSpec implements SearchSortSpec
{
return "lexicographicSort";
}
@Override
public boolean equals(Object other) {
return (other instanceof LexicographicSearchSortSpec);
}
}

View File

@ -26,6 +26,7 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import io.druid.granularity.QueryGranularity;
import io.druid.query.BaseQuery;
import io.druid.query.DataSource;
import io.druid.query.Query;
import io.druid.query.Result;
import io.druid.query.filter.DimFilter;
@ -49,7 +50,7 @@ public class SearchQuery extends BaseQuery<Result<SearchResultValue>>
@JsonCreator
public SearchQuery(
@JsonProperty("dataSource") String dataSource,
@JsonProperty("dataSource") DataSource dataSource,
@JsonProperty("filter") DimFilter dimFilter,
@JsonProperty("granularity") QueryGranularity granularity,
@JsonProperty("limit") int limit,
@ -181,13 +182,45 @@ public class SearchQuery extends BaseQuery<Result<SearchResultValue>>
public String toString()
{
return "SearchQuery{" +
"dataSource='" + getDataSource() + '\'' +
", dimFilter=" + dimFilter +
", granularity='" + granularity + '\'' +
", dimensions=" + dimensions +
", querySpec=" + querySpec +
", querySegmentSpec=" + getQuerySegmentSpec() +
", limit=" + limit +
'}';
"dataSource='" + getDataSource() + '\'' +
", dimFilter=" + dimFilter +
", granularity='" + granularity + '\'' +
", dimensions=" + dimensions +
", querySpec=" + querySpec +
", querySegmentSpec=" + getQuerySegmentSpec() +
", limit=" + limit +
'}';
}
@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (!super.equals(o)) return false;
SearchQuery that = (SearchQuery) o;
if (limit != that.limit) return false;
if (dimFilter != null ? !dimFilter.equals(that.dimFilter) : that.dimFilter != null) return false;
if (dimensions != null ? !dimensions.equals(that.dimensions) : that.dimensions != null) return false;
if (granularity != null ? !granularity.equals(that.granularity) : that.granularity != null) return false;
if (querySpec != null ? !querySpec.equals(that.querySpec) : that.querySpec != null) return false;
if (sortSpec != null ? !sortSpec.equals(that.sortSpec) : that.sortSpec != null) return false;
return true;
}
@Override
public int hashCode()
{
int result = super.hashCode();
result = 31 * result + (dimFilter != null ? dimFilter.hashCode() : 0);
result = 31 * result + (sortSpec != null ? sortSpec.hashCode() : 0);
result = 31 * result + (granularity != null ? granularity.hashCode() : 0);
result = 31 * result + (dimensions != null ? dimensions.hashCode() : 0);
result = 31 * result + (querySpec != null ? querySpec.hashCode() : 0);
result = 31 * result + limit;
return result;
}
}

View File

@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import io.druid.granularity.QueryGranularity;
import io.druid.query.BaseQuery;
import io.druid.query.DataSource;
import io.druid.query.Query;
import io.druid.query.Result;
import io.druid.query.filter.DimFilter;
@ -45,7 +46,7 @@ public class SelectQuery extends BaseQuery<Result<SelectResultValue>>
@JsonCreator
public SelectQuery(
@JsonProperty("dataSource") String dataSource,
@JsonProperty("dataSource") DataSource dataSource,
@JsonProperty("intervals") QuerySegmentSpec querySegmentSpec,
@JsonProperty("filter") DimFilter dimFilter,
@JsonProperty("granularity") QueryGranularity granularity,
@ -146,4 +147,34 @@ public class SelectQuery extends BaseQuery<Result<SelectResultValue>>
", pagingSpec=" + pagingSpec +
'}';
}
@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (!super.equals(o)) return false;
SelectQuery that = (SelectQuery) o;
if (dimFilter != null ? !dimFilter.equals(that.dimFilter) : that.dimFilter != null) return false;
if (dimensions != null ? !dimensions.equals(that.dimensions) : that.dimensions != null) return false;
if (granularity != null ? !granularity.equals(that.granularity) : that.granularity != null) return false;
if (metrics != null ? !metrics.equals(that.metrics) : that.metrics != null) return false;
if (pagingSpec != null ? !pagingSpec.equals(that.pagingSpec) : that.pagingSpec != null) return false;
return true;
}
@Override
public int hashCode()
{
int result = super.hashCode();
result = 31 * result + (dimFilter != null ? dimFilter.hashCode() : 0);
result = 31 * result + (granularity != null ? granularity.hashCode() : 0);
result = 31 * result + (dimensions != null ? dimensions.hashCode() : 0);
result = 31 * result + (metrics != null ? metrics.hashCode() : 0);
result = 31 * result + (pagingSpec != null ? pagingSpec.hashCode() : 0);
return result;
}
}

View File

@ -123,7 +123,7 @@ public class SelectQueryQueryToolChest extends QueryToolChest<Result<SelectResul
}
return new ServiceMetricEvent.Builder()
.setUser2(query.getDataSource())
.setUser2(query.getDataSource().toString())
.setUser4("Select")
.setUser5(COMMA_JOIN.join(query.getIntervals()))
.setUser6(String.valueOf(query.hasFilters()))

View File

@ -64,4 +64,23 @@ public class MultipleIntervalSegmentSpec implements QuerySegmentSpec
"intervals=" + intervals +
'}';
}
@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
MultipleIntervalSegmentSpec that = (MultipleIntervalSegmentSpec) o;
if (intervals != null ? !intervals.equals(that.intervals) : that.intervals != null) return false;
return true;
}
@Override
public int hashCode()
{
return intervals != null ? intervals.hashCode() : 0;
}
}

View File

@ -91,4 +91,26 @@ public class MultipleSpecificSegmentSpec implements QuerySegmentSpec
"descriptors=" + descriptors +
'}';
}
@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
MultipleSpecificSegmentSpec that = (MultipleSpecificSegmentSpec) o;
if (descriptors != null ? !descriptors.equals(that.descriptors) : that.descriptors != null) return false;
if (intervals != null ? !intervals.equals(that.intervals) : that.intervals != null) return false;
return true;
}
@Override
public int hashCode()
{
int result = descriptors != null ? descriptors.hashCode() : 0;
result = 31 * result + (intervals != null ? intervals.hashCode() : 0);
return result;
}
}

View File

@ -51,4 +51,23 @@ public class SpecificSegmentSpec implements QuerySegmentSpec
{
return walker.getQueryRunnerForSegments(query, Arrays.asList(descriptor));
}
@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
SpecificSegmentSpec that = (SpecificSegmentSpec) o;
if (descriptor != null ? !descriptor.equals(that.descriptor) : that.descriptor != null) return false;
return true;
}
@Override
public int hashCode()
{
return descriptor != null ? descriptor.hashCode() : 0;
}
}

View File

@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import io.druid.query.BaseQuery;
import io.druid.query.DataSource;
import io.druid.query.Query;
import io.druid.query.Result;
import io.druid.query.spec.MultipleIntervalSegmentSpec;
@ -51,7 +52,7 @@ public class TimeBoundaryQuery extends BaseQuery<Result<TimeBoundaryResultValue>
@JsonCreator
public TimeBoundaryQuery(
@JsonProperty("dataSource") String dataSource,
@JsonProperty("dataSource") DataSource dataSource,
@JsonProperty("intervals") QuerySegmentSpec querySegmentSpec,
@JsonProperty("context") Map<String, String> context
)

View File

@ -78,7 +78,7 @@ public class TimeBoundaryQueryQueryToolChest
public boolean apply(T input)
{
return input.getInterval().overlaps(first.getInterval()) || input.getInterval()
.overlaps(second.getInterval());
.overlaps(second.getInterval());
}
}
)
@ -117,7 +117,7 @@ public class TimeBoundaryQueryQueryToolChest
public ServiceMetricEvent.Builder makeMetricBuilder(TimeBoundaryQuery query)
{
return new ServiceMetricEvent.Builder()
.setUser2(query.getDataSource())
.setUser2(query.getDataSource().toString())
.setUser4(query.getType())
.setUser6("false")
.setUser10(query.getId());
@ -146,9 +146,9 @@ public class TimeBoundaryQueryQueryToolChest
public byte[] computeCacheKey(TimeBoundaryQuery query)
{
return ByteBuffer.allocate(2)
.put(TIMEBOUNDARY_QUERY)
.put(query.getCacheKey())
.array();
.put(TIMEBOUNDARY_QUERY)
.put(query.getCacheKey())
.array();
}
@Override

View File

@ -25,6 +25,7 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.collect.ImmutableList;
import io.druid.granularity.QueryGranularity;
import io.druid.query.BaseQuery;
import io.druid.query.DataSource;
import io.druid.query.Queries;
import io.druid.query.Query;
import io.druid.query.Result;
@ -48,7 +49,7 @@ public class TimeseriesQuery extends BaseQuery<Result<TimeseriesResultValue>>
@JsonCreator
public TimeseriesQuery(
@JsonProperty("dataSource") String dataSource,
@JsonProperty("dataSource") DataSource dataSource,
@JsonProperty("intervals") QuerySegmentSpec querySegmentSpec,
@JsonProperty("filter") DimFilter dimFilter,
@JsonProperty("granularity") QueryGranularity granularity,
@ -132,14 +133,43 @@ public class TimeseriesQuery extends BaseQuery<Result<TimeseriesResultValue>>
public String toString()
{
return "TimeseriesQuery{" +
"dataSource='" + getDataSource() + '\'' +
", querySegmentSpec=" + getQuerySegmentSpec() +
", dimFilter=" + dimFilter +
", granularity='" + granularity + '\'' +
", aggregatorSpecs=" + aggregatorSpecs +
", postAggregatorSpecs=" + postAggregatorSpecs +
", context=" + getContext() +
'}';
"dataSource='" + getDataSource() + '\'' +
", querySegmentSpec=" + getQuerySegmentSpec() +
", dimFilter=" + dimFilter +
", granularity='" + granularity + '\'' +
", aggregatorSpecs=" + aggregatorSpecs +
", postAggregatorSpecs=" + postAggregatorSpecs +
", context=" + getContext() +
'}';
}
@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (!super.equals(o)) return false;
TimeseriesQuery that = (TimeseriesQuery) o;
if (aggregatorSpecs != null ? !aggregatorSpecs.equals(that.aggregatorSpecs) : that.aggregatorSpecs != null)
return false;
if (dimFilter != null ? !dimFilter.equals(that.dimFilter) : that.dimFilter != null) return false;
if (granularity != null ? !granularity.equals(that.granularity) : that.granularity != null) return false;
if (postAggregatorSpecs != null ? !postAggregatorSpecs.equals(that.postAggregatorSpecs) : that.postAggregatorSpecs != null)
return false;
return true;
}
@Override
public int hashCode()
{
int result = super.hashCode();
result = 31 * result + (dimFilter != null ? dimFilter.hashCode() : 0);
result = 31 * result + (granularity != null ? granularity.hashCode() : 0);
result = 31 * result + (aggregatorSpecs != null ? aggregatorSpecs.hashCode() : 0);
result = 31 * result + (postAggregatorSpecs != null ? postAggregatorSpecs.hashCode() : 0);
return result;
}
}

View File

@ -123,7 +123,7 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<Timeser
}
return new ServiceMetricEvent.Builder()
.setUser2(query.getDataSource())
.setUser2(query.getDataSource().toString())
.setUser4("timeseries")
.setUser5(COMMA_JOIN.join(query.getIntervals()))
.setUser6(String.valueOf(query.hasFilters()))

View File

@ -79,7 +79,17 @@ public class AggregateTopNMetricFirstAlgorithm implements TopNAlgorithm<int[], T
)
{
final TopNResultBuilder singleMetricResultBuilder = makeResultBuilder(params);
final String metric = ((NumericTopNMetricSpec) query.getTopNMetricSpec()).getMetric();
final String metric;
// ugly
TopNMetricSpec spec = query.getTopNMetricSpec();
if (spec instanceof InvertedTopNMetricSpec
&& ((InvertedTopNMetricSpec) spec).getDelegate() instanceof NumericTopNMetricSpec) {
metric = ((NumericTopNMetricSpec) ((InvertedTopNMetricSpec) spec).getDelegate()).getMetric();
} else if (spec instanceof NumericTopNMetricSpec) {
metric = ((NumericTopNMetricSpec) query.getTopNMetricSpec()).getMetric();
} else {
throw new ISE("WTF?! We are in AggregateTopNMetricFirstAlgorithm with a [%s] spec", spec.getClass().getName());
}
// Find either the aggregator or post aggregator to do the topN over
List<AggregatorFactory> condensedAggs = Lists.newArrayList();

View File

@ -78,8 +78,6 @@ public abstract class BaseTopNAlgorithm<DimValSelector, DimValAggregateStore, Pa
while (numProcessed < cardinality) {
final int numToProcess = Math.min(params.getNumValuesPerPass(), cardinality - numProcessed);
params.getCursor().reset();
DimValSelector theDimValSelector;
if (!hasDimValSelector) {
theDimValSelector = makeDimValSelector(params, numProcessed, numToProcess);
@ -96,6 +94,7 @@ public abstract class BaseTopNAlgorithm<DimValSelector, DimValAggregateStore, Pa
closeAggregators(aggregatesStore);
numProcessed += numToProcess;
params.getCursor().reset();
}
}

View File

@ -101,4 +101,23 @@ public class InvertedTopNMetricSpec implements TopNMetricSpec
{
delegate.initTopNAlgorithmSelector(selector);
}
@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
InvertedTopNMetricSpec that = (InvertedTopNMetricSpec) o;
if (delegate != null ? !delegate.equals(that.delegate) : that.delegate != null) return false;
return true;
}
@Override
public int hashCode()
{
return delegate != null ? delegate.hashCode() : 0;
}
}

View File

@ -118,4 +118,23 @@ public class LexicographicTopNMetricSpec implements TopNMetricSpec
"previousStop='" + previousStop + '\'' +
'}';
}
@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
LexicographicTopNMetricSpec that = (LexicographicTopNMetricSpec) o;
if (previousStop != null ? !previousStop.equals(that.previousStop) : that.previousStop != null) return false;
return true;
}
@Override
public int hashCode()
{
return previousStop != null ? previousStop.hashCode() : 0;
}
}

View File

@ -157,4 +157,23 @@ public class NumericTopNMetricSpec implements TopNMetricSpec
"metric='" + metric + '\'' +
'}';
}
@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
NumericTopNMetricSpec that = (NumericTopNMetricSpec) o;
if (metric != null ? !metric.equals(that.metric) : that.metric != null) return false;
return true;
}
@Override
public int hashCode()
{
return metric != null ? metric.hashCode() : 0;
}
}

View File

@ -25,6 +25,7 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import io.druid.granularity.QueryGranularity;
import io.druid.query.BaseQuery;
import io.druid.query.DataSource;
import io.druid.query.Queries;
import io.druid.query.Result;
import io.druid.query.aggregation.AggregatorFactory;
@ -52,7 +53,7 @@ public class TopNQuery extends BaseQuery<Result<TopNResultValue>>
@JsonCreator
public TopNQuery(
@JsonProperty("dataSource") String dataSource,
@JsonProperty("dataSource") DataSource dataSource,
@JsonProperty("dimension") DimensionSpec dimensionSpec,
@JsonProperty("metric") TopNMetricSpec topNMetricSpec,
@JsonProperty("threshold") int threshold,
@ -208,4 +209,42 @@ public class TopNQuery extends BaseQuery<Result<TopNResultValue>>
", postAggregatorSpecs=" + postAggregatorSpecs +
'}';
}
@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (!super.equals(o)) return false;
TopNQuery topNQuery = (TopNQuery) o;
if (threshold != topNQuery.threshold) return false;
if (aggregatorSpecs != null ? !aggregatorSpecs.equals(topNQuery.aggregatorSpecs) : topNQuery.aggregatorSpecs != null)
return false;
if (dimFilter != null ? !dimFilter.equals(topNQuery.dimFilter) : topNQuery.dimFilter != null) return false;
if (dimensionSpec != null ? !dimensionSpec.equals(topNQuery.dimensionSpec) : topNQuery.dimensionSpec != null)
return false;
if (granularity != null ? !granularity.equals(topNQuery.granularity) : topNQuery.granularity != null) return false;
if (postAggregatorSpecs != null ? !postAggregatorSpecs.equals(topNQuery.postAggregatorSpecs) : topNQuery.postAggregatorSpecs != null)
return false;
if (topNMetricSpec != null ? !topNMetricSpec.equals(topNQuery.topNMetricSpec) : topNQuery.topNMetricSpec != null)
return false;
return true;
}
@Override
public int hashCode()
{
int result = super.hashCode();
result = 31 * result + (dimensionSpec != null ? dimensionSpec.hashCode() : 0);
result = 31 * result + (topNMetricSpec != null ? topNMetricSpec.hashCode() : 0);
result = 31 * result + threshold;
result = 31 * result + (dimFilter != null ? dimFilter.hashCode() : 0);
result = 31 * result + (granularity != null ? granularity.hashCode() : 0);
result = 31 * result + (aggregatorSpecs != null ? aggregatorSpecs.hashCode() : 0);
result = 31 * result + (postAggregatorSpecs != null ? postAggregatorSpecs.hashCode() : 0);
return result;
}
}

View File

@ -21,6 +21,8 @@ package io.druid.query.topn;
import com.google.common.collect.Lists;
import io.druid.granularity.QueryGranularity;
import io.druid.query.DataSource;
import io.druid.query.TableDataSource;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.dimension.DefaultDimensionSpec;
@ -58,7 +60,7 @@ import java.util.Map;
*/
public class TopNQueryBuilder
{
private String dataSource;
private DataSource dataSource;
private DimensionSpec dimensionSpec;
private TopNMetricSpec topNMetricSpec;
private int threshold;
@ -71,7 +73,7 @@ public class TopNQueryBuilder
public TopNQueryBuilder()
{
dataSource = "";
dataSource = null;
dimensionSpec = null;
topNMetricSpec = null;
threshold = 0;
@ -83,7 +85,7 @@ public class TopNQueryBuilder
context = null;
}
public String getDataSource()
public DataSource getDataSource()
{
return dataSource;
}
@ -152,7 +154,7 @@ public class TopNQueryBuilder
public TopNQueryBuilder copy(TopNQuery query)
{
return new TopNQueryBuilder()
.dataSource(query.getDataSource())
.dataSource(query.getDataSource().toString())
.dimension(query.getDimensionSpec())
.metric(query.getTopNMetricSpec())
.threshold(query.getThreshold())
@ -180,6 +182,12 @@ public class TopNQueryBuilder
}
public TopNQueryBuilder dataSource(String d)
{
dataSource = new TableDataSource(d);
return this;
}
public TopNQueryBuilder dataSource(DataSource d)
{
dataSource = d;
return this;

Some files were not shown because too many files have changed in this diff Show More