diff --git a/indexing-hadoop/pom.xml b/indexing-hadoop/pom.xml
index e164043244e..8eb8dabccd4 100644
--- a/indexing-hadoop/pom.xml
+++ b/indexing-hadoop/pom.xml
@@ -97,11 +97,6 @@
             <artifactId>junit</artifactId>
             <scope>test</scope>
         </dependency>
-        <dependency>
-            <groupId>com.clearspring.analytics</groupId>
-            <artifactId>stream</artifactId>
-            <version>2.5.2</version>
-        </dependency>
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java
index 407cf84dec3..ae2d61a9a93 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java
@@ -19,8 +19,6 @@ package io.druid.indexer;
 
-import com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
-import com.clearspring.analytics.stream.cardinality.HyperLogLog;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.google.common.base.Optional;
 import com.google.common.base.Throwables;
@@ -36,6 +34,7 @@ import io.druid.data.input.InputRow;
 import io.druid.data.input.Rows;
 import io.druid.granularity.QueryGranularity;
 import io.druid.indexer.granularity.UniformGranularitySpec;
+import io.druid.query.aggregation.hyperloglog.HyperLogLogCollector;
 import io.druid.timeline.partition.HashBasedNumberedShardSpec;
 import io.druid.timeline.partition.NoneShardSpec;
 import org.apache.hadoop.conf.Configuration;
@@ -56,6 +55,7 @@ import org.joda.time.Interval;
 
 import java.io.IOException;
 import java.io.OutputStream;
+import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -67,7 +67,6 @@ public class DetermineHashedPartitionsJob implements Jobby
 {
   private static final int MAX_SHARDS = 128;
   private static final Logger log = new Logger(DetermineHashedPartitionsJob.class);
-  private static final int HYPER_LOG_LOG_BIT_SIZE = 20;
   private final HadoopDruidIndexerConfig config;
 
   public DetermineHashedPartitionsJob(
@@ -99,8 +98,8 @@ public class DetermineHashedPartitionsJob implements Jobby
       groupByJob.setOutputKeyClass(NullWritable.class);
       groupByJob.setOutputValueClass(NullWritable.class);
       groupByJob.setOutputFormatClass(SequenceFileOutputFormat.class);
-      if(!config.getSegmentGranularIntervals().isPresent()){
-         groupByJob.setNumReduceTasks(1);
+      if (!config.getSegmentGranularIntervals().isPresent()) {
+        groupByJob.setNumReduceTasks(1);
       }
 
       JobHelper.setupClasspath(config, groupByJob);
@@ -194,7 +193,7 @@ public class DetermineHashedPartitionsJob implements Jobby
   {
     private static HashFunction hashFunction = Hashing.murmur3_128();
     private QueryGranularity rollupGranularity = null;
-    private Map<Interval, HyperLogLog> hyperLogLogs;
+    private Map<Interval, HyperLogLogCollector> hyperLogLogs;
     private HadoopDruidIndexerConfig config;
     private boolean determineIntervals;
@@ -208,9 +207,9 @@ public class DetermineHashedPartitionsJob implements Jobby
       Optional<Set<Interval>> intervals = config.getSegmentGranularIntervals();
       if (intervals.isPresent()) {
         determineIntervals = false;
-        final ImmutableMap.Builder<Interval, HyperLogLog> builder = ImmutableMap.builder();
+        final ImmutableMap.Builder<Interval, HyperLogLogCollector> builder = ImmutableMap.builder();
         for (final Interval bucketInterval : intervals.get()) {
-          builder.put(bucketInterval, new HyperLogLog(HYPER_LOG_LOG_BIT_SIZE));
+          builder.put(bucketInterval, HyperLogLogCollector.makeLatestCollector());
         }
         hyperLogLogs = builder.build();
       } else {
@@ -236,7 +235,7 @@ public class DetermineHashedPartitionsJob implements Jobby
         interval = config.getGranularitySpec().getGranularity().bucket(new DateTime(inputRow.getTimestampFromEpoch()));
 
         if (!hyperLogLogs.containsKey(interval)) {
-          hyperLogLogs.put(interval, new HyperLogLog(HYPER_LOG_LOG_BIT_SIZE));
+          hyperLogLogs.put(interval, HyperLogLogCollector.makeLatestCollector());
         }
       } else {
         final Optional<Interval> maybeInterval = config.getGranularitySpec()
@@ -248,9 +247,9 @@ public class DetermineHashedPartitionsJob implements Jobby
         interval = maybeInterval.get();
       }
       hyperLogLogs.get(interval)
-                  .offerHashed(
+                  .add(
                       hashFunction.hashBytes(HadoopDruidIndexerConfig.jsonMapper.writeValueAsBytes(groupKey))
-                                  .asLong()
+                                  .asBytes()
                   );
     }
@@ -263,10 +262,10 @@ public class DetermineHashedPartitionsJob implements Jobby
         map(context.getCurrentKey(), context.getCurrentValue(), context);
       }
 
-      for (Map.Entry<Interval, HyperLogLog> entry : hyperLogLogs.entrySet()) {
+      for (Map.Entry<Interval, HyperLogLogCollector> entry : hyperLogLogs.entrySet()) {
         context.write(
             new LongWritable(entry.getKey().getStartMillis()),
-            new BytesWritable(entry.getValue().getBytes())
+            new BytesWritable(entry.getValue().toByteArray())
         );
       }
       cleanup(context);
@@ -294,15 +293,9 @@ public class DetermineHashedPartitionsJob implements Jobby
         Context context
     ) throws IOException, InterruptedException
     {
-      HyperLogLog aggregate = new HyperLogLog(HYPER_LOG_LOG_BIT_SIZE);
+      HyperLogLogCollector aggregate = HyperLogLogCollector.makeLatestCollector();
       for (BytesWritable value : values) {
-        HyperLogLog logValue = HyperLogLog.Builder.build(getDataBytes(value));
-        try {
-          aggregate.addAll(logValue);
-        }
-        catch (CardinalityMergeException e) {
-          e.printStackTrace(); // TODO: check for better handling
-        }
+        aggregate.fold(ByteBuffer.wrap(value.getBytes(), 0, value.getLength()));
       }
       Interval interval = config.getGranularitySpec().getGranularity().bucket(new DateTime(key.get()));
       intervals.add(interval);
@@ -318,7 +311,7 @@ public class DetermineHashedPartitionsJob implements Jobby
           }
         ).writeValue(
             out,
-            aggregate.cardinality()
+            new Double(aggregate.estimateCardinality()).longValue()
         );
       }
       finally {
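
Side note (not part of the patch): a minimal, standalone sketch of the HyperLogLogCollector lifecycle the updated job relies on. The class name HllCollectorSketch and the use of hashInt() over synthetic integers are illustrative assumptions; the collector and Guava calls themselves (makeLatestCollector, add, toByteArray, fold, estimateCardinality, murmur3_128) are the ones exercised by the diff above.

import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import io.druid.query.aggregation.hyperloglog.HyperLogLogCollector;

import java.nio.ByteBuffer;

public class HllCollectorSketch
{
  private static final HashFunction HASH = Hashing.murmur3_128();

  public static void main(String[] args)
  {
    // "Mapper" side: two collectors, as if built for the same interval by two mapper tasks.
    HyperLogLogCollector mapperA = HyperLogLogCollector.makeLatestCollector();
    HyperLogLogCollector mapperB = HyperLogLogCollector.makeLatestCollector();

    for (int i = 0; i < 1000; i++) {
      // add() takes the 128-bit murmur3 hash of the group key as bytes,
      // mirroring hashFunction.hashBytes(...).asBytes() in the mapper.
      mapperA.add(HASH.hashInt(i).asBytes());
      mapperB.add(HASH.hashInt(i + 500).asBytes()); // 500 values overlap with mapperA
    }

    // "Shuffle": each collector travels as a plain byte array (BytesWritable in the job).
    byte[] serializedA = mapperA.toByteArray();
    byte[] serializedB = mapperB.toByteArray();

    // "Reducer" side: fold the serialized collectors into a single aggregate.
    // fold() consumes the serialized form directly and, unlike the old
    // HyperLogLog.addAll(), does not declare a checked merge exception.
    HyperLogLogCollector aggregate = HyperLogLogCollector.makeLatestCollector();
    aggregate.fold(ByteBuffer.wrap(serializedA));
    aggregate.fold(ByteBuffer.wrap(serializedB));

    // Roughly 1500 distinct values; estimateCardinality() returns a double,
    // which the job truncates to a long before writing it out.
    System.out.println("estimated cardinality: " + (long) aggregate.estimateCardinality());
  }
}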