diff --git a/build.sh b/build.sh
index ec9ecad11bd..78e5fffe175 100755
--- a/build.sh
+++ b/build.sh
@@ -30,4 +30,4 @@ echo "For examples, see: "
echo " "
ls -1 examples/*/*sh
echo " "
-echo "See also http://druid.io/docs/0.6.51"
+echo "See also http://druid.io/docs/0.6.52"
diff --git a/cassandra-storage/pom.xml b/cassandra-storage/pom.xml
index 21953a6e466..0d33b40c711 100644
--- a/cassandra-storage/pom.xml
+++ b/cassandra-storage/pom.xml
@@ -28,7 +28,7 @@
        <groupId>io.druid</groupId>
        <artifactId>druid</artifactId>
-        <version>0.6.52-SNAPSHOT</version>
+        <version>0.6.53-SNAPSHOT</version>
diff --git a/common/pom.xml b/common/pom.xml
index 0afd4f1c0df..b02af58de22 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -28,7 +28,7 @@
        <groupId>io.druid</groupId>
        <artifactId>druid</artifactId>
-        <version>0.6.52-SNAPSHOT</version>
+        <version>0.6.53-SNAPSHOT</version>
diff --git a/common/src/main/java/io/druid/concurrent/Execs.java b/common/src/main/java/io/druid/concurrent/Execs.java
index ea27bf96e05..308208ef98d 100644
--- a/common/src/main/java/io/druid/concurrent/Execs.java
+++ b/common/src/main/java/io/druid/concurrent/Execs.java
@@ -21,10 +21,15 @@ package io.druid.concurrent;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
/**
*/
@@ -49,4 +54,31 @@ public class Execs
{
return new ThreadFactoryBuilder().setDaemon(true).setNameFormat(nameFormat).build();
}
+
+ /**
+   * @param nameFormat name format for the thread factory
+   * @param capacity   maximum number of queued tasks; once reached, the executor blocks on accepting new tasks
+   * @return an ExecutorService that blocks on accepting new tasks when the capacity is reached
+ */
+ public static ExecutorService newBlockingSingleThreaded(String nameFormat, int capacity)
+ {
+ return new ThreadPoolExecutor(
+ 1, 1,
+ 0L, TimeUnit.MILLISECONDS,
+        new ArrayBlockingQueue<Runnable>(capacity), makeThreadFactory(nameFormat)
+ , new RejectedExecutionHandler()
+ {
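+      // Note: instead of rejecting, this handler blocks the submitting thread by
+      // re-inserting the task with a blocking put() on the bounded queue, so callers
+      // stall once `capacity` tasks are already queued.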
+ @Override
+ public void rejectedExecution(Runnable r, ThreadPoolExecutor executor)
+ {
+ try {
+ ((ArrayBlockingQueue) executor.getQueue()).put(r);
+ }
+ catch (InterruptedException e) {
+ throw new RejectedExecutionException("Got Interrupted while adding to the Queue");
+ }
+ }
+ }
+ );
+ }
}
diff --git a/common/src/test/java/io/druid/concurrent/ExecsTest.java b/common/src/test/java/io/druid/concurrent/ExecsTest.java
new file mode 100644
index 00000000000..809ed5eac02
--- /dev/null
+++ b/common/src/test/java/io/druid/concurrent/ExecsTest.java
@@ -0,0 +1,92 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package io.druid.concurrent;
+
+import com.google.common.base.Throwables;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class ExecsTest
+{
+ @Test
+ public void testBlockingExecutorService() throws Exception
+ {
+ final int capacity = 3;
+ final ExecutorService blockingExecutor = Execs.newBlockingSingleThreaded("test%d", capacity);
+ final CountDownLatch queueFullSignal = new CountDownLatch(capacity + 1);
+ final CountDownLatch taskCompletedSignal = new CountDownLatch(2 * capacity);
+ final CountDownLatch taskStartSignal = new CountDownLatch(1);
+ final AtomicInteger producedCount = new AtomicInteger();
+ final AtomicInteger consumedCount = new AtomicInteger();
+ ExecutorService producer = Executors.newSingleThreadExecutor();
+ producer.submit(
+ new Runnable()
+ {
+ public void run()
+ {
+ for (int i = 0; i < 2 * capacity; i++) {
+ final int taskID = i;
+ System.out.println("Produced task" + taskID);
+ blockingExecutor.submit(
+ new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ System.out.println("Starting task" + taskID);
+ try {
+ taskStartSignal.await();
+ consumedCount.incrementAndGet();
+ taskCompletedSignal.countDown();
+ }
+ catch (Exception e) {
+ throw Throwables.propagate(e);
+ }
+ System.out.println("Completed task" + taskID);
+ }
+ }
+ );
+ producedCount.incrementAndGet();
+ queueFullSignal.countDown();
+ }
+ }
+ }
+ );
+
+ queueFullSignal.await();
+ // verify that the producer blocks
+ Assert.assertEquals(capacity + 1, producedCount.get());
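+    // at this point one task is running (blocked on taskStartSignal), `capacity` tasks
+    // fill the queue, and the producer thread is blocked submitting the next one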
+ // let the tasks run
+ taskStartSignal.countDown();
+ // wait until all tasks complete
+ taskCompletedSignal.await();
+ // verify all tasks consumed
+ Assert.assertEquals(2 * capacity, consumedCount.get());
+ // cleanup
+ blockingExecutor.shutdown();
+ producer.shutdown();
+
+ }
+}
diff --git a/docs/content/Broker.md b/docs/content/Broker.md
index b4da210a35f..0cd6893a3a4 100644
--- a/docs/content/Broker.md
+++ b/docs/content/Broker.md
@@ -37,7 +37,7 @@ The broker module uses several of the default modules in [Configuration](Configu
|Property|Possible Values|Description|Default|
|--------|---------------|-----------|-------|
-|`druid.broker.cache.type`|`local`, `memcache`|The type of cache to use for queries.|`local`|
+|`druid.broker.cache.type`|`local`, `memcached`|The type of cache to use for queries.|`local`|
|`druid.broker.balancer.type`|`random`, `connectionCount`|Determines how the broker balances connections to compute nodes. `random` choose randomly, `connectionCount` picks the node with the fewest number of active connections to|`random`|
#### Local Cache
diff --git a/docs/content/Examples.md b/docs/content/Examples.md
index 335feb3f233..2ea2b8567c8 100644
--- a/docs/content/Examples.md
+++ b/docs/content/Examples.md
@@ -19,13 +19,13 @@ Clone Druid and build it:
git clone https://github.com/metamx/druid.git druid
cd druid
git fetch --tags
-git checkout druid-0.6.51
+git checkout druid-0.6.52
./build.sh
```
### Downloading the DSK (Druid Standalone Kit)
-[Download](http://static.druid.io/artifacts/releases/druid-services-0.6.51-bin.tar.gz) a stand-alone tarball and run it:
+[Download](http://static.druid.io/artifacts/releases/druid-services-0.6.52-bin.tar.gz) a stand-alone tarball and run it:
``` bash
tar -xzf druid-services-0.X.X-bin.tar.gz
diff --git a/docs/content/Plumber.md b/docs/content/Plumber.md
index dfbb3b6b3bf..bbf6b79cbc4 100644
--- a/docs/content/Plumber.md
+++ b/docs/content/Plumber.md
@@ -13,6 +13,8 @@ We provide a brief description of the example to exemplify the types of things t
* `windowPeriod` is the amount of lag time to allow events. This is configured with a 10 minute window, meaning that any event more than 10 minutes ago will be thrown away and not included in the segment generated by the realtime server.
* `basePersistDirectory` is the directory to put things that need persistence. The plumber is responsible for the actual intermediate persists and this tells it where to store those persists.
+* `maxPendingPersists` is the maximum number of persists the plumber can have pending before it starts to block.
+
Available Plumbers
------------------
diff --git a/docs/content/Realtime-ingestion.md b/docs/content/Realtime-ingestion.md
index f183ad06804..8b895822cf5 100644
--- a/docs/content/Realtime-ingestion.md
+++ b/docs/content/Realtime-ingestion.md
@@ -97,7 +97,6 @@ This describes the data schema for the output Druid segment. More information ab
|aggregators|Array of Objects|The list of aggregators to use to aggregate colliding rows together.|yes|
|dataSource|String|The name of the dataSource that the segment belongs to.|yes|
|indexGranularity|String|The granularity of the data inside the segment. E.g. a value of "minute" will mean that data is aggregated at minutely granularity. That is, if there are collisions in the tuple (minute(timestamp), dimensions), then it will aggregate values together using the aggregators instead of storing individual rows.|yes|
-|segmentGranularity|String|The granularity of the segment as a whole. This is generally larger than the index granularity and describes the rate at which the realtime server will push segments out for historical servers to take over.|yes|
|shardSpec|Object|This describes the shard that is represented by this server. This must be specified properly in order to have multiple realtime nodes indexing the same data stream in a sharded fashion.|no|
### Config
diff --git a/docs/content/Realtime.md b/docs/content/Realtime.md
index 8351ae3479e..0ee39550b09 100644
--- a/docs/content/Realtime.md
+++ b/docs/content/Realtime.md
@@ -27,7 +27,7 @@ druid.host=localhost
druid.service=realtime
druid.port=8083
-druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.51"]
+druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.52"]
druid.zk.service.host=localhost
diff --git a/docs/content/Tutorial:-A-First-Look-at-Druid.md b/docs/content/Tutorial:-A-First-Look-at-Druid.md
index 6b6557bd7e0..f0dd8c61bba 100644
--- a/docs/content/Tutorial:-A-First-Look-at-Druid.md
+++ b/docs/content/Tutorial:-A-First-Look-at-Druid.md
@@ -49,7 +49,7 @@ There are two ways to setup Druid: download a tarball, or [Build From Source](Bu
### Download a Tarball
-We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.51-bin.tar.gz). Download this file to a directory of your choosing.
+We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.52-bin.tar.gz). Download this file to a directory of your choosing.
You can extract the awesomeness within by issuing:
@@ -60,7 +60,7 @@ tar -zxvf druid-services-*-bin.tar.gz
Not too lost so far right? That's great! If you cd into the directory:
```
-cd druid-services-0.6.51
+cd druid-services-0.6.52
```
You should see a bunch of files:
diff --git a/docs/content/Tutorial:-Loading-Your-Data-Part-1.md b/docs/content/Tutorial:-Loading-Your-Data-Part-1.md
index 45ad2179cee..5a9d57b7ecb 100644
--- a/docs/content/Tutorial:-Loading-Your-Data-Part-1.md
+++ b/docs/content/Tutorial:-Loading-Your-Data-Part-1.md
@@ -219,9 +219,9 @@ Congratulations! The segment has completed building. Once a segment is built, a
You should see the following logs on the coordinator:
```bash
-2013-10-09 21:41:54,368 INFO [Coordinator-Exec--0] io.druid.server.coordinator.DruidCoordinatorLogger - [_default_tier] : Assigned 1 segments among 1 servers
-2013-10-09 21:41:54,369 INFO [Coordinator-Exec--0] io.druid.server.coordinator.DruidCoordinatorLogger - Load Queues:
-2013-10-09 21:41:54,369 INFO [Coordinator-Exec--0] io.druid.server.coordinator.DruidCoordinatorLogger - Server[localhost:8081, historical, _default_tier] has 1 left to load, 0 left to drop, 4,477 bytes queued, 4,477 bytes served.
+2013-10-09 21:41:54,368 INFO [Coordinator-Exec--0] io.druid.server.coordinator.helper.DruidCoordinatorLogger - [_default_tier] : Assigned 1 segments among 1 servers
+2013-10-09 21:41:54,369 INFO [Coordinator-Exec--0] io.druid.server.coordinator.helper.DruidCoordinatorLogger - Load Queues:
+2013-10-09 21:41:54,369 INFO [Coordinator-Exec--0] io.druid.server.coordinator.helper.DruidCoordinatorLogger - Server[localhost:8081, historical, _default_tier] has 1 left to load, 0 left to drop, 4,477 bytes queued, 4,477 bytes served.
```
These logs indicate that the coordinator has assigned our new segment to the historical node to download and serve. If you look at the historical node logs, you should see:
diff --git a/docs/content/Tutorial:-Loading-Your-Data-Part-2.md b/docs/content/Tutorial:-Loading-Your-Data-Part-2.md
index 5efd2c05955..913548a1d9c 100644
--- a/docs/content/Tutorial:-Loading-Your-Data-Part-2.md
+++ b/docs/content/Tutorial:-Loading-Your-Data-Part-2.md
@@ -45,7 +45,7 @@ With real-world data, we recommend having a message bus such as [Apache Kafka](h
#### Setting up Kafka
-[KafkaFirehoseFactory](https://github.com/metamx/druid/blob/druid-0.6.51/realtime/src/main/java/com/metamx/druid/realtime/firehose/KafkaFirehoseFactory.java) is how druid communicates with Kafka. Using this [Firehose](Firehose.html) with the right configuration, we can import data into Druid in real-time without writing any code. To load data to a real-time node via Kafka, we'll first need to initialize Zookeeper and Kafka, and then configure and initialize a [Realtime](Realtime.html) node.
+[KafkaFirehoseFactory](https://github.com/metamx/druid/blob/druid-0.6.52/realtime/src/main/java/com/metamx/druid/realtime/firehose/KafkaFirehoseFactory.java) is how druid communicates with Kafka. Using this [Firehose](Firehose.html) with the right configuration, we can import data into Druid in real-time without writing any code. To load data to a real-time node via Kafka, we'll first need to initialize Zookeeper and Kafka, and then configure and initialize a [Realtime](Realtime.html) node.
Instructions for booting a Zookeeper and then Kafka cluster are available [here](http://kafka.apache.org/07/quickstart.html).
diff --git a/docs/content/Tutorial:-The-Druid-Cluster.md b/docs/content/Tutorial:-The-Druid-Cluster.md
index d76e01e097b..1ce36bca5a1 100644
--- a/docs/content/Tutorial:-The-Druid-Cluster.md
+++ b/docs/content/Tutorial:-The-Druid-Cluster.md
@@ -13,7 +13,9 @@ In this tutorial, we will set up other types of Druid nodes and external depende
If you followed the first tutorial, you should already have Druid downloaded. If not, let's go back and do that first.
-You can download the latest version of druid [here](http://static.druid.io/artifacts/releases/druid-services-0.6.51-bin.tar.gz) and untar the contents within by issuing:
+You can download the latest version of druid [here](http://static.druid.io/artifacts/releases/druid-services-0.6.52-bin.tar.gz)
+
+and untar the contents within by issuing:
```bash
tar -zxvf druid-services-*-bin.tar.gz
@@ -147,7 +149,7 @@ druid.port=8081
druid.zk.service.host=localhost
-druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.51"]
+druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.52"]
# Dummy read only AWS account (used to download example data)
druid.s3.secretKey=QyyfVZ7llSiRg6Qcrql1eEUG7buFpAK6T6engr1b
@@ -238,7 +240,7 @@ druid.port=8083
druid.zk.service.host=localhost
-druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.51","io.druid.extensions:druid-kafka-seven:0.6.51"]
+druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.52","io.druid.extensions:druid-kafka-seven:0.6.52"]
# Change this config to db to hand off to the rest of the Druid cluster
druid.publish.type=noop
diff --git a/docs/content/Tutorial:-Webstream.md b/docs/content/Tutorial:-Webstream.md
index a3f3e1e9046..176835fb923 100644
--- a/docs/content/Tutorial:-Webstream.md
+++ b/docs/content/Tutorial:-Webstream.md
@@ -37,7 +37,7 @@ There are two ways to setup Druid: download a tarball, or [Build From Source](Bu
h3. Download a Tarball
-We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.51-bin.tar.gz)
+We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.52-bin.tar.gz)
Download this file to a directory of your choosing.
You can extract the awesomeness within by issuing:
@@ -48,7 +48,7 @@ tar zxvf druid-services-*-bin.tar.gz
Not too lost so far right? That's great! If you cd into the directory:
```
-cd druid-services-0.6.51
+cd druid-services-0.6.52
```
You should see a bunch of files:
diff --git a/docs/content/Twitter-Tutorial.textile b/docs/content/Twitter-Tutorial.textile
index 3ca3c6571b9..c08774a57bc 100644
--- a/docs/content/Twitter-Tutorial.textile
+++ b/docs/content/Twitter-Tutorial.textile
@@ -9,7 +9,7 @@ There are two ways to setup Druid: download a tarball, or build it from source.
h3. Download a Tarball
-We've built a tarball that contains everything you'll need. You'll find it "here":http://static.druid.io/artifacts/releases/druid-services-0.6.51-bin.tar.gz.
+We've built a tarball that contains everything you'll need. You'll find it "here":http://static.druid.io/artifacts/releases/druid-services-0.6.52-bin.tar.gz.
Download this bad boy to a directory of your choosing.
You can extract the awesomeness within by issuing:
diff --git a/examples/config/historical/runtime.properties b/examples/config/historical/runtime.properties
index 69ab0cd4dce..dec3ad5113b 100644
--- a/examples/config/historical/runtime.properties
+++ b/examples/config/historical/runtime.properties
@@ -4,7 +4,7 @@ druid.port=8081
druid.zk.service.host=localhost
-druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.51"]
+druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.52"]
# Dummy read only AWS account (used to download example data)
druid.s3.secretKey=QyyfVZ7llSiRg6Qcrql1eEUG7buFpAK6T6engr1b
diff --git a/examples/config/realtime/runtime.properties b/examples/config/realtime/runtime.properties
index 33d09cdf9e6..cbab843c0e4 100644
--- a/examples/config/realtime/runtime.properties
+++ b/examples/config/realtime/runtime.properties
@@ -4,7 +4,7 @@ druid.port=8083
druid.zk.service.host=localhost
-druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.51","io.druid.extensions:druid-kafka-seven:0.6.51","io.druid.extensions:druid-rabbitmq:0.6.51"]
+druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.52","io.druid.extensions:druid-kafka-seven:0.6.52","io.druid.extensions:druid-rabbitmq:0.6.52"]
# Change this config to db to hand off to the rest of the Druid cluster
druid.publish.type=noop
diff --git a/examples/pom.xml b/examples/pom.xml
index 840b4ff5b52..1f77718f582 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -28,7 +28,7 @@
        <groupId>io.druid</groupId>
        <artifactId>druid</artifactId>
-        <version>0.6.52-SNAPSHOT</version>
+        <version>0.6.53-SNAPSHOT</version>
diff --git a/examples/src/test/java/io/druid/examples/web/WebJsonSupplierTest.java b/examples/src/test/java/io/druid/examples/web/WebJsonSupplierTest.java
index d80fd5479f9..ca181427c82 100644
--- a/examples/src/test/java/io/druid/examples/web/WebJsonSupplierTest.java
+++ b/examples/src/test/java/io/druid/examples/web/WebJsonSupplierTest.java
@@ -29,7 +29,7 @@ public class WebJsonSupplierTest
public void checkInvalidUrl() throws Exception
{
- String invalidURL = "http://invalid.url";
+ String invalidURL = "http://invalid.url.";
WebJsonSupplier supplier = new WebJsonSupplier(invalidURL);
supplier.getInput();
}
diff --git a/hdfs-storage/pom.xml b/hdfs-storage/pom.xml
index 87980619bb2..8ec3b6dcf4b 100644
--- a/hdfs-storage/pom.xml
+++ b/hdfs-storage/pom.xml
@@ -28,7 +28,7 @@
        <groupId>io.druid</groupId>
        <artifactId>druid</artifactId>
-        <version>0.6.52-SNAPSHOT</version>
+        <version>0.6.53-SNAPSHOT</version>
diff --git a/hll/pom.xml b/hll/pom.xml
new file mode 100644
index 00000000000..58e17295c96
--- /dev/null
+++ b/hll/pom.xml
@@ -0,0 +1,82 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <groupId>io.druid.extensions</groupId>
+    <artifactId>druid-hll</artifactId>
+    <name>druid-hll</name>
+    <description>druid-hll</description>
+
+    <parent>
+        <groupId>io.druid</groupId>
+        <artifactId>druid</artifactId>
+        <version>0.6.53-SNAPSHOT</version>
+    </parent>
+
+    <dependencies>
+        <dependency>
+            <groupId>io.druid</groupId>
+            <artifactId>druid-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.druid</groupId>
+            <artifactId>druid-processing</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.metamx</groupId>
+            <artifactId>emitter</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>net.sf.trove4j</groupId>
+            <artifactId>trove4j</artifactId>
+            <version>3.0.3</version>
+        </dependency>
+        <dependency>
+            <groupId>commons-codec</groupId>
+            <artifactId>commons-codec</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <artifactId>maven-jar-plugin</artifactId>
+                <configuration>
+                    <archive>
+                        <manifest>
+                            <addDefaultImplementationEntries>true</addDefaultImplementationEntries>
+                            <addDefaultSpecificationEntries>true</addDefaultSpecificationEntries>
+                        </manifest>
+                    </archive>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/hll/src/main/java/io/druid/query/aggregation/HyperloglogAggregator.java b/hll/src/main/java/io/druid/query/aggregation/HyperloglogAggregator.java
new file mode 100755
index 00000000000..ddbe60dcf67
--- /dev/null
+++ b/hll/src/main/java/io/druid/query/aggregation/HyperloglogAggregator.java
@@ -0,0 +1,137 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package io.druid.query.aggregation;
+
+import com.google.common.hash.Hashing;
+import com.metamx.common.ISE;
+import com.metamx.common.logger.Logger;
+import gnu.trove.map.TIntByteMap;
+import gnu.trove.map.hash.TIntByteHashMap;
+import io.druid.segment.ObjectColumnSelector;
+
+import java.util.Comparator;
+
+public class HyperloglogAggregator implements Aggregator
+{
+ private static final Logger log = new Logger(HyperloglogAggregator.class);
+
+ public static final int log2m = 12;
+ public static final int m = (int) Math.pow(2, log2m);
+ public static final double alphaMM = (0.7213 / (1 + 1.079 / m)) * m * m;
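+  // log2m = 12 gives m = 2^12 = 4096 registers; alphaMM is the standard HLL bias
+  // correction constant alpha_m multiplied by m^2, used when estimating cardinality.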
+
+ private final String name;
+ private final ObjectColumnSelector selector;
+
+ private TIntByteHashMap ibMap;
+
+ static final Comparator COMPARATOR = new Comparator()
+ {
+ @Override
+ public int compare(Object o, Object o1)
+ {
+ return o.equals(o1) ? 0 : 1;
+ }
+ };
+
+ public static Object combine(Object lhs, Object rhs)
+ {
+ final TIntByteMap newIbMap = new TIntByteHashMap((TIntByteMap) lhs);
+ final TIntByteMap rightIbMap = (TIntByteMap) rhs;
+ final int[] keys = rightIbMap.keys();
+
+ for (int key : keys) {
+ if (newIbMap.get(key) == newIbMap.getNoEntryValue() || rightIbMap.get(key) > newIbMap.get(key)) {
+ newIbMap.put(key, rightIbMap.get(key));
+ }
+ }
+
+ return newIbMap;
+ }
+
+ public HyperloglogAggregator(String name, ObjectColumnSelector selector)
+ {
+ this.name = name;
+ this.selector = selector;
+ this.ibMap = new TIntByteHashMap();
+ }
+
+ @Override
+ public void aggregate()
+ {
+ final Object value = selector.get();
+
+ if (value == null) {
+ return;
+ }
+
+ if (value instanceof TIntByteHashMap) {
+ final TIntByteHashMap newIbMap = (TIntByteHashMap) value;
+ final int[] indexes = newIbMap.keys();
+
+ for (int index : indexes) {
+ if (ibMap.get(index) == ibMap.getNoEntryValue() || newIbMap.get(index) > ibMap.get(index)) {
+ ibMap.put(index, newIbMap.get(index));
+ }
+ }
+ } else if (value instanceof String) {
+ log.debug("value [%s]", selector.get());
+
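+      // standard HLL update: the top log2m bits of the hash select the register (bucket),
+      // and the run of leading zeros in the remaining bits (plus one) is the candidate
+      // register value, kept only if it exceeds the current value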
+ final long id = Hashing.murmur3_128().hashString((String) (value)).asLong();
+ final int bucket = (int) (id >>> (Long.SIZE - log2m));
+ final int zerolength = Long.numberOfLeadingZeros((id << log2m) | (1 << (log2m - 1)) + 1) + 1;
+
+ if (ibMap.get(bucket) == ibMap.getNoEntryValue() || ibMap.get(bucket) < (byte) zerolength) {
+ ibMap.put(bucket, (byte) zerolength);
+ }
+ } else {
+ throw new ISE("Aggregate does not support values of type[%s]", value.getClass().getName());
+ }
+ }
+
+ @Override
+ public void reset()
+ {
+ this.ibMap = new TIntByteHashMap();
+ }
+
+ @Override
+ public Object get()
+ {
+ return ibMap;
+ }
+
+ @Override
+ public float getFloat()
+ {
+ throw new UnsupportedOperationException("HyperloglogAggregator does not support getFloat()");
+ }
+
+ @Override
+ public String getName()
+ {
+ return name;
+ }
+
+ @Override
+ public void close()
+ {
+ // do nothing
+ }
+}
diff --git a/hll/src/main/java/io/druid/query/aggregation/HyperloglogAggregatorFactory.java b/hll/src/main/java/io/druid/query/aggregation/HyperloglogAggregatorFactory.java
new file mode 100755
index 00000000000..3f7150b9221
--- /dev/null
+++ b/hll/src/main/java/io/druid/query/aggregation/HyperloglogAggregatorFactory.java
@@ -0,0 +1,209 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package io.druid.query.aggregation;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import com.metamx.common.logger.Logger;
+import gnu.trove.map.hash.TIntByteHashMap;
+import io.druid.segment.ColumnSelectorFactory;
+import org.apache.commons.codec.binary.Base64;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+
+public class HyperloglogAggregatorFactory implements AggregatorFactory
+{
+ private static final Logger log = new Logger(HyperloglogAggregatorFactory.class);
+ private static final byte[] CACHE_KEY = new byte[]{0x37};
+
+ private final String name;
+ private final String fieldName;
+
+ @JsonCreator
+ public HyperloglogAggregatorFactory(
+ @JsonProperty("name") final String name,
+ @JsonProperty("fieldName") final String fieldName
+ )
+ {
+ Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name");
+ Preconditions.checkNotNull(fieldName, "Must have a valid, non-null fieldName");
+
+ this.name = name;
+ this.fieldName = fieldName;
+ }
+
+ @Override
+ public Aggregator factorize(ColumnSelectorFactory metricFactory)
+ {
+ return new HyperloglogAggregator(
+ name,
+ metricFactory.makeObjectColumnSelector(fieldName)
+ );
+ }
+
+ @Override
+ public BufferAggregator factorizeBuffered(
+ ColumnSelectorFactory metricFactory
+ )
+ {
+ return new HyperloglogBufferAggregator(
+ metricFactory.makeObjectColumnSelector(fieldName)
+ );
+ }
+
+ @Override
+ public Comparator getComparator()
+ {
+ return HyperloglogAggregator.COMPARATOR;
+ }
+
+ @Override
+ public Object combine(Object lhs, Object rhs)
+ {
+ if (rhs == null) {
+ return lhs;
+ }
+ if (lhs == null) {
+ return rhs;
+ }
+ return HyperloglogAggregator.combine(lhs, rhs);
+ }
+
+ @Override
+ public AggregatorFactory getCombiningFactory()
+ {
+ log.debug("factory name: %s", name);
+ return new HyperloglogAggregatorFactory(name, fieldName);
+ }
+
+ @Override
+ public Object deserialize(Object object)
+ {
+ log.debug("class name: [%s]:value [%s]", object.getClass().getName(), object);
+
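+    // the object arrives as a Base64 string over the sketch's binary layout:
+    // 4-byte key count, 4-byte value count, the int keys, then the byte values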
+ final String k = (String) object;
+ final byte[] ibmapByte = Base64.decodeBase64(k);
+
+ final ByteBuffer buffer = ByteBuffer.wrap(ibmapByte);
+ final int keylength = buffer.getInt();
+ final int valuelength = buffer.getInt();
+
+ TIntByteHashMap newIbMap;
+
+ if (keylength == 0) {
+ newIbMap = new TIntByteHashMap();
+ } else {
+ final int[] keys = new int[keylength];
+ final byte[] values = new byte[valuelength];
+
+ for (int i = 0; i < keylength; i++) {
+ keys[i] = buffer.getInt();
+ }
+ buffer.get(values);
+
+ newIbMap = new TIntByteHashMap(keys, values);
+ }
+
+ return newIbMap;
+ }
+
+ @Override
+ public Object finalizeComputation(Object object)
+ {
+ final TIntByteHashMap ibMap = (TIntByteHashMap) object;
+ final int[] keys = ibMap.keys();
+ final int count = keys.length;
+
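+    // harmonic-mean HLL estimate: registerSum accumulates 2^-register over all m
+    // registers (keys absent from the map count as empty registers), giving
+    // estimate = alphaMM / registerSum, with the linear-counting correction
+    // m * ln(m / zeros) used for small estimates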
+ double registerSum = 0;
+ double zeros = 0.0;
+
+ for (int key : keys) {
+ int val = ibMap.get(key);
+
+ registerSum += 1.0 / (1 << val);
+
+ if (val == 0) {
+ zeros++;
+ }
+ }
+
+ registerSum += (HyperloglogAggregator.m - count);
+ zeros += HyperloglogAggregator.m - count;
+
+ double estimate = HyperloglogAggregator.alphaMM * (1.0 / registerSum);
+
+ if (estimate <= (5.0 / 2.0) * (HyperloglogAggregator.m)) {
+ // Small Range Estimate
+ return Math.round(HyperloglogAggregator.m * Math.log(HyperloglogAggregator.m / zeros));
+ } else {
+ return Math.round(estimate);
+ }
+ }
+
+ @JsonProperty
+ public String getFieldName()
+ {
+ return fieldName;
+ }
+
+ @Override
+ @JsonProperty
+ public String getName()
+ {
+ return name;
+ }
+
+ @Override
+  public List<String> requiredFields()
+ {
+ return Arrays.asList(fieldName);
+ }
+
+ @Override
+ public byte[] getCacheKey()
+ {
+
+ byte[] fieldNameBytes = fieldName.getBytes();
+ return ByteBuffer.allocate(1 + fieldNameBytes.length).put(CACHE_KEY)
+ .put(fieldNameBytes).array();
+ }
+
+ @Override
+ public String getTypeName()
+ {
+ return "hyperloglog";
+ }
+
+ @Override
+ public int getMaxIntermediateSize()
+ {
+ return HyperloglogAggregator.m;
+ }
+
+ @Override
+ public Object getAggregatorStartValue()
+ {
+ return new TIntByteHashMap();
+ }
+}
diff --git a/hll/src/main/java/io/druid/query/aggregation/HyperloglogBufferAggregator.java b/hll/src/main/java/io/druid/query/aggregation/HyperloglogBufferAggregator.java
new file mode 100755
index 00000000000..3681fc8bd40
--- /dev/null
+++ b/hll/src/main/java/io/druid/query/aggregation/HyperloglogBufferAggregator.java
@@ -0,0 +1,94 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package io.druid.query.aggregation;
+
+import gnu.trove.map.hash.TIntByteHashMap;
+import gnu.trove.procedure.TIntByteProcedure;
+import io.druid.segment.ObjectColumnSelector;
+
+import java.nio.ByteBuffer;
+
+public class HyperloglogBufferAggregator implements BufferAggregator
+{
+ private final ObjectColumnSelector selector;
+
+ public HyperloglogBufferAggregator(ObjectColumnSelector selector)
+ {
+ this.selector = selector;
+ }
+
+ /*
+ * byte 1 key length byte 2 value length byte 3...n key array byte n+1....
+ * value array
+ */
+ @Override
+ public void init(ByteBuffer buf, int position)
+ {
+ for (int i = 0; i < HyperloglogAggregator.m; i++) {
+ buf.put(position + i, (byte) 0);
+ }
+ }
+
+ @Override
+ public void aggregate(ByteBuffer buf, int position)
+ {
+ final ByteBuffer fb = buf;
+ final int fp = position;
+ final TIntByteHashMap newObj = (TIntByteHashMap) (selector.get());
+
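+    // merge the incoming sketch into the buffer by taking, per register, the max of
+    // the buffered value and the incoming value (register i lives at buf[position + i])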
+ newObj.forEachEntry(
+ new TIntByteProcedure()
+ {
+ public boolean execute(int a, byte b)
+ {
+ if (b > fb.get(fp + a)) {
+ fb.put(fp + a, b);
+ }
+ return true;
+ }
+ }
+ );
+ }
+
+ @Override
+ public Object get(ByteBuffer buf, int position)
+ {
+ final TIntByteHashMap ret = new TIntByteHashMap();
+
+ for (int i = 0; i < HyperloglogAggregator.m; i++) {
+ if (buf.get(position + i) != 0) {
+ ret.put(i, buf.get(position + i));
+ }
+ }
+ return ret;
+ }
+
+ @Override
+ public float getFloat(ByteBuffer buf, int position)
+ {
+ throw new UnsupportedOperationException("HyperloglogAggregator does not support getFloat()");
+ }
+
+ @Override
+ public void close()
+ {
+ // do nothing
+ }
+}
diff --git a/hll/src/main/java/io/druid/query/aggregation/HyperloglogComplexMetricSerde.java b/hll/src/main/java/io/druid/query/aggregation/HyperloglogComplexMetricSerde.java
new file mode 100755
index 00000000000..8ba20b4a458
--- /dev/null
+++ b/hll/src/main/java/io/druid/query/aggregation/HyperloglogComplexMetricSerde.java
@@ -0,0 +1,137 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package io.druid.query.aggregation;
+
+import gnu.trove.map.hash.TIntByteHashMap;
+import io.druid.data.input.InputRow;
+import io.druid.segment.column.ColumnBuilder;
+import io.druid.segment.column.ValueType;
+import io.druid.segment.data.GenericIndexed;
+import io.druid.segment.data.ObjectStrategy;
+import io.druid.segment.serde.ColumnPartSerde;
+import io.druid.segment.serde.ComplexColumnPartSerde;
+import io.druid.segment.serde.ComplexColumnPartSupplier;
+import io.druid.segment.serde.ComplexMetricExtractor;
+import io.druid.segment.serde.ComplexMetricSerde;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+public class HyperloglogComplexMetricSerde extends ComplexMetricSerde
+{
+ @Override
+ public String getTypeName()
+ {
+ return "hyperloglog";
+ }
+
+ @Override
+ public ComplexMetricExtractor getExtractor()
+ {
+ return new HyperloglogComplexMetricExtractor();
+ }
+
+ @Override
+ public ColumnPartSerde deserializeColumn(ByteBuffer buffer, ColumnBuilder builder)
+ {
+ GenericIndexed column = GenericIndexed.read(buffer, getObjectStrategy());
+ builder.setType(ValueType.COMPLEX);
+ builder.setComplexColumn(new ComplexColumnPartSupplier("hyperloglog", column));
+ return new ComplexColumnPartSerde(column, "hyperloglog");
+ }
+
+ @Override
+ public ObjectStrategy getObjectStrategy()
+ {
+ return new HyperloglogObjectStrategy();
+ }
+
+  public static class HyperloglogObjectStrategy implements ObjectStrategy<TIntByteHashMap>
+ {
+ @Override
+    public Class<? extends TIntByteHashMap> getClazz()
+ {
+ return TIntByteHashMap.class;
+ }
+
+ @Override
+ public TIntByteHashMap fromByteBuffer(ByteBuffer buffer, int numBytes)
+ {
+ int keylength = buffer.getInt();
+ int valuelength = buffer.getInt();
+ if (keylength == 0) {
+ return new TIntByteHashMap();
+ }
+ int[] keys = new int[keylength];
+ byte[] values = new byte[valuelength];
+
+ for (int i = 0; i < keylength; i++) {
+ keys[i] = buffer.getInt();
+ }
+
+ buffer.get(values);
+
+ TIntByteHashMap tib = new TIntByteHashMap(keys, values);
+ return tib;
+ }
+
+ @Override
+ public byte[] toBytes(TIntByteHashMap val)
+ {
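+      // serialized layout: 4-byte key count, 4-byte value count, the int keys, then the
+      // byte values (the inverse of fromByteBuffer above)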
+ TIntByteHashMap ibmap = val;
+ int[] indexesResult = ibmap.keys();
+ byte[] valueResult = ibmap.values();
+ ByteBuffer buffer = ByteBuffer.allocate(4 * indexesResult.length + valueResult.length + 8);
+ byte[] result = new byte[4 * indexesResult.length + valueResult.length + 8];
+ buffer.putInt((int) indexesResult.length);
+ buffer.putInt((int) valueResult.length);
+ for (int i = 0; i < indexesResult.length; i++) {
+ buffer.putInt(indexesResult[i]);
+ }
+
+ buffer.put(valueResult);
+ buffer.flip();
+ buffer.get(result);
+ return result;
+ }
+
+ @Override
+ public int compare(TIntByteHashMap o1, TIntByteHashMap o2)
+ {
+ return o1.equals(o2) ? 0 : 1;
+ }
+ }
+
+ public static class HyperloglogComplexMetricExtractor implements ComplexMetricExtractor
+ {
+ @Override
+    public Class<?> extractedClass()
+ {
+ return List.class;
+ }
+
+ @Override
+ public Object extractValue(InputRow inputRow, String metricName)
+ {
+ return inputRow.getRaw(metricName);
+ }
+ }
+}
+
diff --git a/hll/src/main/java/io/druid/query/aggregation/HyperloglogDruidModule.java b/hll/src/main/java/io/druid/query/aggregation/HyperloglogDruidModule.java
new file mode 100644
index 00000000000..f7ef5cad748
--- /dev/null
+++ b/hll/src/main/java/io/druid/query/aggregation/HyperloglogDruidModule.java
@@ -0,0 +1,140 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package io.druid.query.aggregation;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.google.common.collect.ImmutableList;
+import com.google.inject.Binder;
+import gnu.trove.map.hash.TIntByteHashMap;
+import io.druid.initialization.DruidModule;
+import io.druid.segment.serde.ComplexMetrics;
+import org.apache.commons.codec.binary.Base64;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/**
+ */
+public class HyperloglogDruidModule implements DruidModule
+{
+ @Override
+  public List<? extends Module> getJacksonModules()
+ {
+ return ImmutableList.of(
+ new HyperloglogJacksonSerdeModule().registerSubtypes(
+ new NamedType(HyperloglogAggregatorFactory.class, "hyperloglog")
+ )
+ );
+ }
+
+ @Override
+ public void configure(Binder binder)
+ {
+ if (ComplexMetrics.getSerdeForType("hyperloglog") == null) {
+ ComplexMetrics.registerSerde("hyperloglog", new HyperloglogComplexMetricSerde());
+ }
+ }
+
+ public static class HyperloglogJacksonSerdeModule extends SimpleModule
+ {
+ public HyperloglogJacksonSerdeModule()
+ {
+ super("Hyperloglog deserializers");
+
+ addDeserializer(
+ TIntByteHashMap.class,
+          new JsonDeserializer<TIntByteHashMap>()
+ {
+ @Override
+ public TIntByteHashMap deserialize(
+ JsonParser jp,
+ DeserializationContext ctxt
+ ) throws IOException
+ {
+ byte[] ibmapByte = Base64.decodeBase64(jp.getText());
+
+ ByteBuffer buffer = ByteBuffer.wrap(ibmapByte);
+ int keylength = buffer.getInt();
+ int valuelength = buffer.getInt();
+ if (keylength == 0) {
+ return (new TIntByteHashMap());
+ }
+ int[] keys = new int[keylength];
+ byte[] values = new byte[valuelength];
+
+ for (int i = 0; i < keylength; i++) {
+ keys[i] = buffer.getInt();
+ }
+ buffer.get(values);
+
+ return (new TIntByteHashMap(keys, values));
+ }
+ }
+ );
+
+ addSerializer(
+ TIntByteHashMap.class,
+          new JsonSerializer<TIntByteHashMap>()
+ {
+ @Override
+ public void serialize(
+ TIntByteHashMap ibmap,
+ JsonGenerator jsonGenerator,
+ SerializerProvider serializerProvider
+ )
+ throws IOException, JsonProcessingException
+ {
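+              // write the sketch as Base64 over the same layout toBytes/fromByteBuffer use:
+              // 4-byte key count, 4-byte value count, the int keys, then the byte values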
+ int[] indexesResult = ibmap.keys();
+ byte[] valueResult = ibmap.values();
+ ByteBuffer buffer = ByteBuffer
+ .allocate(
+ 4 * indexesResult.length
+ + valueResult.length + 8
+ );
+ byte[] result = new byte[4 * indexesResult.length
+ + valueResult.length + 8];
+ buffer.putInt((int) indexesResult.length);
+ buffer.putInt((int) valueResult.length);
+ for (int i = 0; i < indexesResult.length; i++) {
+ buffer.putInt(indexesResult[i]);
+ }
+
+ buffer.put(valueResult);
+ buffer.flip();
+ buffer.get(result);
+ String str = Base64.encodeBase64String(result);
+ jsonGenerator.writeString(str);
+ }
+ }
+ );
+
+ }
+ }
+}
diff --git a/hll/src/main/resources/META-INF/services/io.druid.initialization.DruidModule b/hll/src/main/resources/META-INF/services/io.druid.initialization.DruidModule
new file mode 100644
index 00000000000..75977329c70
--- /dev/null
+++ b/hll/src/main/resources/META-INF/services/io.druid.initialization.DruidModule
@@ -0,0 +1 @@
+io.druid.query.aggregation.HyperloglogDruidModule
\ No newline at end of file
diff --git a/hll/src/test/java/io/druid/query/aggregation/HyperloglogAggregatorTest.java b/hll/src/test/java/io/druid/query/aggregation/HyperloglogAggregatorTest.java
new file mode 100644
index 00000000000..7d0a8f3c0e6
--- /dev/null
+++ b/hll/src/test/java/io/druid/query/aggregation/HyperloglogAggregatorTest.java
@@ -0,0 +1,162 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package io.druid.query.aggregation;
+
+import gnu.trove.map.hash.TIntByteHashMap;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Comparator;
+
+public class HyperloglogAggregatorTest
+{
+ @Test
+ public void testAggregate()
+ {
+ final TestHllComplexMetricSelector selector = new TestHllComplexMetricSelector();
+ final HyperloglogAggregatorFactory aggFactory = new HyperloglogAggregatorFactory("billy", "billyG");
+ final HyperloglogAggregator agg = new HyperloglogAggregator("billy", selector);
+
+ Assert.assertEquals("billy", agg.getName());
+
+ Assert.assertEquals(0L, aggFactory.finalizeComputation(agg.get()));
+ Assert.assertEquals(0L, aggFactory.finalizeComputation(agg.get()));
+ Assert.assertEquals(0L, aggFactory.finalizeComputation(agg.get()));
+
+ aggregate(selector, agg);
+ aggregate(selector, agg);
+ aggregate(selector, agg);
+
+ Assert.assertEquals(3L, aggFactory.finalizeComputation(agg.get()));
+ Assert.assertEquals(3L, aggFactory.finalizeComputation(agg.get()));
+ Assert.assertEquals(3L, aggFactory.finalizeComputation(agg.get()));
+
+ aggregate(selector, agg);
+ aggregate(selector, agg);
+
+ Assert.assertEquals(5L, aggFactory.finalizeComputation(agg.get()));
+ Assert.assertEquals(5L, aggFactory.finalizeComputation(agg.get()));
+ }
+
+ @Test
+ public void testComparator()
+ {
+ final TestHllComplexMetricSelector selector = new TestHllComplexMetricSelector();
+ final Comparator comp = new HyperloglogAggregatorFactory("billy", "billyG").getComparator();
+ final HyperloglogAggregator agg = new HyperloglogAggregator("billy", selector);
+
+ Object first = new TIntByteHashMap((TIntByteHashMap) agg.get());
+ agg.aggregate();
+
+ Assert.assertEquals(0, comp.compare(first, first));
+ Assert.assertEquals(0, comp.compare(agg.get(), agg.get()));
+ Assert.assertEquals(1, comp.compare(agg.get(), first));
+ }
+
+ @Test
+ public void testHighCardinalityAggregate()
+ {
+ final TestHllComplexMetricSelector selector = new TestHllComplexMetricSelector();
+ final HyperloglogAggregatorFactory aggFactory = new HyperloglogAggregatorFactory("billy", "billyG");
+ final HyperloglogAggregator agg = new HyperloglogAggregator("billy", selector);
+
+ final int card = 100000;
+
+ for (int i = 0; i < card; i++) {
+ aggregate(selector, agg);
+ }
+
+ Assert.assertEquals(99443L, aggFactory.finalizeComputation(agg.get()));
+ }
+
+ // Provides a nice printout of error rates as a function of cardinality
+ //@Test
+ public void benchmarkAggregation() throws Exception
+ {
+ final TestHllComplexMetricSelector selector = new TestHllComplexMetricSelector();
+ final HyperloglogAggregatorFactory aggFactory = new HyperloglogAggregatorFactory("billy", "billyG");
+
+ double error = 0.0d;
+ int count = 0;
+
+ final int[] valsToCheck = {
+ 10, 20, 50, 100, 1000, 2000, 5000, 10000, 20000, 50000, 100000, 1000000, 2000000, 10000000, Integer.MAX_VALUE
+ };
+
+ for (int numThings : valsToCheck) {
+ long startTime = System.currentTimeMillis();
+ final HyperloglogAggregator agg = new HyperloglogAggregator("billy", selector);
+
+ for (int i = 0; i < numThings; ++i) {
+ if (i != 0 && i % 100000000 == 0) {
+ ++count;
+ error = computeError(error, count, i, (Long) aggFactory.finalizeComputation(agg.get()), startTime);
+ }
+ aggregate(selector, agg);
+ }
+
+ ++count;
+ error = computeError(error, count, numThings, (Long) aggFactory.finalizeComputation(agg.get()), startTime);
+ }
+ }
+
+ //@Test
+ public void benchmarkCombine() throws Exception
+ {
+ int count;
+ long totalTime = 0;
+
+ final TestHllComplexMetricSelector selector = new TestHllComplexMetricSelector();
+ TIntByteHashMap combined = new TIntByteHashMap();
+
+ for (count = 0; count < 1000000; ++count) {
+ final HyperloglogAggregator agg = new HyperloglogAggregator("billy", selector);
+ aggregate(selector, agg);
+
+ long start = System.nanoTime();
+ combined = (TIntByteHashMap) HyperloglogAggregator.combine(agg.get(), combined);
+ totalTime += System.nanoTime() - start;
+ }
+ System.out.printf("benchmarkCombine took %d ms%n", totalTime / 1000000);
+ }
+
+ private double computeError(double error, int count, long exactValue, long estimatedValue, long startTime)
+ {
+ final double errorThisTime = Math.abs((double) exactValue - estimatedValue) / exactValue;
+
+ error += errorThisTime;
+
+ System.out.printf(
+ "%,d ==? %,d in %,d millis. actual error[%,f%%], avg. error [%,f%%]%n",
+ exactValue,
+ estimatedValue,
+ System.currentTimeMillis() - startTime,
+ 100 * errorThisTime,
+ (error / count) * 100
+ );
+ return error;
+ }
+
+ private void aggregate(TestHllComplexMetricSelector selector, HyperloglogAggregator agg)
+ {
+ agg.aggregate();
+ selector.increment();
+ }
+}
diff --git a/hll/src/test/java/io/druid/query/aggregation/TestHllComplexMetricSelector.java b/hll/src/test/java/io/druid/query/aggregation/TestHllComplexMetricSelector.java
new file mode 100644
index 00000000000..2ba241e01ca
--- /dev/null
+++ b/hll/src/test/java/io/druid/query/aggregation/TestHllComplexMetricSelector.java
@@ -0,0 +1,45 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2013 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package io.druid.query.aggregation;
+
+import io.druid.segment.ObjectColumnSelector;
+
+public class TestHllComplexMetricSelector implements ObjectColumnSelector<String>
+{
+ private int index = 0;
+
+
+ @Override
+ public Class classOfObject()
+ {
+ return String.class;
+ }
+
+ @Override
+ public String get()
+ {
+ return String.valueOf(index);
+ }
+
+ public void increment()
+ {
+ ++index;
+ }
+}
diff --git a/indexing-hadoop/pom.xml b/indexing-hadoop/pom.xml
index daaea363a68..726a106e47c 100644
--- a/indexing-hadoop/pom.xml
+++ b/indexing-hadoop/pom.xml
@@ -28,7 +28,7 @@
        <groupId>io.druid</groupId>
        <artifactId>druid</artifactId>
-        <version>0.6.52-SNAPSHOT</version>
+        <version>0.6.53-SNAPSHOT</version>
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/DbUpdaterJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/DbUpdaterJob.java
index 32e81d26c25..36b67e10c05 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/DbUpdaterJob.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/DbUpdaterJob.java
@@ -19,11 +19,9 @@
package io.druid.indexer;
-import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.metamx.common.logger.Logger;
import io.druid.db.DbConnector;
-import io.druid.jackson.DefaultObjectMapper;
import io.druid.timeline.DataSegment;
import org.joda.time.DateTime;
import org.skife.jdbi.v2.Handle;
@@ -39,8 +37,6 @@ public class DbUpdaterJob implements Jobby
{
private static final Logger log = new Logger(DbUpdaterJob.class);
- private static final ObjectMapper jsonMapper = new DefaultObjectMapper();
-
private final HadoopDruidIndexerConfig config;
private final IDBI dbi;
@@ -82,7 +78,7 @@ public class DbUpdaterJob implements Jobby
.put("partitioned", segment.getShardSpec().getPartitionNum())
.put("version", segment.getVersion())
.put("used", true)
- .put("payload", jsonMapper.writeValueAsString(segment))
+ .put("payload", HadoopDruidIndexerConfig.jsonMapper.writeValueAsString(segment))
.build()
);
diff --git a/indexing-service/pom.xml b/indexing-service/pom.xml
index a97c98861cd..1a7881a56ff 100644
--- a/indexing-service/pom.xml
+++ b/indexing-service/pom.xml
@@ -28,7 +28,7 @@
        <groupId>io.druid</groupId>
        <artifactId>druid</artifactId>
-        <version>0.6.52-SNAPSHOT</version>
+        <version>0.6.53-SNAPSHOT</version>
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java
index 01fa6e69149..7a40035c3e6 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java
@@ -86,6 +86,9 @@ public class RealtimeIndexTask extends AbstractTask
@JsonIgnore
private final Period windowPeriod;
+ @JsonIgnore
+ private final int maxPendingPersists;
+
@JsonIgnore
private final IndexGranularity segmentGranularity;
@@ -106,6 +109,7 @@ public class RealtimeIndexTask extends AbstractTask
@JsonProperty("firehose") FirehoseFactory firehoseFactory,
@JsonProperty("fireDepartmentConfig") FireDepartmentConfig fireDepartmentConfig,
@JsonProperty("windowPeriod") Period windowPeriod,
+ @JsonProperty("maxPendingPersists") int maxPendingPersists,
@JsonProperty("segmentGranularity") IndexGranularity segmentGranularity,
@JsonProperty("rejectionPolicy") RejectionPolicyFactory rejectionPolicyFactory
)
@@ -113,7 +117,7 @@ public class RealtimeIndexTask extends AbstractTask
super(
id == null
? makeTaskId(schema.getDataSource(), schema.getShardSpec().getPartitionNum(), new DateTime().toString())
- :id,
+ : id,
String.format(
"index_realtime_%s",
@@ -135,6 +139,9 @@ public class RealtimeIndexTask extends AbstractTask
this.firehoseFactory = firehoseFactory;
this.fireDepartmentConfig = fireDepartmentConfig;
this.windowPeriod = windowPeriod;
+ this.maxPendingPersists = (maxPendingPersists == 0)
+ ? RealtimePlumberSchool.DEFAULT_MAX_PENDING_PERSISTS
+ : maxPendingPersists;
this.segmentGranularity = segmentGranularity;
this.rejectionPolicyFactory = rejectionPolicyFactory;
}
@@ -196,6 +203,7 @@ public class RealtimeIndexTask extends AbstractTask
new File(toolbox.getTaskWorkDir(), "persist"),
segmentGranularity
);
+ realtimePlumberSchool.setDefaultMaxPendingPersists(maxPendingPersists);
final SegmentPublisher segmentPublisher = new TaskActionSegmentPublisher(this, toolbox);
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/scaling/AutoScalingStrategy.java b/indexing-service/src/main/java/io/druid/indexing/overlord/scaling/AutoScalingStrategy.java
index 392919df039..0c9c7a06bee 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/scaling/AutoScalingStrategy.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/scaling/AutoScalingStrategy.java
@@ -30,6 +30,8 @@ public interface AutoScalingStrategy
  public AutoScalingData terminate(List<String> ips);
+  public AutoScalingData terminateWithIds(List<String> ids);
+
/**
* Provides a lookup of ip addresses to node ids
* @param ips - nodes IPs
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/scaling/EC2AutoScalingStrategy.java b/indexing-service/src/main/java/io/druid/indexing/overlord/scaling/EC2AutoScalingStrategy.java
index b59f3d1e74e..9081e7323d1 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/scaling/EC2AutoScalingStrategy.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/scaling/EC2AutoScalingStrategy.java
@@ -155,32 +155,15 @@ public class EC2AutoScalingStrategy implements AutoScalingStrategy
}
try {
- log.info("Terminating instance[%s]", instances);
- amazonEC2Client.terminateInstances(
- new TerminateInstancesRequest(
- Lists.transform(
- instances,
-                    new Function<Instance, String>()
- {
- @Override
- public String apply(Instance input)
- {
- return input.getInstanceId();
- }
- }
- )
- )
- );
-
- return new AutoScalingData(
+ return terminateWithIds(
Lists.transform(
- ips,
- new Function()
+ instances,
+ new Function()
{
@Override
- public String apply(@Nullable String input)
+ public String apply(Instance input)
{
- return String.format("%s:%s", input, config.getWorkerPort());
+ return input.getInstanceId();
}
}
)
@@ -193,6 +176,28 @@ public class EC2AutoScalingStrategy implements AutoScalingStrategy
return null;
}
+ @Override
+  public AutoScalingData terminateWithIds(List<String> ids)
+ {
+ if (ids.isEmpty()) {
+ return new AutoScalingData(Lists.newArrayList());
+ }
+
+ try {
+ log.info("Terminating instances[%s]", ids);
+ amazonEC2Client.terminateInstances(
+ new TerminateInstancesRequest(ids)
+ );
+
+ return new AutoScalingData(ids);
+ }
+ catch (Exception e) {
+ log.error(e, "Unable to terminate any instances.");
+ }
+
+ return null;
+ }
+
@Override
  public List<String> ipToIdLookup(List<String> ips)
{
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/scaling/NoopAutoScalingStrategy.java b/indexing-service/src/main/java/io/druid/indexing/overlord/scaling/NoopAutoScalingStrategy.java
index 893f69ca9f4..8c3c14ca336 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/scaling/NoopAutoScalingStrategy.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/scaling/NoopAutoScalingStrategy.java
@@ -44,6 +44,13 @@ public class NoopAutoScalingStrategy implements AutoScalingStrategy
return null;
}
+ @Override
+  public AutoScalingData terminateWithIds(List<String> ids)
+ {
+ log.info("If I were a real strategy I'd terminate %s now", ids);
+ return null;
+ }
+
@Override
  public List<String> ipToIdLookup(List<String> ips)
{
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/scaling/SimpleResourceManagementStrategy.java b/indexing-service/src/main/java/io/druid/indexing/overlord/scaling/SimpleResourceManagementStrategy.java
index 10e084b3c9e..76b752a707a 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/scaling/SimpleResourceManagementStrategy.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/scaling/SimpleResourceManagementStrategy.java
@@ -132,8 +132,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
.addData("provisioningCount", currentlyProvisioning.size())
.emit();
-      List<String> nodeIps = autoScalingStrategy.idToIpLookup(Lists.newArrayList(currentlyProvisioning));
- autoScalingStrategy.terminate(nodeIps);
+ autoScalingStrategy.terminateWithIds(Lists.newArrayList(currentlyProvisioning));
currentlyProvisioning.clear();
}
}
diff --git a/indexing-service/src/test/java/io/druid/indexing/common/TestRealtimeTask.java b/indexing-service/src/test/java/io/druid/indexing/common/TestRealtimeTask.java
index 178cae10513..ba3a7787289 100644
--- a/indexing-service/src/test/java/io/druid/indexing/common/TestRealtimeTask.java
+++ b/indexing-service/src/test/java/io/druid/indexing/common/TestRealtimeTask.java
@@ -51,6 +51,7 @@ public class TestRealtimeTask extends RealtimeIndexTask
null,
null,
null,
+ 1,
null,
null
);
diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java
index 77ae0af4b52..e9ace7ac18a 100644
--- a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java
@@ -198,6 +198,7 @@ public class TaskSerdeTest
null,
null,
new Period("PT10M"),
+ 1,
IndexGranularity.HOUR,
null
);
diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/scaling/EC2AutoScalingStrategyTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/scaling/EC2AutoScalingStrategyTest.java
index e6cd52c80ac..1fd3510b45a 100644
--- a/indexing-service/src/test/java/io/druid/indexing/overlord/scaling/EC2AutoScalingStrategyTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/overlord/scaling/EC2AutoScalingStrategyTest.java
@@ -131,6 +131,6 @@ public class EC2AutoScalingStrategyTest
AutoScalingData deleted = strategy.terminate(Arrays.asList("dummyIP"));
Assert.assertEquals(deleted.getNodeIds().size(), 1);
- Assert.assertEquals(String.format("%s:8080", IP), deleted.getNodeIds().get(0));
+ Assert.assertEquals(INSTANCE_ID, deleted.getNodeIds().get(0));
}
}
diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/scaling/SimpleResourceManagementStrategyTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/scaling/SimpleResourceManagementStrategyTest.java
index 6ffc6ae6222..aa41656aa90 100644
--- a/indexing-service/src/test/java/io/druid/indexing/overlord/scaling/SimpleResourceManagementStrategyTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/overlord/scaling/SimpleResourceManagementStrategyTest.java
@@ -187,9 +187,7 @@ public class SimpleResourceManagementStrategyTest
EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject()))
.andReturn(Lists.<String>newArrayList()).times(2);
- EasyMock.expect(autoScalingStrategy.idToIpLookup(EasyMock.<List<String>>anyObject()))
- .andReturn(Lists.<String>newArrayList());
- EasyMock.expect(autoScalingStrategy.terminate(EasyMock.<List<String>>anyObject()))
+ EasyMock.expect(autoScalingStrategy.terminateWithIds(EasyMock.<List<String>>anyObject()))
.andReturn(null);
EasyMock.expect(autoScalingStrategy.provision()).andReturn(
new AutoScalingData(Lists.newArrayList("fake"))
diff --git a/indexing-service/src/test/java/io/druid/indexing/worker/TaskAnnouncementTest.java b/indexing-service/src/test/java/io/druid/indexing/worker/TaskAnnouncementTest.java
index e4c16b11c28..303780de3b2 100644
--- a/indexing-service/src/test/java/io/druid/indexing/worker/TaskAnnouncementTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/worker/TaskAnnouncementTest.java
@@ -46,6 +46,7 @@ public class TaskAnnouncementTest
null,
null,
new Period("PT10M"),
+ 1,
IndexGranularity.HOUR,
null
);
diff --git a/kafka-eight/pom.xml b/kafka-eight/pom.xml
index 2126d975fd9..5ebec47b519 100644
--- a/kafka-eight/pom.xml
+++ b/kafka-eight/pom.xml
@@ -28,7 +28,7 @@
        <groupId>io.druid</groupId>
        <artifactId>druid</artifactId>
-        <version>0.6.52-SNAPSHOT</version>
+        <version>0.6.53-SNAPSHOT</version>
diff --git a/kafka-seven/pom.xml b/kafka-seven/pom.xml
index c174067823c..6a26af7e73a 100644
--- a/kafka-seven/pom.xml
+++ b/kafka-seven/pom.xml
@@ -28,7 +28,7 @@
        <groupId>io.druid</groupId>
        <artifactId>druid</artifactId>
-        <version>0.6.52-SNAPSHOT</version>
+        <version>0.6.53-SNAPSHOT</version>
@@ -39,7 +39,7 @@
            <groupId>kafka</groupId>
            <artifactId>core-kafka</artifactId>
-            <version>0.7.2-mmx1</version>
+            <version>0.7.2-mmx4</version>
            <exclusions>
                <exclusion>
                    <groupId>log4j</groupId>
diff --git a/pom.xml b/pom.xml
index 395ff15927f..990a1a75e5c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -23,7 +23,7 @@
    <groupId>io.druid</groupId>
    <artifactId>druid</artifactId>
    <packaging>pom</packaging>
-    <version>0.6.52-SNAPSHOT</version>
+    <version>0.6.53-SNAPSHOT</version>
    <name>druid</name>
    <description>druid</description>
@@ -41,7 +41,7 @@
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <metamx.java-util.version>0.25.2</metamx.java-util.version>
        <apache.curator.version>2.3.0</apache.curator.version>
-        <druid.api.version>0.1.7</druid.api.version>
+        <druid.api.version>0.1.8</druid.api.version>
@@ -59,6 +59,7 @@
        <module>kafka-seven</module>
        <module>kafka-eight</module>
        <module>rabbitmq</module>
+        <module>hll</module>
@@ -217,47 +218,47 @@
            <dependency>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-annotations</artifactId>
-                <version>2.2.2</version>
+                <version>2.2.3</version>
            </dependency>
            <dependency>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-core</artifactId>
-                <version>2.2.2</version>
+                <version>2.2.3</version>
            </dependency>
            <dependency>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-databind</artifactId>
-                <version>2.2.2</version>
+                <version>2.2.3</version>
            </dependency>
            <dependency>
                <groupId>com.fasterxml.jackson.datatype</groupId>
                <artifactId>jackson-datatype-guava</artifactId>
-                <version>2.2.2</version>
+                <version>2.2.3</version>
            </dependency>
            <dependency>
                <groupId>com.fasterxml.jackson.datatype</groupId>
                <artifactId>jackson-datatype-joda</artifactId>
-                <version>2.2.2</version>
+                <version>2.2.3</version>
            </dependency>
            <dependency>
                <groupId>com.fasterxml.jackson.dataformat</groupId>
                <artifactId>jackson-dataformat-smile</artifactId>
-                <version>2.2.2</version>
+                <version>2.2.3</version>
            </dependency>
            <dependency>
                <groupId>com.fasterxml.jackson.jaxrs</groupId>
                <artifactId>jackson-jaxrs-json-provider</artifactId>
-                <version>2.2.2</version>
+                <version>2.2.3</version>
            </dependency>
            <dependency>
                <groupId>org.codehaus.jackson</groupId>
                <artifactId>jackson-core-asl</artifactId>
-                <version>1.9.11</version>
+                <version>1.9.13</version>
            </dependency>
            <dependency>
                <groupId>org.codehaus.jackson</groupId>
                <artifactId>jackson-mapper-asl</artifactId>
-                <version>1.9.11</version>
+                <version>1.9.13</version>
            </dependency>
            <dependency>
                <groupId>org.hibernate</groupId>
diff --git a/processing/pom.xml b/processing/pom.xml
index e105c320955..e12371ac4c1 100644
--- a/processing/pom.xml
+++ b/processing/pom.xml
@@ -28,7 +28,7 @@
        <groupId>io.druid</groupId>
        <artifactId>druid</artifactId>
-        <version>0.6.52-SNAPSHOT</version>
+        <version>0.6.53-SNAPSHOT</version>
diff --git a/processing/src/main/java/io/druid/query/Query.java b/processing/src/main/java/io/druid/query/Query.java
index f4ad958e75d..39da69c2b40 100644
--- a/processing/src/main/java/io/druid/query/Query.java
+++ b/processing/src/main/java/io/druid/query/Query.java
@@ -25,6 +25,7 @@ import com.metamx.common.guava.Sequence;
import io.druid.query.groupby.GroupByQuery;
import io.druid.query.metadata.metadata.SegmentMetadataQuery;
import io.druid.query.search.search.SearchQuery;
+import io.druid.query.select.SelectQuery;
import io.druid.query.spec.QuerySegmentSpec;
import io.druid.query.timeboundary.TimeBoundaryQuery;
import io.druid.query.timeseries.TimeseriesQuery;
@@ -42,6 +43,7 @@ import java.util.Map;
@JsonSubTypes.Type(name = Query.TIME_BOUNDARY, value = TimeBoundaryQuery.class),
@JsonSubTypes.Type(name = Query.GROUP_BY, value = GroupByQuery.class),
@JsonSubTypes.Type(name = Query.SEGMENT_METADATA, value = SegmentMetadataQuery.class),
+ @JsonSubTypes.Type(name = Query.SELECT, value = SelectQuery.class),
@JsonSubTypes.Type(name = Query.TOPN, value = TopNQuery.class)
})
public interface Query<T>
@@ -51,6 +53,7 @@ public interface Query
public static final String TIME_BOUNDARY = "timeBoundary";
public static final String GROUP_BY = "groupBy";
public static final String SEGMENT_METADATA = "segmentMetadata";
+ public static final String SELECT = "select";
public static final String TOPN = "topN";
public String getDataSource();
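
With the SELECT constant and the @JsonSubTypes entry in place, Jackson resolves any query document whose queryType is "select" to the new SelectQuery class. A rough usage sketch, assuming `mapper` is an ObjectMapper already configured with Druid's serde modules (the field values are illustrative only):

    String json = "{\"queryType\": \"select\","
                + " \"dataSource\": \"wikipedia\","
                + " \"granularity\": \"all\","
                + " \"intervals\": [\"2013-01-01/2013-01-02\"],"
                + " \"pagingSpec\": {\"pagingIdentifiers\": {}, \"threshold\": 5}}";

    Query query = mapper.readValue(json, Query.class);
    // query is an instance of SelectQuery because of the registration above.
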
diff --git a/processing/src/main/java/io/druid/query/select/EventHolder.java b/processing/src/main/java/io/druid/query/select/EventHolder.java
new file mode 100644
index 00000000000..1ac3661d1f5
--- /dev/null
+++ b/processing/src/main/java/io/druid/query/select/EventHolder.java
@@ -0,0 +1,117 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012, 2013 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package io.druid.query.select;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.Maps;
+import org.joda.time.DateTime;
+
+import java.util.Map;
+
+/**
+ */
+public class EventHolder
+{
+ public static final String timestampKey = "timestamp";
+
+ private final String segmentId;
+ private final int offset;
+ private final Map<String, Object> event;
+
+ @JsonCreator
+ public EventHolder(
+ @JsonProperty("segmentId") String segmentId,
+ @JsonProperty("offset") int offset,
+ @JsonProperty("event") Map event
+ )
+ {
+ this.segmentId = segmentId;
+ this.offset = offset;
+ this.event = event;
+ }
+
+ public DateTime getTimestamp()
+ {
+ return (DateTime) event.get(timestampKey);
+ }
+
+ @JsonProperty
+ public String getSegmentId()
+ {
+ return segmentId;
+ }
+
+ @JsonProperty
+ public int getOffset()
+ {
+ return offset;
+ }
+
+ @JsonProperty
+ public Map<String, Object> getEvent()
+ {
+ return event;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ EventHolder that = (EventHolder) o;
+
+ if (offset != that.offset) {
+ return false;
+ }
+ if (!Maps.difference(event, ((EventHolder) o).event).areEqual()) {
+ return false;
+ }
+ if (segmentId != null ? !segmentId.equals(that.segmentId) : that.segmentId != null) {
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ int result = segmentId != null ? segmentId.hashCode() : 0;
+ result = 31 * result + offset;
+ result = 31 * result + (event != null ? event.hashCode() : 0);
+ return result;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "EventHolder{" +
+ "segmentId='" + segmentId + '\'' +
+ ", offset=" + offset +
+ ", event=" + event +
+ '}';
+ }
+}
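
EventHolder keeps the row's timestamp inside the event map under the "timestamp" key, so getTimestamp() is just a cast rather than a separate field. A short usage sketch (segment id and values are made up; uses com.google.common.collect.Maps and org.joda.time.DateTime):

    Map<String, Object> event = Maps.newLinkedHashMap();
    event.put(EventHolder.timestampKey, new DateTime("2013-01-01T00:00:00.000Z"));
    event.put("page", "Druid");
    event.put("count", 1L);

    EventHolder holder = new EventHolder("wikipedia_2013-01-01_v1", 0, event);
    DateTime ts = holder.getTimestamp();   // the DateTime stored under "timestamp"
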
diff --git a/processing/src/main/java/io/druid/query/select/PagingSpec.java b/processing/src/main/java/io/druid/query/select/PagingSpec.java
new file mode 100644
index 00000000000..7be4cf62746
--- /dev/null
+++ b/processing/src/main/java/io/druid/query/select/PagingSpec.java
@@ -0,0 +1,100 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012, 2013 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package io.druid.query.select;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.primitives.Ints;
+
+import java.nio.ByteBuffer;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/**
+ */
+public class PagingSpec
+{
+ private final LinkedHashMap<String, Integer> pagingIdentifiers;
+ private final int threshold;
+
+ @JsonCreator
+ public PagingSpec(
+ @JsonProperty("pagingIdentifiers") LinkedHashMap pagingIdentifiers,
+ @JsonProperty("threshold") int threshold
+ )
+ {
+ this.pagingIdentifiers = pagingIdentifiers;
+ this.threshold = threshold;
+ }
+
+ @JsonProperty
+ public Map<String, Integer> getPagingIdentifiers()
+ {
+ return pagingIdentifiers;
+ }
+
+ @JsonProperty
+ public int getThreshold()
+ {
+ return threshold;
+ }
+
+ public byte[] getCacheKey()
+ {
+ final byte[][] pagingKeys = new byte[pagingIdentifiers.size()][];
+ final byte[][] pagingValues = new byte[pagingIdentifiers.size()][];
+
+ int index = 0;
+ int pagingKeysSize = 0;
+ int pagingValuesSize = 0;
+ for (Map.Entry<String, Integer> entry : pagingIdentifiers.entrySet()) {
+ pagingKeys[index] = entry.getKey().getBytes();
+ pagingValues[index] = ByteBuffer.allocate(Ints.BYTES).putInt(entry.getValue()).array();
+ pagingKeysSize += pagingKeys[index].length;
+ pagingValuesSize += Ints.BYTES;
+ index++;
+ }
+
+ final byte[] thresholdBytes = ByteBuffer.allocate(Ints.BYTES).putInt(threshold).array();
+
+ final ByteBuffer queryCacheKey = ByteBuffer.allocate(pagingKeysSize + pagingValuesSize + thresholdBytes.length);
+
+ for (byte[] pagingKey : pagingKeys) {
+ queryCacheKey.put(pagingKey);
+ }
+
+ for (byte[] pagingValue : pagingValues) {
+ queryCacheKey.put(pagingValue);
+ }
+
+ queryCacheKey.put(thresholdBytes);
+
+ return queryCacheKey.array();
+ }
+
+ @Override
+ public String toString()
+ {
+ return "PagingSpec{" +
+ "pagingIdentifiers=" + pagingIdentifiers +
+ ", threshold=" + threshold +
+ '}';
+ }
+}
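
getCacheKey() lays out all identifier keys first, then their offsets, then the threshold, so two paging specs that differ in any of those produce different segment-cache keys. A small sketch of building a spec and reasoning about its key size (segment name and offsets are made up):

    LinkedHashMap<String, Integer> identifiers = new LinkedHashMap<String, Integer>();
    identifiers.put("wikipedia_2013-01-01_v1", 9);   // resume this segment at offset 9

    PagingSpec spec = new PagingSpec(identifiers, 100);

    byte[] key = spec.getCacheKey();
    // key.length == "wikipedia_2013-01-01_v1".getBytes().length + Ints.BYTES + Ints.BYTES
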
diff --git a/processing/src/main/java/io/druid/query/select/SelectBinaryFn.java b/processing/src/main/java/io/druid/query/select/SelectBinaryFn.java
new file mode 100644
index 00000000000..7de46c26b6a
--- /dev/null
+++ b/processing/src/main/java/io/druid/query/select/SelectBinaryFn.java
@@ -0,0 +1,77 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012, 2013 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package io.druid.query.select;
+
+import com.metamx.common.guava.nary.BinaryFn;
+import io.druid.granularity.AllGranularity;
+import io.druid.granularity.QueryGranularity;
+import io.druid.query.Result;
+import org.joda.time.DateTime;
+
+/**
+ */
+public class SelectBinaryFn
+ implements BinaryFn<Result<SelectResultValue>, Result<SelectResultValue>, Result<SelectResultValue>>
+{
+ private final QueryGranularity gran;
+ private final PagingSpec pagingSpec;
+
+ public SelectBinaryFn(
+ QueryGranularity granularity,
+ PagingSpec pagingSpec
+ )
+ {
+ this.gran = granularity;
+ this.pagingSpec = pagingSpec;
+ }
+
+ @Override
+ public Result<SelectResultValue> apply(
+ Result<SelectResultValue> arg1, Result<SelectResultValue> arg2
+ )
+ {
+ if (arg1 == null) {
+ return arg2;
+ }
+
+ if (arg2 == null) {
+ return arg1;
+ }
+
+ final DateTime timestamp = (gran instanceof AllGranularity)
+ ? arg1.getTimestamp()
+ : gran.toDateTime(gran.truncate(arg1.getTimestamp().getMillis()));
+
+ SelectResultValueBuilder builder = new SelectResultValueBuilder(timestamp, pagingSpec.getThreshold());
+
+ SelectResultValue arg1Val = arg1.getValue();
+ SelectResultValue arg2Val = arg2.getValue();
+
+ for (EventHolder event : arg1Val) {
+ builder.addEntry(event);
+ }
+
+ for (EventHolder event : arg2Val) {
+ builder.addEntry(event);
+ }
+
+ return builder.build();
+ }
+}
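
SelectBinaryFn merges the per-segment results that fall into the same (granularity-truncated) bucket and re-applies the paging threshold, so a merged page never grows past the requested size. SelectResultValueBuilder, which enforces that cap, is added elsewhere in this patch; the sketch below is a deliberately plain-Java analogue of the merge-and-cap behavior, not the Druid code itself:

    import java.util.ArrayList;
    import java.util.List;

    // Analogue only: combine two pages of events and keep at most `threshold` entries.
    static <T> List<T> mergePages(List<T> page1, List<T> page2, int threshold)
    {
      List<T> merged = new ArrayList<T>(page1);
      merged.addAll(page2);
      return merged.size() <= threshold ? merged : new ArrayList<T>(merged.subList(0, threshold));
    }
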
diff --git a/processing/src/main/java/io/druid/query/select/SelectQuery.java b/processing/src/main/java/io/druid/query/select/SelectQuery.java
new file mode 100644
index 00000000000..8c5eb2ba59f
--- /dev/null
+++ b/processing/src/main/java/io/druid/query/select/SelectQuery.java
@@ -0,0 +1,149 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012, 2013 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package io.druid.query.select;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import io.druid.granularity.QueryGranularity;
+import io.druid.query.BaseQuery;
+import io.druid.query.Query;
+import io.druid.query.Result;
+import io.druid.query.filter.DimFilter;
+import io.druid.query.spec.QuerySegmentSpec;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ */
+@JsonTypeName("select")
+ public class SelectQuery extends BaseQuery<Result<SelectResultValue>>
+{
+ private final DimFilter dimFilter;
+ private final QueryGranularity granularity;
+ private final List<String> dimensions;
+ private final List<String> metrics;
+ private final PagingSpec pagingSpec;
+
+ @JsonCreator
+ public SelectQuery(
+ @JsonProperty("dataSource") String dataSource,
+ @JsonProperty("intervals") QuerySegmentSpec querySegmentSpec,
+ @JsonProperty("filter") DimFilter dimFilter,
+ @JsonProperty("granularity") QueryGranularity granularity,
+ @JsonProperty("dimensions") List dimensions,
+ @JsonProperty("metrics") List metrics,
+ @JsonProperty("pagingSpec") PagingSpec pagingSpec,
+ @JsonProperty("context") Map context
+ )
+ {
+ super(dataSource, querySegmentSpec, context);
+ this.dimFilter = dimFilter;
+ this.granularity = granularity;
+ this.dimensions = dimensions;
+ this.metrics = metrics;
+ this.pagingSpec = pagingSpec;
+ }
+
+ @Override
+ public boolean hasFilters()
+ {
+ return dimFilter != null;
+ }
+
+ @Override
+ public String getType()
+ {
+ return Query.SELECT;
+ }
+
+ @JsonProperty("filter")
+ public DimFilter getDimensionsFilter()
+ {
+ return dimFilter;
+ }
+
+ @JsonProperty
+ public QueryGranularity getGranularity()
+ {
+ return granularity;
+ }
+
+ @JsonProperty
+ public List<String> getDimensions()
+ {
+ return dimensions;
+ }
+
+ @JsonProperty
+ public PagingSpec getPagingSpec()
+ {
+ return pagingSpec;
+ }
+
+ @JsonProperty
+ public List<String> getMetrics()
+ {
+ return metrics;
+ }
+
+ public SelectQuery withQuerySegmentSpec(QuerySegmentSpec querySegmentSpec)
+ {
+ return new SelectQuery(
+ getDataSource(),
+ querySegmentSpec,
+ dimFilter,
+ granularity,
+ dimensions,
+ metrics,
+ pagingSpec,
+ getContext()
+ );
+ }
+
+ public SelectQuery withOverriddenContext(Map<String, String> contextOverrides)
+ {
+ return new SelectQuery(
+ getDataSource(),
+ getQuerySegmentSpec(),
+ dimFilter,
+ granularity,
+ dimensions,
+ metrics,
+ pagingSpec,
+ computeOverridenContext(contextOverrides)
+ );
+ }
+
+ @Override
+ public String toString()
+ {
+ return "SelectQuery{" +
+ "dataSource='" + getDataSource() + '\'' +
+ ", querySegmentSpec=" + getQuerySegmentSpec() +
+ ", dimFilter=" + dimFilter +
+ ", granularity=" + granularity +
+ ", dimensions=" + dimensions +
+ ", metrics=" + metrics +
+ ", pagingSpec=" + pagingSpec +
+ '}';
+ }
+}
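
Constructing the query programmatically mirrors the JSON shape; dimensions and metrics may be left null, in which case the engine below falls back to every available column. The values here are illustrative, and the segment-spec and granularity helpers are the standard Druid ones (io.druid.query.spec.LegacySegmentSpec, io.druid.granularity.QueryGranularity), not anything added by this patch:

    SelectQuery query = new SelectQuery(
        "wikipedia",                                      // dataSource
        new LegacySegmentSpec("2013-01-01/2013-01-02"),   // intervals
        null,                                             // filter
        QueryGranularity.ALL,                             // granularity
        Arrays.asList("page"),                            // dimensions
        null,                                             // metrics (null => all available)
        new PagingSpec(null, 5),                          // first page of at most 5 rows
        null                                              // context
    );
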
diff --git a/processing/src/main/java/io/druid/query/select/SelectQueryEngine.java b/processing/src/main/java/io/druid/query/select/SelectQueryEngine.java
new file mode 100644
index 00000000000..3238ac01f7a
--- /dev/null
+++ b/processing/src/main/java/io/druid/query/select/SelectQueryEngine.java
@@ -0,0 +1,167 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012, 2013 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package io.druid.query.select;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.metamx.common.guava.BaseSequence;
+import com.metamx.common.guava.Sequence;
+import io.druid.query.QueryRunnerHelper;
+import io.druid.query.Result;
+import io.druid.segment.Cursor;
+import io.druid.segment.DimensionSelector;
+import io.druid.segment.ObjectColumnSelector;
+import io.druid.segment.Segment;
+import io.druid.segment.StorageAdapter;
+import io.druid.segment.TimestampColumnSelector;
+import io.druid.segment.data.IndexedInts;
+import io.druid.segment.filter.Filters;
+import org.joda.time.DateTime;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ */
+public class SelectQueryEngine
+{
+ public Sequence<Result<SelectResultValue>> process(final SelectQuery query, final Segment segment)
+ {
+ return new BaseSequence<>(
+ new BaseSequence.IteratorMaker<Result<SelectResultValue>, Iterator<Result<SelectResultValue>>>()
+ {
+ @Override
+ public Iterator<Result<SelectResultValue>> make()
+ {
+ final StorageAdapter adapter = segment.asStorageAdapter();
+
+ final Iterable<String> dims;
+ if (query.getDimensions() == null || query.getDimensions().isEmpty()) {
+ dims = adapter.getAvailableDimensions();
+ } else {
+ dims = query.getDimensions();
+ }
+
+ final Iterable<String> metrics;
+ if (query.getMetrics() == null || query.getMetrics().isEmpty()) {
+ metrics = adapter.getAvailableMetrics();
+ } else {
+ metrics = query.getMetrics();
+ }
+
+ return QueryRunnerHelper.makeCursorBasedQuery(
+ adapter,
+ query.getQuerySegmentSpec().getIntervals(),
+ Filters.convertDimensionFilters(query.getDimensionsFilter()),
+ query.getGranularity(),
+ new Function<Cursor, Result<SelectResultValue>>()
+ {
+ @Override
+ public Result<SelectResultValue> apply(Cursor cursor)
+ {
+ final SelectResultValueBuilder builder = new SelectResultValueBuilder(
+ cursor.getTime(),
+ query.getPagingSpec()
+ .getThreshold()
+ );
+
+ final TimestampColumnSelector timestampColumnSelector = cursor.makeTimestampColumnSelector();
+
+ final Map<String, DimensionSelector> dimSelectors = Maps.newHashMap();
+ for (String dim : dims) {
+ final DimensionSelector dimSelector = cursor.makeDimensionSelector(dim);
+ dimSelectors.put(dim, dimSelector);
+ }
+
+ final Map<String, ObjectColumnSelector> metSelectors = Maps.newHashMap();
+ for (String metric : metrics) {
+ final ObjectColumnSelector metricSelector = cursor.makeObjectColumnSelector(metric);
+ metSelectors.put(metric, metricSelector);
+ }
+
+ int startOffset;
+ if (query.getPagingSpec().getPagingIdentifiers() == null) {
+ startOffset = 0;
+ } else {
+ Integer offset = query.getPagingSpec().getPagingIdentifiers().get(segment.getIdentifier());
+ startOffset = (offset == null) ? 0 : offset;
+ }
+
+ cursor.advanceTo(startOffset);
+
+ int offset = 0;
+ while (!cursor.isDone() && offset < query.getPagingSpec().getThreshold()) {
+ final Map<String, Object> theEvent = Maps.newLinkedHashMap();
+ theEvent.put(EventHolder.timestampKey, new DateTime(timestampColumnSelector.getTimestamp()));
+
+ for (Map.Entry<String, DimensionSelector> dimSelector : dimSelectors.entrySet()) {
+ final String dim = dimSelector.getKey();
+ final DimensionSelector selector = dimSelector.getValue();
+ final IndexedInts vals = selector.getRow();
+
+ if (vals.size() == 1) {
+ final String dimVal = selector.lookupName(vals.get(0));
+ theEvent.put(dim, dimVal);
+ } else {
+ List<String> dimVals = Lists.newArrayList();
+ for (int i = 0; i < vals.size(); ++i) {
+ dimVals.add(selector.lookupName(vals.get(i)));
+ }
+ theEvent.put(dim, dimVals);
+ }
+ }
+
+ for (Map.Entry<String, ObjectColumnSelector> metSelector : metSelectors.entrySet()) {
+ final String metric = metSelector.getKey();
+ final ObjectColumnSelector selector = metSelector.getValue();
+ theEvent.put(metric, selector.get());
+ }
+
+ builder.addEntry(
+ new EventHolder(
+ segment.getIdentifier(),
+ startOffset + offset,
+ theEvent
+ )
+ );
+ cursor.advance();
+ offset++;
+ }
+
+ return builder.build();
+ }
+ }
+ ).iterator();
+ }
+
+ @Override
+ public void cleanup(Iterator<Result<SelectResultValue>> toClean)
+ {
+ // https://github.com/metamx/druid/issues/128
+ while (toClean.hasNext()) {
+ toClean.next();
+ }
+ }
+ }
+ );
+ }
+}
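
Paging is resolved per segment: the engine looks up this segment's identifier in pagingIdentifiers, advances the cursor to that offset, then emits up to `threshold` rows, tagging each EventHolder with its absolute offset so a follow-up query can resume where this one stopped. The resume-offset logic on its own, as a small sketch (a hypothetical helper extracted from the code above):

    // Returns the offset at which a segment's cursor should start for this page.
    static int resolveStartOffset(PagingSpec pagingSpec, String segmentIdentifier)
    {
      if (pagingSpec.getPagingIdentifiers() == null) {
        return 0;
      }
      Integer offset = pagingSpec.getPagingIdentifiers().get(segmentIdentifier);
      return (offset == null) ? 0 : offset;
    }
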
diff --git a/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java
new file mode 100644
index 00000000000..9e5a365479c
--- /dev/null
+++ b/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java
@@ -0,0 +1,291 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012, 2013 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package io.druid.query.select;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Function;
+import com.google.common.base.Functions;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Ordering;
+import com.google.common.collect.Sets;
+import com.google.inject.Inject;
+import com.metamx.common.guava.MergeSequence;
+import com.metamx.common.guava.Sequence;
+import com.metamx.common.guava.nary.BinaryFn;
+import com.metamx.emitter.service.ServiceMetricEvent;
+import io.druid.collections.OrderedMergeSequence;
+import io.druid.granularity.QueryGranularity;
+import io.druid.query.CacheStrategy;
+import io.druid.query.IntervalChunkingQueryRunner;
+import io.druid.query.Query;
+import io.druid.query.QueryConfig;
+import io.druid.query.QueryRunner;
+import io.druid.query.QueryToolChest;
+import io.druid.query.Result;
+import io.druid.query.ResultGranularTimestampComparator;
+import io.druid.query.ResultMergeQueryRunner;
+import io.druid.query.aggregation.MetricManipulationFn;
+import io.druid.query.filter.DimFilter;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+import org.joda.time.Minutes;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ */
+ public class SelectQueryQueryToolChest extends QueryToolChest<Result<SelectResultValue>, SelectQuery>
+{
+ private static final byte SELECT_QUERY = 0x13;
+
+ private static final Joiner COMMA_JOIN = Joiner.on(",");
+ private static final TypeReference