diff --git a/build.sh b/build.sh
index 727a15d954d..a5b6f7e1366 100755
--- a/build.sh
+++ b/build.sh
@@ -30,4 +30,4 @@ echo "For examples, see: "
echo " "
ls -1 examples/*/*sh
echo " "
-echo "See also http://druid.io/docs/0.6.61"
+echo "See also http://druid.io/docs/0.6.65"
diff --git a/cassandra-storage/pom.xml b/cassandra-storage/pom.xml
index 38ccdc30476..4858c321bad 100644
--- a/cassandra-storage/pom.xml
+++ b/cassandra-storage/pom.xml
@@ -28,7 +28,7 @@
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
- <version>0.6.63-SNAPSHOT</version>
+ <version>0.6.66-SNAPSHOT</version>
diff --git a/common/pom.xml b/common/pom.xml
index fd51a1837d0..51c19714242 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -28,7 +28,7 @@
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
- <version>0.6.63-SNAPSHOT</version>
+ <version>0.6.66-SNAPSHOT</version>
diff --git a/common/src/main/java/io/druid/concurrent/Execs.java b/common/src/main/java/io/druid/concurrent/Execs.java
index 308208ef98d..66af2a196ba 100644
--- a/common/src/main/java/io/druid/concurrent/Execs.java
+++ b/common/src/main/java/io/druid/concurrent/Execs.java
@@ -22,11 +22,13 @@ package io.druid.concurrent;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -60,25 +62,29 @@ public class Execs
* @param capacity maximum capacity after which the executorService will block on accepting new tasks
* @return ExecutorService which blocks accepting new tasks when the capacity is reached
*/
- public static ExecutorService newBlockingSingleThreaded(String nameFormat, int capacity)
+ public static ExecutorService newBlockingSingleThreaded(final String nameFormat, final int capacity)
{
- return new ThreadPoolExecutor(
- 1, 1,
- 0L, TimeUnit.MILLISECONDS,
- new ArrayBlockingQueue(capacity), makeThreadFactory(nameFormat)
- , new RejectedExecutionHandler()
- {
- @Override
- public void rejectedExecution(Runnable r, ThreadPoolExecutor executor)
- {
- try {
- ((ArrayBlockingQueue) executor.getQueue()).put(r);
- }
- catch (InterruptedException e) {
- throw new RejectedExecutionException("Got Interrupted while adding to the Queue");
- }
- }
+ final BlockingQueue<Runnable> queue;
+ if (capacity > 0) {
+ queue = new ArrayBlockingQueue<>(capacity);
+ } else {
+ queue = new SynchronousQueue<>();
}
+ return new ThreadPoolExecutor(
+ 1, 1, 0L, TimeUnit.MILLISECONDS, queue, makeThreadFactory(nameFormat),
+ new RejectedExecutionHandler()
+ {
+ @Override
+ public void rejectedExecution(Runnable r, ThreadPoolExecutor executor)
+ {
+ try {
+ executor.getQueue().put(r);
+ }
+ catch (InterruptedException e) {
+ throw new RejectedExecutionException("Got Interrupted while adding to the Queue");
+ }
+ }
+ }
);
}
}
diff --git a/common/src/test/java/io/druid/concurrent/ExecsTest.java b/common/src/test/java/io/druid/concurrent/ExecsTest.java
index 809ed5eac02..ae2b0e15473 100644
--- a/common/src/test/java/io/druid/concurrent/ExecsTest.java
+++ b/common/src/test/java/io/druid/concurrent/ExecsTest.java
@@ -20,6 +20,8 @@
package io.druid.concurrent;
import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.metamx.common.logger.Logger;
import org.junit.Assert;
import org.junit.Test;
@@ -30,23 +32,46 @@ import java.util.concurrent.atomic.AtomicInteger;
public class ExecsTest
{
+ private static final Logger log = new Logger(ExecsTest.class);
+
@Test
- public void testBlockingExecutorService() throws Exception
+ public void testBlockingExecutorServiceZeroCapacity() throws Exception
{
- final int capacity = 3;
- final ExecutorService blockingExecutor = Execs.newBlockingSingleThreaded("test%d", capacity);
- final CountDownLatch queueFullSignal = new CountDownLatch(capacity + 1);
- final CountDownLatch taskCompletedSignal = new CountDownLatch(2 * capacity);
+ runTest(0);
+ }
+
+ @Test
+ public void testBlockingExecutorServiceOneCapacity() throws Exception
+ {
+ runTest(1);
+ }
+
+ @Test
+ public void testBlockingExecutorServiceThreeCapacity() throws Exception
+ {
+ runTest(3);
+ }
+
+ private static void runTest(final int capacity) throws Exception
+ {
+ final int nTasks = (capacity + 1) * 3;
+ final ExecutorService blockingExecutor = Execs.newBlockingSingleThreaded("ExecsTest-Blocking-%d", capacity);
+ final CountDownLatch queueShouldBeFullSignal = new CountDownLatch(capacity + 1);
+ final CountDownLatch taskCompletedSignal = new CountDownLatch(nTasks);
final CountDownLatch taskStartSignal = new CountDownLatch(1);
final AtomicInteger producedCount = new AtomicInteger();
final AtomicInteger consumedCount = new AtomicInteger();
- ExecutorService producer = Executors.newSingleThreadExecutor();
+ final ExecutorService producer = Executors.newSingleThreadExecutor(
+ new ThreadFactoryBuilder().setNameFormat(
+ "ExecsTest-Producer-%d"
+ ).build()
+ );
producer.submit(
new Runnable()
{
public void run()
{
- for (int i = 0; i < 2 * capacity; i++) {
+ for (int i = 0; i < nTasks; i++) {
final int taskID = i;
System.out.println("Produced task" + taskID);
blockingExecutor.submit(
@@ -55,7 +80,7 @@ public class ExecsTest
@Override
public void run()
{
- System.out.println("Starting task" + taskID);
+ log.info("Starting task: %s", taskID);
try {
taskStartSignal.await();
consumedCount.incrementAndGet();
@@ -64,29 +89,31 @@ public class ExecsTest
catch (Exception e) {
throw Throwables.propagate(e);
}
- System.out.println("Completed task" + taskID);
+ log.info("Completed task: %s", taskID);
}
}
);
producedCount.incrementAndGet();
- queueFullSignal.countDown();
+ queueShouldBeFullSignal.countDown();
}
}
}
);
- queueFullSignal.await();
- // verify that the producer blocks
+ queueShouldBeFullSignal.await();
+ // Verify that the producer blocks. I don't think it's possible to be sure that the producer is blocking (since
+ // it could be doing nothing for any reason). But waiting a short period of time and checking that it hasn't done
+ // anything should hopefully be sufficient.
+ Thread.sleep(500);
Assert.assertEquals(capacity + 1, producedCount.get());
// let the tasks run
taskStartSignal.countDown();
// wait until all tasks complete
taskCompletedSignal.await();
// verify all tasks consumed
- Assert.assertEquals(2 * capacity, consumedCount.get());
+ Assert.assertEquals(nTasks, consumedCount.get());
// cleanup
blockingExecutor.shutdown();
producer.shutdown();
-
}
}
diff --git a/docs/content/Aggregations.md b/docs/content/Aggregations.md
index 93bfb76c90e..74ad226ff81 100644
--- a/docs/content/Aggregations.md
+++ b/docs/content/Aggregations.md
@@ -82,3 +82,13 @@ All JavaScript functions must return numerical values.
"fnReset" : "function() { return 10; }"
}
```
+
+### Complex aggregators
+
+#### `hyperUnique` aggregator
+
+`hyperUnique` uses [Hyperloglog](http://algo.inria.fr/flajolet/Publications/FlFuGaMe07.pdf) to compute the estimated cardinality of a dimension.
+
+```json
+{ "type" : "hyperUnique", "name" : , "fieldName" : }
+```
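+
+As a sketch (the data source, interval, and field names below are illustrative, not taken from any shipped example), a timeseries query could use the aggregator to estimate unique users per day over a metric that was ingested as a `hyperUnique` column:
+
+```json
+{
+  "queryType": "timeseries",
+  "dataSource": "sample_datasource",
+  "granularity": "day",
+  "intervals": [ "2013-01-01/2013-01-08" ],
+  "aggregations": [
+    { "type": "hyperUnique", "name": "unique_users", "fieldName": "uniques" }
+  ]
+}
+```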
diff --git a/docs/content/DataSource.md b/docs/content/DataSource.md
new file mode 100644
index 00000000000..49c583561ba
--- /dev/null
+++ b/docs/content/DataSource.md
@@ -0,0 +1,25 @@
+---
+layout: doc_page
+---
+A data source is the Druid equivalent of a database table. However, a query can also masquerade as a data source, providing subquery-like functionality. Query data sources are currently only supported by [GroupBy](GroupByQuery.html) queries.
+
+### Table Data Source
+The table data source is the most common type. It's represented by a string, or by the full structure:
+
+```json
+{
+ "type": "table",
+ "name":
+}
+```
+
+### Query Data Source
+```json
+{
+ "type": "query",
+ "query": {
+ "type": "groupBy",
+ ...
+ }
+}
+```
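+
+As a sketch of how this fits into a full query (all names are illustrative), the structure above goes wherever a `dataSource` is accepted, for example in a [GroupBy](GroupByQuery.html) query:
+
+```json
+{
+  "queryType": "groupBy",
+  "dataSource": {
+    "type": "query",
+    "query": {
+      "type": "groupBy",
+      "dataSource": "sample_datasource",
+      ...
+    }
+  },
+  "dimensions": [ "country" ],
+  ...
+}
+```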
diff --git a/docs/content/Examples.md b/docs/content/Examples.md
index 495891f6da9..6c2589b2160 100644
--- a/docs/content/Examples.md
+++ b/docs/content/Examples.md
@@ -19,13 +19,13 @@ Clone Druid and build it:
git clone https://github.com/metamx/druid.git druid
cd druid
git fetch --tags
-git checkout druid-0.6.61
+git checkout druid-0.6.65
./build.sh
```
### Downloading the DSK (Druid Standalone Kit)
-[Download](http://static.druid.io/artifacts/releases/druid-services-0.6.61-bin.tar.gz) a stand-alone tarball and run it:
+[Download](http://static.druid.io/artifacts/releases/druid-services-0.6.65-bin.tar.gz) a stand-alone tarball and run it:
``` bash
tar -xzf druid-services-0.X.X-bin.tar.gz
diff --git a/docs/content/GroupByQuery.md b/docs/content/GroupByQuery.md
index 9edca5d2861..ca6ef8e277a 100644
--- a/docs/content/GroupByQuery.md
+++ b/docs/content/GroupByQuery.md
@@ -48,7 +48,7 @@ There are 9 main parts to a groupBy query:
|property|description|required?|
|--------|-----------|---------|
|queryType|This String should always be "groupBy"; this is the first thing Druid looks at to figure out how to interpret the query|yes|
-|dataSource|A String defining the data source to query, very similar to a table in a relational database|yes|
+|dataSource|A String defining the data source to query (very similar to a table in a relational database), or a [DataSource](DataSource.html) structure.|yes|
|dimensions|A JSON list of dimensions to do the groupBy over|yes|
|orderBy|See [OrderBy](OrderBy.html).|no|
|having|See [Having](Having.html).|no|
diff --git a/docs/content/Indexing-Service-Config.md b/docs/content/Indexing-Service-Config.md
index 88f89220d21..79c5462584b 100644
--- a/docs/content/Indexing-Service-Config.md
+++ b/docs/content/Indexing-Service-Config.md
@@ -66,7 +66,7 @@ druid.host=#{IP_ADDR}:8080
druid.port=8080
druid.service=druid/prod/indexer
-druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.61"]
+druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.65"]
druid.zk.service.host=#{ZK_IPs}
druid.zk.paths.base=/druid/prod
@@ -115,7 +115,7 @@ druid.host=#{IP_ADDR}:8080
druid.port=8080
druid.service=druid/prod/worker
-druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.61","io.druid.extensions:druid-kafka-seven:0.6.61"]
+druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.65","io.druid.extensions:druid-kafka-seven:0.6.65"]
druid.zk.service.host=#{ZK_IPs}
druid.zk.paths.base=/druid/prod
diff --git a/docs/content/Post-aggregations.md b/docs/content/Post-aggregations.md
index 527d64e7971..4dce46ceff1 100644
--- a/docs/content/Post-aggregations.md
+++ b/docs/content/Post-aggregations.md
@@ -64,6 +64,31 @@ Example JavaScript aggregator:
"function": "function(delta, total) { return 100 * Math.abs(delta) / total; }"
}
```
+### `hyperUniqueCardinality` post-aggregator
+
+The `hyperUniqueCardinality` post-aggregator wraps a `hyperUnique` object so that its cardinality estimate can be used in post-aggregations.
+
+```json
+{ "type" : "hyperUniqueCardinality", "fieldName" : <the name field value of the hyperUnique aggregator> }
+```
+
+It can be used in a sample calculation like so:
+
+```json
+ "aggregations" : [{
+ {"type" : "count", "name" : "rows"},
+ {"type" : "hyperUnique", "name" : "unique_users", "fieldName" : "uniques"}
+ }],
+ "postAggregations" : {
+ "type" : "arithmetic",
+ "name" : "average_users_per_row",
+ "fn" : "/",
+ "fields" : [
+ { "type" : "hyperUniqueCardinality", "fieldName" : "unique_users" },
+ { "type" : "fieldAccess", "name" : "rows", "fieldName" : "rows" }
+ ]
+ }
+```
### Example Usage
@@ -98,4 +123,4 @@ The format of the query JSON is as follows:
...
}
-```
+```
\ No newline at end of file
diff --git a/docs/content/Realtime-Config.md b/docs/content/Realtime-Config.md
index 551d6704ae8..89cdc1dd630 100644
--- a/docs/content/Realtime-Config.md
+++ b/docs/content/Realtime-Config.md
@@ -27,7 +27,7 @@ druid.host=localhost
druid.service=realtime
druid.port=8083
-druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.61"]
+druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.65"]
druid.zk.service.host=localhost
@@ -76,7 +76,7 @@ druid.host=#{IP_ADDR}:8080
druid.port=8080
druid.service=druid/prod/realtime
-druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.61","io.druid.extensions:druid-kafka-seven:0.6.61"]
+druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.65","io.druid.extensions:druid-kafka-seven:0.6.65"]
druid.zk.service.host=#{ZK_IPs}
druid.zk.paths.base=/druid/prod
diff --git a/docs/content/Tutorial:-A-First-Look-at-Druid.md b/docs/content/Tutorial:-A-First-Look-at-Druid.md
index cbf04e9a708..8ea276eb0d6 100644
--- a/docs/content/Tutorial:-A-First-Look-at-Druid.md
+++ b/docs/content/Tutorial:-A-First-Look-at-Druid.md
@@ -49,7 +49,7 @@ There are two ways to setup Druid: download a tarball, or [Build From Source](Bu
### Download a Tarball
-We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.61-bin.tar.gz). Download this file to a directory of your choosing.
+We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.65-bin.tar.gz). Download this file to a directory of your choosing.
You can extract the awesomeness within by issuing:
@@ -60,7 +60,7 @@ tar -zxvf druid-services-*-bin.tar.gz
Not too lost so far right? That's great! If you cd into the directory:
```
-cd druid-services-0.6.61
+cd druid-services-0.6.65
```
You should see a bunch of files:
diff --git a/docs/content/Tutorial:-The-Druid-Cluster.md b/docs/content/Tutorial:-The-Druid-Cluster.md
index 958cd580752..e3a805bb19c 100644
--- a/docs/content/Tutorial:-The-Druid-Cluster.md
+++ b/docs/content/Tutorial:-The-Druid-Cluster.md
@@ -13,7 +13,7 @@ In this tutorial, we will set up other types of Druid nodes and external depende
If you followed the first tutorial, you should already have Druid downloaded. If not, let's go back and do that first.
-You can download the latest version of druid [here](http://static.druid.io/artifacts/releases/druid-services-0.6.61-bin.tar.gz)
+You can download the latest version of druid [here](http://static.druid.io/artifacts/releases/druid-services-0.6.65-bin.tar.gz)
and untar the contents within by issuing:
@@ -149,7 +149,7 @@ druid.port=8081
druid.zk.service.host=localhost
-druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.61"]
+druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.65"]
# Dummy read only AWS account (used to download example data)
druid.s3.secretKey=QyyfVZ7llSiRg6Qcrql1eEUG7buFpAK6T6engr1b
@@ -240,7 +240,7 @@ druid.port=8083
druid.zk.service.host=localhost
-druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.61","io.druid.extensions:druid-kafka-seven:0.6.61"]
+druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.65","io.druid.extensions:druid-kafka-seven:0.6.65"]
# Change this config to db to hand off to the rest of the Druid cluster
druid.publish.type=noop
diff --git a/docs/content/Tutorial:-Webstream.md b/docs/content/Tutorial:-Webstream.md
index 481b2b897b6..e37c1d4f677 100644
--- a/docs/content/Tutorial:-Webstream.md
+++ b/docs/content/Tutorial:-Webstream.md
@@ -37,7 +37,7 @@ There are two ways to setup Druid: download a tarball, or [Build From Source](Bu
h3. Download a Tarball
-We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.61-bin.tar.gz)
+We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.65-bin.tar.gz)
Download this file to a directory of your choosing.
You can extract the awesomeness within by issuing:
@@ -48,7 +48,7 @@ tar zxvf druid-services-*-bin.tar.gz
Not too lost so far right? That's great! If you cd into the directory:
```
-cd druid-services-0.6.61
+cd druid-services-0.6.65
```
You should see a bunch of files:
diff --git a/docs/content/Twitter-Tutorial.textile b/docs/content/Twitter-Tutorial.textile
index 9652bdb2355..a8db1f8ed5d 100644
--- a/docs/content/Twitter-Tutorial.textile
+++ b/docs/content/Twitter-Tutorial.textile
@@ -9,7 +9,7 @@ There are two ways to setup Druid: download a tarball, or build it from source.
h3. Download a Tarball
-We've built a tarball that contains everything you'll need. You'll find it "here":http://static.druid.io/artifacts/releases/druid-services-0.6.61-bin.tar.gz.
+We've built a tarball that contains everything you'll need. You'll find it "here":http://static.druid.io/artifacts/releases/druid-services-0.6.65-bin.tar.gz.
Download this bad boy to a directory of your choosing.
You can extract the awesomeness within by issuing:
diff --git a/examples/bin/run_example_server.sh b/examples/bin/run_example_server.sh
index f1adada0bfa..3d63e7cb0c3 100755
--- a/examples/bin/run_example_server.sh
+++ b/examples/bin/run_example_server.sh
@@ -55,8 +55,6 @@ JAVA_ARGS="${JAVA_ARGS} -Ddruid.realtime.specFile=${SPEC_FILE}"
DRUID_CP=${EXAMPLE_LOC}
#For a pull
-DRUID_CP=${DRUID_CP}:`ls ${SCRIPT_DIR}/../target/druid-examples-*-selfcontained.jar`
-DRUID_CP=${DRUID_CP}:`ls ${SCRIPT_DIR}/../../services/target/druid-services-*-selfcontained.jar`
DRUID_CP=${DRUID_CP}:${SCRIPT_DIR}/../config/realtime
#For the kit
DRUID_CP=${DRUID_CP}:${SCRIPT_DIR}/lib/*
diff --git a/examples/config/historical/runtime.properties b/examples/config/historical/runtime.properties
index a94ec460fb3..8632e8813be 100644
--- a/examples/config/historical/runtime.properties
+++ b/examples/config/historical/runtime.properties
@@ -4,7 +4,7 @@ druid.port=8081
druid.zk.service.host=localhost
-druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.61"]
+druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.65"]
# Dummy read only AWS account (used to download example data)
druid.s3.secretKey=QyyfVZ7llSiRg6Qcrql1eEUG7buFpAK6T6engr1b
diff --git a/examples/config/realtime/runtime.properties b/examples/config/realtime/runtime.properties
index 45471c989ac..10e5f85c97e 100644
--- a/examples/config/realtime/runtime.properties
+++ b/examples/config/realtime/runtime.properties
@@ -4,7 +4,7 @@ druid.port=8083
druid.zk.service.host=localhost
-druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.61","io.druid.extensions:druid-kafka-seven:0.6.61","io.druid.extensions:druid-rabbitmq:0.6.61"]
+druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.65","io.druid.extensions:druid-kafka-seven:0.6.65","io.druid.extensions:druid-rabbitmq:0.6.65"]
# Change this config to db to hand off to the rest of the Druid cluster
druid.publish.type=noop
diff --git a/examples/pom.xml b/examples/pom.xml
index bce8ed35efd..69080729f79 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -28,7 +28,7 @@
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
- <version>0.6.63-SNAPSHOT</version>
+ <version>0.6.66-SNAPSHOT</version>
diff --git a/hdfs-storage/pom.xml b/hdfs-storage/pom.xml
index a6990497dd8..e65075d3c9d 100644
--- a/hdfs-storage/pom.xml
+++ b/hdfs-storage/pom.xml
@@ -28,7 +28,7 @@
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
- <version>0.6.63-SNAPSHOT</version>
+ <version>0.6.66-SNAPSHOT</version>
diff --git a/hll/pom.xml b/hll/pom.xml
index 09e6fba02e6..0531ab90758 100644
--- a/hll/pom.xml
+++ b/hll/pom.xml
@@ -28,7 +28,7 @@
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
- <version>0.6.63-SNAPSHOT</version>
+ <version>0.6.66-SNAPSHOT</version>
diff --git a/indexing-hadoop/pom.xml b/indexing-hadoop/pom.xml
index e164043244e..494ba1aa354 100644
--- a/indexing-hadoop/pom.xml
+++ b/indexing-hadoop/pom.xml
@@ -28,7 +28,7 @@
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
- <version>0.6.63-SNAPSHOT</version>
+ <version>0.6.66-SNAPSHOT</version>
@@ -97,11 +97,6 @@
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
- <dependency>
-   <groupId>com.clearspring.analytics</groupId>
-   <artifactId>stream</artifactId>
-   <version>2.5.2</version>
- </dependency>
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java
index 407cf84dec3..ae2d61a9a93 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java
@@ -19,8 +19,6 @@
package io.druid.indexer;
-import com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
-import com.clearspring.analytics.stream.cardinality.HyperLogLog;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Optional;
import com.google.common.base.Throwables;
@@ -36,6 +34,7 @@ import io.druid.data.input.InputRow;
import io.druid.data.input.Rows;
import io.druid.granularity.QueryGranularity;
import io.druid.indexer.granularity.UniformGranularitySpec;
+import io.druid.query.aggregation.hyperloglog.HyperLogLogCollector;
import io.druid.timeline.partition.HashBasedNumberedShardSpec;
import io.druid.timeline.partition.NoneShardSpec;
import org.apache.hadoop.conf.Configuration;
@@ -56,6 +55,7 @@ import org.joda.time.Interval;
import java.io.IOException;
import java.io.OutputStream;
+import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -67,7 +67,6 @@ public class DetermineHashedPartitionsJob implements Jobby
{
private static final int MAX_SHARDS = 128;
private static final Logger log = new Logger(DetermineHashedPartitionsJob.class);
- private static final int HYPER_LOG_LOG_BIT_SIZE = 20;
private final HadoopDruidIndexerConfig config;
public DetermineHashedPartitionsJob(
@@ -99,8 +98,8 @@ public class DetermineHashedPartitionsJob implements Jobby
groupByJob.setOutputKeyClass(NullWritable.class);
groupByJob.setOutputValueClass(NullWritable.class);
groupByJob.setOutputFormatClass(SequenceFileOutputFormat.class);
- if(!config.getSegmentGranularIntervals().isPresent()){
- groupByJob.setNumReduceTasks(1);
+ if (!config.getSegmentGranularIntervals().isPresent()) {
+ groupByJob.setNumReduceTasks(1);
}
JobHelper.setupClasspath(config, groupByJob);
@@ -194,7 +193,7 @@ public class DetermineHashedPartitionsJob implements Jobby
{
private static HashFunction hashFunction = Hashing.murmur3_128();
private QueryGranularity rollupGranularity = null;
- private Map<Interval, HyperLogLog> hyperLogLogs;
+ private Map<Interval, HyperLogLogCollector> hyperLogLogs;
private HadoopDruidIndexerConfig config;
private boolean determineIntervals;
@@ -208,9 +207,9 @@ public class DetermineHashedPartitionsJob implements Jobby
Optional<Set<Interval>> intervals = config.getSegmentGranularIntervals();
if (intervals.isPresent()) {
determineIntervals = false;
- final ImmutableMap.Builder<Interval, HyperLogLog> builder = ImmutableMap.builder();
+ final ImmutableMap.Builder<Interval, HyperLogLogCollector> builder = ImmutableMap.builder();
for (final Interval bucketInterval : intervals.get()) {
- builder.put(bucketInterval, new HyperLogLog(HYPER_LOG_LOG_BIT_SIZE));
+ builder.put(bucketInterval, HyperLogLogCollector.makeLatestCollector());
}
hyperLogLogs = builder.build();
} else {
@@ -236,7 +235,7 @@ public class DetermineHashedPartitionsJob implements Jobby
interval = config.getGranularitySpec().getGranularity().bucket(new DateTime(inputRow.getTimestampFromEpoch()));
if (!hyperLogLogs.containsKey(interval)) {
- hyperLogLogs.put(interval, new HyperLogLog(HYPER_LOG_LOG_BIT_SIZE));
+ hyperLogLogs.put(interval, HyperLogLogCollector.makeLatestCollector());
}
} else {
final Optional<Interval> maybeInterval = config.getGranularitySpec()
@@ -248,9 +247,9 @@ public class DetermineHashedPartitionsJob implements Jobby
interval = maybeInterval.get();
}
hyperLogLogs.get(interval)
- .offerHashed(
+ .add(
hashFunction.hashBytes(HadoopDruidIndexerConfig.jsonMapper.writeValueAsBytes(groupKey))
- .asLong()
+ .asBytes()
);
}
@@ -263,10 +262,10 @@ public class DetermineHashedPartitionsJob implements Jobby
map(context.getCurrentKey(), context.getCurrentValue(), context);
}
- for (Map.Entry<Interval, HyperLogLog> entry : hyperLogLogs.entrySet()) {
+ for (Map.Entry<Interval, HyperLogLogCollector> entry : hyperLogLogs.entrySet()) {
context.write(
new LongWritable(entry.getKey().getStartMillis()),
- new BytesWritable(entry.getValue().getBytes())
+ new BytesWritable(entry.getValue().toByteArray())
);
}
cleanup(context);
@@ -294,15 +293,9 @@ public class DetermineHashedPartitionsJob implements Jobby
Context context
) throws IOException, InterruptedException
{
- HyperLogLog aggregate = new HyperLogLog(HYPER_LOG_LOG_BIT_SIZE);
+ HyperLogLogCollector aggregate = HyperLogLogCollector.makeLatestCollector();
for (BytesWritable value : values) {
- HyperLogLog logValue = HyperLogLog.Builder.build(getDataBytes(value));
- try {
- aggregate.addAll(logValue);
- }
- catch (CardinalityMergeException e) {
- e.printStackTrace(); // TODO: check for better handling
- }
+ aggregate.fold(ByteBuffer.wrap(value.getBytes(), 0, value.getLength()));
}
Interval interval = config.getGranularitySpec().getGranularity().bucket(new DateTime(key.get()));
intervals.add(interval);
@@ -318,7 +311,7 @@ public class DetermineHashedPartitionsJob implements Jobby
}
).writeValue(
out,
- aggregate.cardinality()
+ new Double(aggregate.estimateCardinality()).longValue()
);
}
finally {
diff --git a/indexing-service/pom.xml b/indexing-service/pom.xml
index 4b3b68fb4d0..e6d7ea63442 100644
--- a/indexing-service/pom.xml
+++ b/indexing-service/pom.xml
@@ -28,7 +28,7 @@
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
- <version>0.6.63-SNAPSHOT</version>
+ <version>0.6.66-SNAPSHOT</version>
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java
index 30d0750f3d0..878f950f0c4 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java
@@ -281,7 +281,7 @@ public class HadoopIndexTask extends AbstractTask
Jobby job = new HadoopDruidDetermineConfigurationJob(config);
- log.info("Starting a hadoop index generator job...");
+ log.info("Starting a hadoop determine configuration job...");
if (job.run()) {
return HadoopDruidIndexerConfig.jsonMapper.writeValueAsString(HadoopDruidIndexerConfigBuilder.toSchema(config));
}
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java
index 7a40035c3e6..09172da3a4d 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java
@@ -109,7 +109,7 @@ public class RealtimeIndexTask extends AbstractTask
@JsonProperty("firehose") FirehoseFactory firehoseFactory,
@JsonProperty("fireDepartmentConfig") FireDepartmentConfig fireDepartmentConfig,
@JsonProperty("windowPeriod") Period windowPeriod,
- @JsonProperty("maxPendingPersists") int maxPendingPersists,
+ @JsonProperty("maxPendingPersists") Integer maxPendingPersists,
@JsonProperty("segmentGranularity") IndexGranularity segmentGranularity,
@JsonProperty("rejectionPolicy") RejectionPolicyFactory rejectionPolicyFactory
)
@@ -139,7 +139,7 @@ public class RealtimeIndexTask extends AbstractTask
this.firehoseFactory = firehoseFactory;
this.fireDepartmentConfig = fireDepartmentConfig;
this.windowPeriod = windowPeriod;
- this.maxPendingPersists = (maxPendingPersists == 0)
+ this.maxPendingPersists = (maxPendingPersists == null)
? RealtimePlumberSchool.DEFAULT_MAX_PENDING_PERSISTS
: maxPendingPersists;
this.segmentGranularity = segmentGranularity;
@@ -398,6 +398,12 @@ public class RealtimeIndexTask extends AbstractTask
return windowPeriod;
}
+ @JsonProperty
+ public int getMaxPendingPersists()
+ {
+ return maxPendingPersists;
+ }
+
@JsonProperty
public IndexGranularity getSegmentGranularity()
{
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/IndexerDBCoordinator.java b/indexing-service/src/main/java/io/druid/indexing/overlord/IndexerDBCoordinator.java
index dc02ee9d4ef..2afa4ed0dd5 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/IndexerDBCoordinator.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/IndexerDBCoordinator.java
@@ -90,7 +90,7 @@ public class IndexerDBCoordinator
final ResultIterator