diff --git a/build.sh b/build.sh
index 727a15d954d..ebd71744026 100755
--- a/build.sh
+++ b/build.sh
@@ -30,4 +30,4 @@ echo "For examples, see: "
echo " "
ls -1 examples/*/*sh
echo " "
-echo "See also http://druid.io/docs/0.6.61"
+echo "See also http://druid.io/docs/0.6.69"
diff --git a/cassandra-storage/pom.xml b/cassandra-storage/pom.xml
index 38ccdc30476..aa76a7de68e 100644
--- a/cassandra-storage/pom.xml
+++ b/cassandra-storage/pom.xml
@@ -28,7 +28,7 @@
 <groupId>io.druid</groupId>
 <artifactId>druid</artifactId>
- <version>0.6.63-SNAPSHOT</version>
+ <version>0.6.70-SNAPSHOT</version>
diff --git a/common/pom.xml b/common/pom.xml
index fd51a1837d0..cc13d2a7d76 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -28,7 +28,7 @@
 <groupId>io.druid</groupId>
 <artifactId>druid</artifactId>
- <version>0.6.63-SNAPSHOT</version>
+ <version>0.6.70-SNAPSHOT</version>
diff --git a/common/src/main/java/io/druid/concurrent/Execs.java b/common/src/main/java/io/druid/concurrent/Execs.java
index 308208ef98d..66af2a196ba 100644
--- a/common/src/main/java/io/druid/concurrent/Execs.java
+++ b/common/src/main/java/io/druid/concurrent/Execs.java
@@ -22,11 +22,13 @@ package io.druid.concurrent;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -60,25 +62,29 @@ public class Execs
* @param capacity maximum capacity after which the executorService will block on accepting new tasks
* @return ExecutorService which blocks accepting new tasks when the capacity reached
*/
- public static ExecutorService newBlockingSingleThreaded(String nameFormat, int capacity)
+ public static ExecutorService newBlockingSingleThreaded(final String nameFormat, final int capacity)
{
- return new ThreadPoolExecutor(
- 1, 1,
- 0L, TimeUnit.MILLISECONDS,
- new ArrayBlockingQueue(capacity), makeThreadFactory(nameFormat)
- , new RejectedExecutionHandler()
- {
- @Override
- public void rejectedExecution(Runnable r, ThreadPoolExecutor executor)
- {
- try {
- ((ArrayBlockingQueue) executor.getQueue()).put(r);
- }
- catch (InterruptedException e) {
- throw new RejectedExecutionException("Got Interrupted while adding to the Queue");
- }
- }
+ final BlockingQueue<Runnable> queue;
+ if (capacity > 0) {
+ queue = new ArrayBlockingQueue<>(capacity);
+ } else {
+ queue = new SynchronousQueue<>();
}
+ return new ThreadPoolExecutor(
+ 1, 1, 0L, TimeUnit.MILLISECONDS, queue, makeThreadFactory(nameFormat),
+ new RejectedExecutionHandler()
+ {
+ @Override
+ public void rejectedExecution(Runnable r, ThreadPoolExecutor executor)
+ {
+ try {
+ executor.getQueue().put(r);
+ }
+ catch (InterruptedException e) {
+ throw new RejectedExecutionException("Got Interrupted while adding to the Queue");
+ }
+ }
+ }
);
}
}
diff --git a/common/src/test/java/io/druid/concurrent/ExecsTest.java b/common/src/test/java/io/druid/concurrent/ExecsTest.java
index 809ed5eac02..ae2b0e15473 100644
--- a/common/src/test/java/io/druid/concurrent/ExecsTest.java
+++ b/common/src/test/java/io/druid/concurrent/ExecsTest.java
@@ -20,6 +20,8 @@
package io.druid.concurrent;
import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.metamx.common.logger.Logger;
import org.junit.Assert;
import org.junit.Test;
@@ -30,23 +32,46 @@ import java.util.concurrent.atomic.AtomicInteger;
public class ExecsTest
{
+ private static final Logger log = new Logger(ExecsTest.class);
+
@Test
- public void testBlockingExecutorService() throws Exception
+ public void testBlockingExecutorServiceZeroCapacity() throws Exception
{
- final int capacity = 3;
- final ExecutorService blockingExecutor = Execs.newBlockingSingleThreaded("test%d", capacity);
- final CountDownLatch queueFullSignal = new CountDownLatch(capacity + 1);
- final CountDownLatch taskCompletedSignal = new CountDownLatch(2 * capacity);
+ runTest(0);
+ }
+
+ @Test
+ public void testBlockingExecutorServiceOneCapacity() throws Exception
+ {
+ runTest(1);
+ }
+
+ @Test
+ public void testBlockingExecutorServiceThreeCapacity() throws Exception
+ {
+ runTest(3);
+ }
+
+ private static void runTest(final int capacity) throws Exception
+ {
+ final int nTasks = (capacity + 1) * 3;
+ final ExecutorService blockingExecutor = Execs.newBlockingSingleThreaded("ExecsTest-Blocking-%d", capacity);
+ final CountDownLatch queueShouldBeFullSignal = new CountDownLatch(capacity + 1);
+ final CountDownLatch taskCompletedSignal = new CountDownLatch(nTasks);
final CountDownLatch taskStartSignal = new CountDownLatch(1);
final AtomicInteger producedCount = new AtomicInteger();
final AtomicInteger consumedCount = new AtomicInteger();
- ExecutorService producer = Executors.newSingleThreadExecutor();
+ final ExecutorService producer = Executors.newSingleThreadExecutor(
+ new ThreadFactoryBuilder().setNameFormat(
+ "ExecsTest-Producer-%d"
+ ).build()
+ );
producer.submit(
new Runnable()
{
public void run()
{
- for (int i = 0; i < 2 * capacity; i++) {
+ for (int i = 0; i < nTasks; i++) {
final int taskID = i;
System.out.println("Produced task" + taskID);
blockingExecutor.submit(
@@ -55,7 +80,7 @@ public class ExecsTest
@Override
public void run()
{
- System.out.println("Starting task" + taskID);
+ log.info("Starting task: %s", taskID);
try {
taskStartSignal.await();
consumedCount.incrementAndGet();
@@ -64,29 +89,31 @@ public class ExecsTest
catch (Exception e) {
throw Throwables.propagate(e);
}
- System.out.println("Completed task" + taskID);
+ log.info("Completed task: %s", taskID);
}
}
);
producedCount.incrementAndGet();
- queueFullSignal.countDown();
+ queueShouldBeFullSignal.countDown();
}
}
}
);
- queueFullSignal.await();
- // verify that the producer blocks
+ queueShouldBeFullSignal.await();
+ // Verify that the producer blocks. I don't think it's possible to be sure that the producer is blocking (since
+ // it could be doing nothing for any reason). But waiting a short period of time and checking that it hasn't done
+ // anything should hopefully be sufficient.
+ Thread.sleep(500);
Assert.assertEquals(capacity + 1, producedCount.get());
// let the tasks run
taskStartSignal.countDown();
// wait until all tasks complete
taskCompletedSignal.await();
// verify all tasks consumed
- Assert.assertEquals(2 * capacity, consumedCount.get());
+ Assert.assertEquals(nTasks, consumedCount.get());
// cleanup
blockingExecutor.shutdown();
producer.shutdown();
-
}
}
diff --git a/docs/content/Aggregations.md b/docs/content/Aggregations.md
index 93bfb76c90e..74ad226ff81 100644
--- a/docs/content/Aggregations.md
+++ b/docs/content/Aggregations.md
@@ -82,3 +82,13 @@ All JavaScript functions must return numerical values.
"fnReset" : "function() { return 10; }"
}
```
+
+### Complex aggregators
+
+#### `hyperUnique` aggregator
+
+`hyperUnique` uses [Hyperloglog](http://algo.inria.fr/flajolet/Publications/FlFuGaMe07.pdf) to compute the estimated cardinality of a dimension.
+
+```json
+{ "type" : "hyperUnique", "name" : <output_name>, "fieldName" : <metric_name> }
+```
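+
+For example, assuming a data source ingested with a hyperUnique metric column hypothetically named "user_unique", the aggregator could appear in a query's `aggregations` list like so:
+
+```json
+{
+  "queryType": "timeseries",
+  "dataSource": "sample_datasource",
+  "granularity": "day",
+  "aggregations": [
+    { "type" : "hyperUnique", "name" : "unique_users", "fieldName" : "user_unique" }
+  ],
+  "intervals": [ "2013-01-01T00:00:00.000/2013-01-03T00:00:00.000" ]
+}
+```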
diff --git a/docs/content/Broker-Config.md b/docs/content/Broker-Config.md
index 815173b4c36..7b49e25baf9 100644
--- a/docs/content/Broker-Config.md
+++ b/docs/content/Broker-Config.md
@@ -26,6 +26,11 @@ druid.service=broker
druid.port=8080
druid.zk.service.host=localhost
+
+# Change these to make Druid faster
+druid.processing.buffer.sizeBytes=100000000
+druid.processing.numThreads=1
+
```
Production Configs
diff --git a/docs/content/DataSource.md b/docs/content/DataSource.md
new file mode 100644
index 00000000000..49c583561ba
--- /dev/null
+++ b/docs/content/DataSource.md
@@ -0,0 +1,25 @@
+---
+layout: doc_page
+---
+A data source is the Druid equivalent of a database table. However, a query can also masquerade as a data source, providing subquery-like functionality. Query data sources are currently only supported by [GroupBy](GroupByQuery.html) queries.
+
+### Table Data Source
+The table data source is the most common type. It's represented by a string, or by the full structure:
+
+```json
+{
+ "type": "table",
+ "name": <string_value>
+}
+```
+
+### Query Data Source
+```json
+{
+ "type": "query",
+ "query": {
+ "type": "groupBy",
+ ...
+ }
+}
+```
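+
+As an illustration, the following two dataSource specifications (using a hypothetical "wikipedia" table) are equivalent when used in a query:
+
+```json
+"dataSource": "wikipedia"
+```
+
+```json
+"dataSource": {
+  "type": "table",
+  "name": "wikipedia"
+}
+```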
diff --git a/docs/content/Examples.md b/docs/content/Examples.md
index 495891f6da9..4ce8a0d8f79 100644
--- a/docs/content/Examples.md
+++ b/docs/content/Examples.md
@@ -19,13 +19,13 @@ Clone Druid and build it:
git clone https://github.com/metamx/druid.git druid
cd druid
git fetch --tags
-git checkout druid-0.6.61
+git checkout druid-0.6.69
./build.sh
```
### Downloading the DSK (Druid Standalone Kit)
-[Download](http://static.druid.io/artifacts/releases/druid-services-0.6.61-bin.tar.gz) a stand-alone tarball and run it:
+[Download](http://static.druid.io/artifacts/releases/druid-services-0.6.69-bin.tar.gz) a stand-alone tarball and run it:
``` bash
tar -xzf druid-services-0.X.X-bin.tar.gz
diff --git a/docs/content/GroupByQuery.md b/docs/content/GroupByQuery.md
index 9edca5d2861..ca6ef8e277a 100644
--- a/docs/content/GroupByQuery.md
+++ b/docs/content/GroupByQuery.md
@@ -48,7 +48,7 @@ There are 9 main parts to a groupBy query:
|property|description|required?|
|--------|-----------|---------|
|queryType|This String should always be "groupBy"; this is the first thing Druid looks at to figure out how to interpret the query|yes|
-|dataSource|A String defining the data source to query, very similar to a table in a relational database|yes|
+|dataSource|A String defining the data source to query (very similar to a table in a relational database), or a [DataSource](DataSource.html) structure.|yes|
|dimensions|A JSON list of dimensions to do the groupBy over|yes|
|orderBy|See [OrderBy](OrderBy.html).|no|
|having|See [Having](Having.html).|no|
diff --git a/docs/content/Indexing-Service-Config.md b/docs/content/Indexing-Service-Config.md
index 88f89220d21..a7c27deb1a6 100644
--- a/docs/content/Indexing-Service-Config.md
+++ b/docs/content/Indexing-Service-Config.md
@@ -66,7 +66,7 @@ druid.host=#{IP_ADDR}:8080
druid.port=8080
druid.service=druid/prod/indexer
-druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.61"]
+druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.69"]
druid.zk.service.host=#{ZK_IPs}
druid.zk.paths.base=/druid/prod
@@ -115,7 +115,7 @@ druid.host=#{IP_ADDR}:8080
druid.port=8080
druid.service=druid/prod/worker
-druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.61","io.druid.extensions:druid-kafka-seven:0.6.61"]
+druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.69","io.druid.extensions:druid-kafka-seven:0.6.69"]
druid.zk.service.host=#{ZK_IPs}
druid.zk.paths.base=/druid/prod
diff --git a/docs/content/Post-aggregations.md b/docs/content/Post-aggregations.md
index 527d64e7971..4dce46ceff1 100644
--- a/docs/content/Post-aggregations.md
+++ b/docs/content/Post-aggregations.md
@@ -64,6 +64,31 @@ Example JavaScript aggregator:
"function": "function(delta, total) { return 100 * Math.abs(delta) / total; }"
}
```
+### `hyperUniqueCardinality` post-aggregator
+
+The hyperUniqueCardinality post aggregator is used to wrap a hyperUnique object such that it can be used in post aggregations.
+
+```json
+{ "type" : "hyperUniqueCardinality", "fieldName" : <the name field value of the hyperUnique aggregator> }
+```
+
+It can be used in a sample calculation like so:
+
+```json
+  "aggregations" : [
+    { "type" : "count", "name" : "rows" },
+    { "type" : "hyperUnique", "name" : "unique_users", "fieldName" : "uniques" }
+  ],
+  "postAggregations" : [{
+    "type"   : "arithmetic",
+    "name"   : "average_users_per_row",
+    "fn"     : "/",
+    "fields" : [
+      { "type" : "hyperUniqueCardinality", "fieldName" : "unique_users" },
+      { "type" : "fieldAccess", "name" : "rows", "fieldName" : "rows" }
+    ]
+  }]
+```
### Example Usage
@@ -98,4 +123,4 @@ The format of the query JSON is as follows:
...
}
-```
+```
\ No newline at end of file
diff --git a/docs/content/Realtime-Config.md b/docs/content/Realtime-Config.md
index 551d6704ae8..bda18373e5c 100644
--- a/docs/content/Realtime-Config.md
+++ b/docs/content/Realtime-Config.md
@@ -27,7 +27,7 @@ druid.host=localhost
druid.service=realtime
druid.port=8083
-druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.61"]
+druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.69"]
druid.zk.service.host=localhost
@@ -76,7 +76,7 @@ druid.host=#{IP_ADDR}:8080
druid.port=8080
druid.service=druid/prod/realtime
-druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.61","io.druid.extensions:druid-kafka-seven:0.6.61"]
+druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.69","io.druid.extensions:druid-kafka-seven:0.6.69"]
druid.zk.service.host=#{ZK_IPs}
druid.zk.paths.base=/druid/prod
diff --git a/docs/content/Tasks.md b/docs/content/Tasks.md
index d907e46cdae..2a34c9989b3 100644
--- a/docs/content/Tasks.md
+++ b/docs/content/Tasks.md
@@ -152,10 +152,7 @@ The indexing service can also run real-time tasks. These tasks effectively trans
"intermediatePersistPeriod": "PT10m"
},
"windowPeriod": "PT10m",
- "segmentGranularity": "hour",
- "rejectionPolicy": {
- "type": "messageTime"
- }
+ "segmentGranularity": "hour"
}
```
diff --git a/docs/content/Tutorial:-A-First-Look-at-Druid.md b/docs/content/Tutorial:-A-First-Look-at-Druid.md
index cbf04e9a708..72390fba9a6 100644
--- a/docs/content/Tutorial:-A-First-Look-at-Druid.md
+++ b/docs/content/Tutorial:-A-First-Look-at-Druid.md
@@ -49,7 +49,7 @@ There are two ways to setup Druid: download a tarball, or [Build From Source](Bu
### Download a Tarball
-We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.61-bin.tar.gz). Download this file to a directory of your choosing.
+We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.69-bin.tar.gz). Download this file to a directory of your choosing.
You can extract the awesomeness within by issuing:
@@ -60,7 +60,7 @@ tar -zxvf druid-services-*-bin.tar.gz
Not too lost so far right? That's great! If you cd into the directory:
```
-cd druid-services-0.6.61
+cd druid-services-0.6.69
```
You should see a bunch of files:
diff --git a/docs/content/Tutorial:-Loading-Your-Data-Part-2.md b/docs/content/Tutorial:-Loading-Your-Data-Part-2.md
index ff065a7f16f..06b2281060a 100644
--- a/docs/content/Tutorial:-Loading-Your-Data-Part-2.md
+++ b/docs/content/Tutorial:-Loading-Your-Data-Part-2.md
@@ -160,13 +160,15 @@ You should be comfortable starting Druid nodes at this point. If not, it may be
"segmentGranularity": "hour",
"basePersistDirectory": "\/tmp\/realtime\/basePersist",
"rejectionPolicy": {
- "type": "none"
+ "type": "test"
}
}
}
]
```
+Note: This config uses a "test" rejection policy which accepts all events and hands segments off in a timely manner; however, we strongly recommend that you do not use it in production. With this rejection policy, segments for events in the same time range will be overwritten.
+
3. Let's copy and paste some data into the Kafka console producer
```json
diff --git a/docs/content/Tutorial:-The-Druid-Cluster.md b/docs/content/Tutorial:-The-Druid-Cluster.md
index 958cd580752..1092409d248 100644
--- a/docs/content/Tutorial:-The-Druid-Cluster.md
+++ b/docs/content/Tutorial:-The-Druid-Cluster.md
@@ -13,7 +13,7 @@ In this tutorial, we will set up other types of Druid nodes and external depende
If you followed the first tutorial, you should already have Druid downloaded. If not, let's go back and do that first.
-You can download the latest version of druid [here](http://static.druid.io/artifacts/releases/druid-services-0.6.61-bin.tar.gz)
+You can download the latest version of druid [here](http://static.druid.io/artifacts/releases/druid-services-0.6.69-bin.tar.gz)
and untar the contents within by issuing:
@@ -149,7 +149,7 @@ druid.port=8081
druid.zk.service.host=localhost
-druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.61"]
+druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.69"]
# Dummy read only AWS account (used to download example data)
druid.s3.secretKey=QyyfVZ7llSiRg6Qcrql1eEUG7buFpAK6T6engr1b
@@ -240,7 +240,7 @@ druid.port=8083
druid.zk.service.host=localhost
-druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.61","io.druid.extensions:druid-kafka-seven:0.6.61"]
+druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.69","io.druid.extensions:druid-kafka-seven:0.6.69"]
# Change this config to db to hand off to the rest of the Druid cluster
druid.publish.type=noop
@@ -251,6 +251,9 @@ druid.publish.type=noop
# druid.db.connector.password=diurd
druid.processing.buffer.sizeBytes=100000000
+druid.processing.numThreads=1
+
+druid.monitoring.monitors=["io.druid.segment.realtime.RealtimeMetricsMonitor"]
```
Next Steps
diff --git a/docs/content/Tutorial:-Webstream.md b/docs/content/Tutorial:-Webstream.md
index 481b2b897b6..722e19aefd3 100644
--- a/docs/content/Tutorial:-Webstream.md
+++ b/docs/content/Tutorial:-Webstream.md
@@ -37,7 +37,7 @@ There are two ways to setup Druid: download a tarball, or [Build From Source](Bu
h3. Download a Tarball
-We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.61-bin.tar.gz)
+We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.69-bin.tar.gz)
Download this file to a directory of your choosing.
You can extract the awesomeness within by issuing:
@@ -48,7 +48,7 @@ tar zxvf druid-services-*-bin.tar.gz
Not too lost so far right? That's great! If you cd into the directory:
```
-cd druid-services-0.6.61
+cd druid-services-0.6.69
```
You should see a bunch of files:
diff --git a/docs/content/Twitter-Tutorial.textile b/docs/content/Twitter-Tutorial.textile
index 9652bdb2355..2c6dc466f97 100644
--- a/docs/content/Twitter-Tutorial.textile
+++ b/docs/content/Twitter-Tutorial.textile
@@ -9,7 +9,7 @@ There are two ways to setup Druid: download a tarball, or build it from source.
h3. Download a Tarball
-We've built a tarball that contains everything you'll need. You'll find it "here":http://static.druid.io/artifacts/releases/druid-services-0.6.61-bin.tar.gz.
+We've built a tarball that contains everything you'll need. You'll find it "here":http://static.druid.io/artifacts/releases/druid-services-0.6.69-bin.tar.gz.
Download this bad boy to a directory of your choosing.
You can extract the awesomeness within by issuing:
diff --git a/examples/bin/examples/indexing/wikipedia.spec b/examples/bin/examples/indexing/wikipedia.spec
index f6488d198c3..6d7f1499b25 100644
--- a/examples/bin/examples/indexing/wikipedia.spec
+++ b/examples/bin/examples/indexing/wikipedia.spec
@@ -53,7 +53,7 @@
"segmentGranularity": "hour",
"basePersistDirectory": "\/tmp\/realtime\/basePersist",
"rejectionPolicy": {
- "type": "messageTime"
+ "type": "test"
}
}
}
diff --git a/examples/bin/examples/rabbitmq/rabbitmq_realtime.spec b/examples/bin/examples/rabbitmq/rabbitmq_realtime.spec
index 528e81f39cc..c9f0f192b18 100644
--- a/examples/bin/examples/rabbitmq/rabbitmq_realtime.spec
+++ b/examples/bin/examples/rabbitmq/rabbitmq_realtime.spec
@@ -43,6 +43,6 @@
"windowPeriod" : "PT5m",
"segmentGranularity":"hour",
"basePersistDirectory" : "/tmp/realtime/basePersist",
- "rejectionPolicy": { "type": "messageTime" }
+ "rejectionPolicy": { "type": "test" }
}
}]
diff --git a/examples/bin/run_example_server.sh b/examples/bin/run_example_server.sh
index f1adada0bfa..3d63e7cb0c3 100755
--- a/examples/bin/run_example_server.sh
+++ b/examples/bin/run_example_server.sh
@@ -55,8 +55,6 @@ JAVA_ARGS="${JAVA_ARGS} -Ddruid.realtime.specFile=${SPEC_FILE}"
DRUID_CP=${EXAMPLE_LOC}
#For a pull
-DRUID_CP=${DRUID_CP}:`ls ${SCRIPT_DIR}/../target/druid-examples-*-selfcontained.jar`
-DRUID_CP=${DRUID_CP}:`ls ${SCRIPT_DIR}/../../services/target/druid-services-*-selfcontained.jar`
DRUID_CP=${DRUID_CP}:${SCRIPT_DIR}/../config/realtime
#For the kit
DRUID_CP=${DRUID_CP}:${SCRIPT_DIR}/lib/*
diff --git a/examples/config/broker/runtime.properties b/examples/config/broker/runtime.properties
index a3cfdf335d3..8afae982654 100644
--- a/examples/config/broker/runtime.properties
+++ b/examples/config/broker/runtime.properties
@@ -2,4 +2,8 @@ druid.host=localhost
druid.service=broker
druid.port=8080
-druid.zk.service.host=localhost
\ No newline at end of file
+druid.zk.service.host=localhost
+
+# Change these to make Druid faster
+druid.processing.buffer.sizeBytes=100000000
+druid.processing.numThreads=1
diff --git a/examples/config/historical/runtime.properties b/examples/config/historical/runtime.properties
index a94ec460fb3..43f56fafa5b 100644
--- a/examples/config/historical/runtime.properties
+++ b/examples/config/historical/runtime.properties
@@ -4,7 +4,7 @@ druid.port=8081
druid.zk.service.host=localhost
-druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.61"]
+druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.69"]
# Dummy read only AWS account (used to download example data)
druid.s3.secretKey=QyyfVZ7llSiRg6Qcrql1eEUG7buFpAK6T6engr1b
diff --git a/examples/config/realtime/runtime.properties b/examples/config/realtime/runtime.properties
index 45471c989ac..d9b02275e85 100644
--- a/examples/config/realtime/runtime.properties
+++ b/examples/config/realtime/runtime.properties
@@ -4,7 +4,7 @@ druid.port=8083
druid.zk.service.host=localhost
-druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.61","io.druid.extensions:druid-kafka-seven:0.6.61","io.druid.extensions:druid-rabbitmq:0.6.61"]
+druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.69","io.druid.extensions:druid-kafka-seven:0.6.69","io.druid.extensions:druid-rabbitmq:0.6.69"]
# Change this config to db to hand off to the rest of the Druid cluster
druid.publish.type=noop
@@ -16,3 +16,5 @@ druid.publish.type=noop
druid.processing.buffer.sizeBytes=100000000
druid.processing.numThreads=1
+
+druid.monitoring.monitors=["io.druid.segment.realtime.RealtimeMetricsMonitor"]
diff --git a/examples/pom.xml b/examples/pom.xml
index bce8ed35efd..d8ba4eb6220 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -28,7 +28,7 @@
 <groupId>io.druid</groupId>
 <artifactId>druid</artifactId>
- <version>0.6.63-SNAPSHOT</version>
+ <version>0.6.70-SNAPSHOT</version>
diff --git a/hdfs-storage/pom.xml b/hdfs-storage/pom.xml
index a6990497dd8..af7a44cef64 100644
--- a/hdfs-storage/pom.xml
+++ b/hdfs-storage/pom.xml
@@ -28,7 +28,7 @@
 <groupId>io.druid</groupId>
 <artifactId>druid</artifactId>
- <version>0.6.63-SNAPSHOT</version>
+ <version>0.6.70-SNAPSHOT</version>
diff --git a/hll/pom.xml b/hll/pom.xml
index 09e6fba02e6..d589e983778 100644
--- a/hll/pom.xml
+++ b/hll/pom.xml
@@ -28,7 +28,7 @@
 <groupId>io.druid</groupId>
 <artifactId>druid</artifactId>
- <version>0.6.63-SNAPSHOT</version>
+ <version>0.6.70-SNAPSHOT</version>
diff --git a/indexing-hadoop/pom.xml b/indexing-hadoop/pom.xml
index e164043244e..b965f76360a 100644
--- a/indexing-hadoop/pom.xml
+++ b/indexing-hadoop/pom.xml
@@ -28,7 +28,7 @@
 <groupId>io.druid</groupId>
 <artifactId>druid</artifactId>
- <version>0.6.63-SNAPSHOT</version>
+ <version>0.6.70-SNAPSHOT</version>
@@ -97,11 +97,6 @@
 <artifactId>junit</artifactId>
 <scope>test</scope>
 </dependency>
- <dependency>
- <groupId>com.clearspring.analytics</groupId>
- <artifactId>stream</artifactId>
- <version>2.5.2</version>
- </dependency>
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java
index 771373b92d6..95b8ddd61cf 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java
@@ -19,8 +19,6 @@
package io.druid.indexer;
-import com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
-import com.clearspring.analytics.stream.cardinality.HyperLogLog;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Optional;
import com.google.common.base.Throwables;
@@ -36,6 +34,7 @@ import io.druid.data.input.InputRow;
import io.druid.data.input.Rows;
import io.druid.granularity.QueryGranularity;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
+import io.druid.query.aggregation.hyperloglog.HyperLogLogCollector;
import io.druid.timeline.partition.HashBasedNumberedShardSpec;
import io.druid.timeline.partition.NoneShardSpec;
import org.apache.hadoop.conf.Configuration;
@@ -56,6 +55,7 @@ import org.joda.time.Interval;
import java.io.IOException;
import java.io.OutputStream;
+import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -67,7 +67,6 @@ public class DetermineHashedPartitionsJob implements Jobby
{
private static final int MAX_SHARDS = 128;
private static final Logger log = new Logger(DetermineHashedPartitionsJob.class);
- private static final int HYPER_LOG_LOG_BIT_SIZE = 20;
private final HadoopDruidIndexerConfig config;
public DetermineHashedPartitionsJob(
@@ -99,8 +98,8 @@ public class DetermineHashedPartitionsJob implements Jobby
groupByJob.setOutputKeyClass(NullWritable.class);
groupByJob.setOutputValueClass(NullWritable.class);
groupByJob.setOutputFormatClass(SequenceFileOutputFormat.class);
- if(!config.getSegmentGranularIntervals().isPresent()){
- groupByJob.setNumReduceTasks(1);
+ if (!config.getSegmentGranularIntervals().isPresent()) {
+ groupByJob.setNumReduceTasks(1);
}
JobHelper.setupClasspath(config, groupByJob);
@@ -201,7 +200,7 @@ public class DetermineHashedPartitionsJob implements Jobby
{
private static HashFunction hashFunction = Hashing.murmur3_128();
private QueryGranularity rollupGranularity = null;
- private Map<Interval, HyperLogLog> hyperLogLogs;
+ private Map<Interval, HyperLogLogCollector> hyperLogLogs;
private HadoopDruidIndexerConfig config;
private boolean determineIntervals;
@@ -215,9 +214,9 @@ public class DetermineHashedPartitionsJob implements Jobby
 Optional<Set<Interval>> intervals = config.getSegmentGranularIntervals();
if (intervals.isPresent()) {
determineIntervals = false;
- final ImmutableMap.Builder<Interval, HyperLogLog> builder = ImmutableMap.builder();
+ final ImmutableMap.Builder<Interval, HyperLogLogCollector> builder = ImmutableMap.builder();
for (final Interval bucketInterval : intervals.get()) {
- builder.put(bucketInterval, new HyperLogLog(HYPER_LOG_LOG_BIT_SIZE));
+ builder.put(bucketInterval, HyperLogLogCollector.makeLatestCollector());
}
hyperLogLogs = builder.build();
} else {
@@ -245,7 +244,7 @@ public class DetermineHashedPartitionsJob implements Jobby
.bucket(new DateTime(inputRow.getTimestampFromEpoch()));
if (!hyperLogLogs.containsKey(interval)) {
- hyperLogLogs.put(interval, new HyperLogLog(HYPER_LOG_LOG_BIT_SIZE));
+ hyperLogLogs.put(interval, HyperLogLogCollector.makeLatestCollector());
}
} else {
 final Optional<Interval> maybeInterval = config.getGranularitySpec()
@@ -257,9 +256,9 @@ public class DetermineHashedPartitionsJob implements Jobby
interval = maybeInterval.get();
}
hyperLogLogs.get(interval)
- .offerHashed(
+ .add(
hashFunction.hashBytes(HadoopDruidIndexerConfig.jsonMapper.writeValueAsBytes(groupKey))
- .asLong()
+ .asBytes()
);
}
@@ -272,10 +271,10 @@ public class DetermineHashedPartitionsJob implements Jobby
map(context.getCurrentKey(), context.getCurrentValue(), context);
}
- for (Map.Entry<Interval, HyperLogLog> entry : hyperLogLogs.entrySet()) {
+ for (Map.Entry<Interval, HyperLogLogCollector> entry : hyperLogLogs.entrySet()) {
context.write(
new LongWritable(entry.getKey().getStartMillis()),
- new BytesWritable(entry.getValue().getBytes())
+ new BytesWritable(entry.getValue().toByteArray())
);
}
cleanup(context);
@@ -303,15 +302,9 @@ public class DetermineHashedPartitionsJob implements Jobby
Context context
) throws IOException, InterruptedException
{
- HyperLogLog aggregate = new HyperLogLog(HYPER_LOG_LOG_BIT_SIZE);
+ HyperLogLogCollector aggregate = HyperLogLogCollector.makeLatestCollector();
for (BytesWritable value : values) {
- HyperLogLog logValue = HyperLogLog.Builder.build(getDataBytes(value));
- try {
- aggregate.addAll(logValue);
- }
- catch (CardinalityMergeException e) {
- e.printStackTrace(); // TODO: check for better handling
- }
+ aggregate.fold(ByteBuffer.wrap(value.getBytes(), 0, value.getLength()));
}
Interval interval = config.getGranularitySpec().getSegmentGranularity().bucket(new DateTime(key.get()));
intervals.add(interval);
@@ -327,7 +320,7 @@ public class DetermineHashedPartitionsJob implements Jobby
}
).writeValue(
out,
- aggregate.cardinality()
+ new Double(aggregate.estimateCardinality()).longValue()
);
}
finally {
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDriverConfig.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDriverConfig.java
index 012f6fb3c37..299b6f8a7f5 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDriverConfig.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDriverConfig.java
@@ -23,7 +23,6 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.collect.ImmutableMap;
-import com.google.common.io.Files;
import io.druid.indexer.partitions.HashedPartitionsSpec;
import io.druid.indexer.partitions.PartitionsSpec;
import io.druid.segment.indexing.DriverConfig;
@@ -37,7 +36,6 @@ import java.util.Map;
@JsonTypeName("hadoop")
public class HadoopDriverConfig implements DriverConfig
{
- private static final String defaultWorkingPath = Files.createTempDir().getPath();
private static final PartitionsSpec defaultPartitionsSpec = HashedPartitionsSpec.makeDefaultHashedPartitionsSpec();
 private static final Map<DateTime, List<HadoopyShardSpec>> defaultShardSpecs = ImmutableMap.<DateTime, List<HadoopyShardSpec>>of();
private static final int defaultRowFlushBoundary = 80000;
@@ -45,7 +43,7 @@ public class HadoopDriverConfig implements DriverConfig
public static HadoopDriverConfig makeDefaultDriverConfig()
{
return new HadoopDriverConfig(
- defaultWorkingPath,
+ null,
new DateTime().toString(),
defaultPartitionsSpec,
defaultShardSpecs,
@@ -80,7 +78,7 @@ public class HadoopDriverConfig implements DriverConfig
final @JsonProperty("ignoreInvalidRows") boolean ignoreInvalidRows
)
{
- this.workingPath = workingPath == null ? defaultWorkingPath : workingPath;
+ this.workingPath = workingPath == null ? null : workingPath;
this.version = version == null ? new DateTime().toString() : version;
this.partitionsSpec = partitionsSpec == null ? defaultPartitionsSpec : partitionsSpec;
this.shardSpecs = shardSpecs == null ? defaultShardSpecs : shardSpecs;
diff --git a/indexing-service/pom.xml b/indexing-service/pom.xml
index 4b3b68fb4d0..b29f1a301a7 100644
--- a/indexing-service/pom.xml
+++ b/indexing-service/pom.xml
@@ -28,7 +28,7 @@
 <groupId>io.druid</groupId>
 <artifactId>druid</artifactId>
- <version>0.6.63-SNAPSHOT</version>
+ <version>0.6.70-SNAPSHOT</version>
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java
index 9c3f56516c0..baafc1a9735 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java
@@ -283,7 +283,7 @@ public class HadoopIndexTask extends AbstractTask
Jobby job = new HadoopDruidDetermineConfigurationJob(config);
- log.info("Starting a hadoop index generator job...");
+ log.info("Starting a hadoop determine configuration job...");
if (job.run()) {
return HadoopDruidIndexerConfig.jsonMapper.writeValueAsString(config.getSchema());
}
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/IndexerDBCoordinator.java b/indexing-service/src/main/java/io/druid/indexing/overlord/IndexerDBCoordinator.java
index dc02ee9d4ef..2afa4ed0dd5 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/IndexerDBCoordinator.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/IndexerDBCoordinator.java
@@ -90,7 +90,7 @@ public class IndexerDBCoordinator
final ResultIterator