diff --git a/indexing-hadoop/pom.xml b/indexing-hadoop/pom.xml
index e164043244e..8eb8dabccd4 100644
--- a/indexing-hadoop/pom.xml
+++ b/indexing-hadoop/pom.xml
@@ -97,11 +97,6 @@
      <artifactId>junit</artifactId>
      <scope>test</scope>
    </dependency>
-   <dependency>
-     <groupId>com.clearspring.analytics</groupId>
-     <artifactId>stream</artifactId>
-     <version>2.5.2</version>
-   </dependency>
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java
index 407cf84dec3..ae2d61a9a93 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java
@@ -19,8 +19,6 @@
package io.druid.indexer;
-import com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
-import com.clearspring.analytics.stream.cardinality.HyperLogLog;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Optional;
import com.google.common.base.Throwables;
@@ -36,6 +34,7 @@ import io.druid.data.input.InputRow;
import io.druid.data.input.Rows;
import io.druid.granularity.QueryGranularity;
import io.druid.indexer.granularity.UniformGranularitySpec;
+import io.druid.query.aggregation.hyperloglog.HyperLogLogCollector;
import io.druid.timeline.partition.HashBasedNumberedShardSpec;
import io.druid.timeline.partition.NoneShardSpec;
import org.apache.hadoop.conf.Configuration;
@@ -56,6 +55,7 @@ import org.joda.time.Interval;
import java.io.IOException;
import java.io.OutputStream;
+import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -67,7 +67,6 @@ public class DetermineHashedPartitionsJob implements Jobby
{
private static final int MAX_SHARDS = 128;
private static final Logger log = new Logger(DetermineHashedPartitionsJob.class);
- private static final int HYPER_LOG_LOG_BIT_SIZE = 20;
private final HadoopDruidIndexerConfig config;
public DetermineHashedPartitionsJob(
@@ -99,8 +98,8 @@ public class DetermineHashedPartitionsJob implements Jobby
groupByJob.setOutputKeyClass(NullWritable.class);
groupByJob.setOutputValueClass(NullWritable.class);
groupByJob.setOutputFormatClass(SequenceFileOutputFormat.class);
- if(!config.getSegmentGranularIntervals().isPresent()){
- groupByJob.setNumReduceTasks(1);
+ if (!config.getSegmentGranularIntervals().isPresent()) {
+ groupByJob.setNumReduceTasks(1);
}
JobHelper.setupClasspath(config, groupByJob);
@@ -194,7 +193,7 @@ public class DetermineHashedPartitionsJob implements Jobby
{
private static HashFunction hashFunction = Hashing.murmur3_128();
private QueryGranularity rollupGranularity = null;
- private Map<Interval, HyperLogLog> hyperLogLogs;
+ private Map<Interval, HyperLogLogCollector> hyperLogLogs;
private HadoopDruidIndexerConfig config;
private boolean determineIntervals;
@@ -208,9 +207,9 @@ public class DetermineHashedPartitionsJob implements Jobby
Optional<Set<Interval>> intervals = config.getSegmentGranularIntervals();
if (intervals.isPresent()) {
determineIntervals = false;
- final ImmutableMap.Builder<Interval, HyperLogLog> builder = ImmutableMap.builder();
+ final ImmutableMap.Builder<Interval, HyperLogLogCollector> builder = ImmutableMap.builder();
for (final Interval bucketInterval : intervals.get()) {
- builder.put(bucketInterval, new HyperLogLog(HYPER_LOG_LOG_BIT_SIZE));
+ builder.put(bucketInterval, HyperLogLogCollector.makeLatestCollector());
}
hyperLogLogs = builder.build();
} else {
@@ -236,7 +235,7 @@ public class DetermineHashedPartitionsJob implements Jobby
interval = config.getGranularitySpec().getGranularity().bucket(new DateTime(inputRow.getTimestampFromEpoch()));
if (!hyperLogLogs.containsKey(interval)) {
- hyperLogLogs.put(interval, new HyperLogLog(HYPER_LOG_LOG_BIT_SIZE));
+ hyperLogLogs.put(interval, HyperLogLogCollector.makeLatestCollector());
}
} else {
final Optional<Interval> maybeInterval = config.getGranularitySpec()
@@ -248,9 +247,9 @@ public class DetermineHashedPartitionsJob implements Jobby
interval = maybeInterval.get();
}
hyperLogLogs.get(interval)
- .offerHashed(
+ .add(
hashFunction.hashBytes(HadoopDruidIndexerConfig.jsonMapper.writeValueAsBytes(groupKey))
- .asLong()
+ .asBytes()
);
}
@@ -263,10 +262,10 @@ public class DetermineHashedPartitionsJob implements Jobby
map(context.getCurrentKey(), context.getCurrentValue(), context);
}
- for (Map.Entry<Interval, HyperLogLog> entry : hyperLogLogs.entrySet()) {
+ for (Map.Entry<Interval, HyperLogLogCollector> entry : hyperLogLogs.entrySet()) {
context.write(
new LongWritable(entry.getKey().getStartMillis()),
- new BytesWritable(entry.getValue().getBytes())
+ new BytesWritable(entry.getValue().toByteArray())
);
}
cleanup(context);
@@ -294,15 +293,9 @@ public class DetermineHashedPartitionsJob implements Jobby
Context context
) throws IOException, InterruptedException
{
- HyperLogLog aggregate = new HyperLogLog(HYPER_LOG_LOG_BIT_SIZE);
+ HyperLogLogCollector aggregate = HyperLogLogCollector.makeLatestCollector();
for (BytesWritable value : values) {
- HyperLogLog logValue = HyperLogLog.Builder.build(getDataBytes(value));
- try {
- aggregate.addAll(logValue);
- }
- catch (CardinalityMergeException e) {
- e.printStackTrace(); // TODO: check for better handling
- }
+ aggregate.fold(ByteBuffer.wrap(value.getBytes(), 0, value.getLength()));
}
Interval interval = config.getGranularitySpec().getGranularity().bucket(new DateTime(key.get()));
intervals.add(interval);
@@ -318,7 +311,7 @@ public class DetermineHashedPartitionsJob implements Jobby
}
).writeValue(
out,
- aggregate.cardinality()
+ new Double(aggregate.estimateCardinality()).longValue()
);
}
finally {
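
Note (reading aid, not part of the patch): a minimal, self-contained sketch of the HyperLogLogCollector flow that replaces the clearspring HyperLogLog usage above. Only the calls that appear in the diff (makeLatestCollector, add, toByteArray, fold, estimateCardinality) are assumed to exist; the sketch's own class and method names are illustrative.

    // Sketch of the mapper/reducer pattern shown in DetermineHashedPartitionsJob above.
    import com.google.common.hash.HashFunction;
    import com.google.common.hash.Hashing;
    import io.druid.query.aggregation.hyperloglog.HyperLogLogCollector;

    import java.nio.ByteBuffer;
    import java.util.List;

    class HllFlowSketch
    {
      private static final HashFunction HASH = Hashing.murmur3_128();

      // Mapper side: hash each serialized group key and add the hash bytes to the collector.
      static HyperLogLogCollector collect(List<byte[]> serializedGroupKeys)
      {
        final HyperLogLogCollector collector = HyperLogLogCollector.makeLatestCollector();
        for (byte[] groupKey : serializedGroupKeys) {
          collector.add(HASH.hashBytes(groupKey).asBytes());
        }
        return collector; // the job serializes this via collector.toByteArray()
      }

      // Reducer side: fold the serialized collectors and truncate the estimate to a long,
      // mirroring "new Double(aggregate.estimateCardinality()).longValue()" above.
      static long mergeAndEstimate(Iterable<byte[]> serializedCollectors)
      {
        final HyperLogLogCollector aggregate = HyperLogLogCollector.makeLatestCollector();
        for (byte[] bytes : serializedCollectors) {
          aggregate.fold(ByteBuffer.wrap(bytes));
        }
        return (long) aggregate.estimateCardinality();
      }
    }
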
diff --git a/server/src/main/java/io/druid/client/CachingClusteredClient.java b/server/src/main/java/io/druid/client/CachingClusteredClient.java
index dacfc7938ef..c17314c7cf8 100644
--- a/server/src/main/java/io/druid/client/CachingClusteredClient.java
+++ b/server/src/main/java/io/druid/client/CachingClusteredClient.java
@@ -128,7 +128,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
final boolean isBySegment = Boolean.parseBoolean(query.getContextValue("bySegment", "false"));
- ImmutableMap.Builder<String, String> contextBuilder = new ImmutableMap.Builder();
+ ImmutableMap.Builder<String, String> contextBuilder = new ImmutableMap.Builder<>();
final String priority = query.getContextValue("priority", "0");
contextBuilder.put("priority", priority);
@@ -176,32 +176,37 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
queryCacheKey = null;
}
- // Pull cached segments from cache and remove from set of segments to query
- if (useCache && queryCacheKey != null) {
+ if (queryCacheKey != null) {
Map<Pair<ServerSelector, SegmentDescriptor>, Cache.NamedKey> cacheKeys = Maps.newHashMap();
- for (Pair<ServerSelector, SegmentDescriptor> e : segments) {
- cacheKeys.put(e, computeSegmentCacheKey(e.lhs.getSegment().getIdentifier(), e.rhs, queryCacheKey));
+ for (Pair<ServerSelector, SegmentDescriptor> segment : segments) {
+ final Cache.NamedKey segmentCacheKey = computeSegmentCacheKey(
+ segment.lhs.getSegment().getIdentifier(),
+ segment.rhs,
+ queryCacheKey
+ );
+ cacheKeys.put(segment, segmentCacheKey);
}
- Map<Cache.NamedKey, byte[]> cachedValues = cache.getBulk(cacheKeys.values());
+ // Pull cached segments from cache and remove from set of segments to query
+ final Map<Cache.NamedKey, byte[]> cachedValues;
+ if (useCache) {
+ cachedValues = cache.getBulk(cacheKeys.values());
+ } else {
+ cachedValues = ImmutableMap.of();
+ }
for (Map.Entry<Pair<ServerSelector, SegmentDescriptor>, Cache.NamedKey> entry : cacheKeys.entrySet()) {
Pair<ServerSelector, SegmentDescriptor> segment = entry.getKey();
Cache.NamedKey segmentCacheKey = entry.getValue();
-
- final ServerSelector selector = segment.lhs;
- final SegmentDescriptor descriptor = segment.rhs;
- final Interval segmentQueryInterval = descriptor.getInterval();
+ final Interval segmentQueryInterval = segment.rhs.getInterval();
final byte[] cachedValue = cachedValues.get(segmentCacheKey);
-
if (cachedValue != null) {
- cachedResults.add(Pair.of(segmentQueryInterval.getStart(), cachedValue));
-
// remove cached segment from set of segments to query
segments.remove(segment);
- } else {
- final String segmentIdentifier = selector.getSegment().getIdentifier();
+ cachedResults.add(Pair.of(segmentQueryInterval.getStart(), cachedValue));
+ } else if (populateCache) {
+ final String segmentIdentifier = segment.lhs.getSegment().getIdentifier();
cachePopulatorMap.put(
String.format("%s_%s", segmentIdentifier, segmentQueryInterval),
new CachePopulator(cache, objectMapper, segmentCacheKey)
@@ -229,7 +234,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
}
}
- return new LazySequence(
+ return new LazySequence<>(
new Supplier<Sequence<T>>()
{
@Override
@@ -265,7 +270,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
final TypeReference