mirror of https://github.com/apache/druid.git
Merge pull request #421 from metamx/use-druid-hyperloglog
Use druid implementation of HyperLogLog
This commit is contained in:
commit
371c261038
|
@ -97,11 +97,6 @@
|
|||
<artifactId>junit</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.clearspring.analytics</groupId>
|
||||
<artifactId>stream</artifactId>
|
||||
<version>2.5.2</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
|
|
|
@ -19,8 +19,6 @@
|
|||
|
||||
package io.druid.indexer;
|
||||
|
||||
import com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
|
||||
import com.clearspring.analytics.stream.cardinality.HyperLogLog;
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.google.common.base.Optional;
|
||||
import com.google.common.base.Throwables;
|
||||
|
@ -36,6 +34,7 @@ import io.druid.data.input.InputRow;
|
|||
import io.druid.data.input.Rows;
|
||||
import io.druid.granularity.QueryGranularity;
|
||||
import io.druid.indexer.granularity.UniformGranularitySpec;
|
||||
import io.druid.query.aggregation.hyperloglog.HyperLogLogCollector;
|
||||
import io.druid.timeline.partition.HashBasedNumberedShardSpec;
|
||||
import io.druid.timeline.partition.NoneShardSpec;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -56,6 +55,7 @@ import org.joda.time.Interval;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
@ -67,7 +67,6 @@ public class DetermineHashedPartitionsJob implements Jobby
|
|||
{
|
||||
private static final int MAX_SHARDS = 128;
|
||||
private static final Logger log = new Logger(DetermineHashedPartitionsJob.class);
|
||||
private static final int HYPER_LOG_LOG_BIT_SIZE = 20;
|
||||
private final HadoopDruidIndexerConfig config;
|
||||
|
||||
public DetermineHashedPartitionsJob(
|
||||
|
@ -99,8 +98,8 @@ public class DetermineHashedPartitionsJob implements Jobby
|
|||
groupByJob.setOutputKeyClass(NullWritable.class);
|
||||
groupByJob.setOutputValueClass(NullWritable.class);
|
||||
groupByJob.setOutputFormatClass(SequenceFileOutputFormat.class);
|
||||
if(!config.getSegmentGranularIntervals().isPresent()){
|
||||
groupByJob.setNumReduceTasks(1);
|
||||
if (!config.getSegmentGranularIntervals().isPresent()) {
|
||||
groupByJob.setNumReduceTasks(1);
|
||||
}
|
||||
JobHelper.setupClasspath(config, groupByJob);
|
||||
|
||||
|
@ -194,7 +193,7 @@ public class DetermineHashedPartitionsJob implements Jobby
|
|||
{
|
||||
private static HashFunction hashFunction = Hashing.murmur3_128();
|
||||
private QueryGranularity rollupGranularity = null;
|
||||
private Map<Interval, HyperLogLog> hyperLogLogs;
|
||||
private Map<Interval, HyperLogLogCollector> hyperLogLogs;
|
||||
private HadoopDruidIndexerConfig config;
|
||||
private boolean determineIntervals;
|
||||
|
||||
|
@ -208,9 +207,9 @@ public class DetermineHashedPartitionsJob implements Jobby
|
|||
Optional<Set<Interval>> intervals = config.getSegmentGranularIntervals();
|
||||
if (intervals.isPresent()) {
|
||||
determineIntervals = false;
|
||||
final ImmutableMap.Builder<Interval, HyperLogLog> builder = ImmutableMap.builder();
|
||||
final ImmutableMap.Builder<Interval, HyperLogLogCollector> builder = ImmutableMap.builder();
|
||||
for (final Interval bucketInterval : intervals.get()) {
|
||||
builder.put(bucketInterval, new HyperLogLog(HYPER_LOG_LOG_BIT_SIZE));
|
||||
builder.put(bucketInterval, HyperLogLogCollector.makeLatestCollector());
|
||||
}
|
||||
hyperLogLogs = builder.build();
|
||||
} else {
|
||||
|
@ -236,7 +235,7 @@ public class DetermineHashedPartitionsJob implements Jobby
|
|||
interval = config.getGranularitySpec().getGranularity().bucket(new DateTime(inputRow.getTimestampFromEpoch()));
|
||||
|
||||
if (!hyperLogLogs.containsKey(interval)) {
|
||||
hyperLogLogs.put(interval, new HyperLogLog(HYPER_LOG_LOG_BIT_SIZE));
|
||||
hyperLogLogs.put(interval, HyperLogLogCollector.makeLatestCollector());
|
||||
}
|
||||
} else {
|
||||
final Optional<Interval> maybeInterval = config.getGranularitySpec()
|
||||
|
@ -248,9 +247,9 @@ public class DetermineHashedPartitionsJob implements Jobby
|
|||
interval = maybeInterval.get();
|
||||
}
|
||||
hyperLogLogs.get(interval)
|
||||
.offerHashed(
|
||||
.add(
|
||||
hashFunction.hashBytes(HadoopDruidIndexerConfig.jsonMapper.writeValueAsBytes(groupKey))
|
||||
.asLong()
|
||||
.asBytes()
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -263,10 +262,10 @@ public class DetermineHashedPartitionsJob implements Jobby
|
|||
map(context.getCurrentKey(), context.getCurrentValue(), context);
|
||||
}
|
||||
|
||||
for (Map.Entry<Interval, HyperLogLog> entry : hyperLogLogs.entrySet()) {
|
||||
for (Map.Entry<Interval, HyperLogLogCollector> entry : hyperLogLogs.entrySet()) {
|
||||
context.write(
|
||||
new LongWritable(entry.getKey().getStartMillis()),
|
||||
new BytesWritable(entry.getValue().getBytes())
|
||||
new BytesWritable(entry.getValue().toByteArray())
|
||||
);
|
||||
}
|
||||
cleanup(context);
|
||||
|
@ -294,15 +293,9 @@ public class DetermineHashedPartitionsJob implements Jobby
|
|||
Context context
|
||||
) throws IOException, InterruptedException
|
||||
{
|
||||
HyperLogLog aggregate = new HyperLogLog(HYPER_LOG_LOG_BIT_SIZE);
|
||||
HyperLogLogCollector aggregate = HyperLogLogCollector.makeLatestCollector();
|
||||
for (BytesWritable value : values) {
|
||||
HyperLogLog logValue = HyperLogLog.Builder.build(getDataBytes(value));
|
||||
try {
|
||||
aggregate.addAll(logValue);
|
||||
}
|
||||
catch (CardinalityMergeException e) {
|
||||
e.printStackTrace(); // TODO: check for better handling
|
||||
}
|
||||
aggregate.fold(ByteBuffer.wrap(value.getBytes(), 0, value.getLength()));
|
||||
}
|
||||
Interval interval = config.getGranularitySpec().getGranularity().bucket(new DateTime(key.get()));
|
||||
intervals.add(interval);
|
||||
|
@ -318,7 +311,7 @@ public class DetermineHashedPartitionsJob implements Jobby
|
|||
}
|
||||
).writeValue(
|
||||
out,
|
||||
aggregate.cardinality()
|
||||
new Double(aggregate.estimateCardinality()).longValue()
|
||||
);
|
||||
}
|
||||
finally {
|
||||
|
|
Loading…
Reference in New Issue