Merge branch 'master' of github.com:metamx/druid

fjy 2014-03-06 11:57:14 -08:00
commit e27b2fa7cb
5 changed files with 1527 additions and 46 deletions

View File

@@ -97,11 +97,6 @@
       <artifactId>junit</artifactId>
       <scope>test</scope>
     </dependency>
-    <dependency>
-      <groupId>com.clearspring.analytics</groupId>
-      <artifactId>stream</artifactId>
-      <version>2.5.2</version>
-    </dependency>
   </dependencies>
   <build>

View File

@@ -19,8 +19,6 @@
 package io.druid.indexer;
-import com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
-import com.clearspring.analytics.stream.cardinality.HyperLogLog;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.google.common.base.Optional;
 import com.google.common.base.Throwables;
@@ -36,6 +34,7 @@ import io.druid.data.input.InputRow;
 import io.druid.data.input.Rows;
 import io.druid.granularity.QueryGranularity;
 import io.druid.indexer.granularity.UniformGranularitySpec;
+import io.druid.query.aggregation.hyperloglog.HyperLogLogCollector;
 import io.druid.timeline.partition.HashBasedNumberedShardSpec;
 import io.druid.timeline.partition.NoneShardSpec;
 import org.apache.hadoop.conf.Configuration;
@@ -56,6 +55,7 @@ import org.joda.time.Interval;
 import java.io.IOException;
 import java.io.OutputStream;
+import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -67,7 +67,6 @@ public class DetermineHashedPartitionsJob implements Jobby
 {
   private static final int MAX_SHARDS = 128;
   private static final Logger log = new Logger(DetermineHashedPartitionsJob.class);
-  private static final int HYPER_LOG_LOG_BIT_SIZE = 20;
   private final HadoopDruidIndexerConfig config;
   public DetermineHashedPartitionsJob(
@@ -99,8 +98,8 @@ public class DetermineHashedPartitionsJob implements Jobby
       groupByJob.setOutputKeyClass(NullWritable.class);
       groupByJob.setOutputValueClass(NullWritable.class);
       groupByJob.setOutputFormatClass(SequenceFileOutputFormat.class);
-      if(!config.getSegmentGranularIntervals().isPresent()){
+      if (!config.getSegmentGranularIntervals().isPresent()) {
         groupByJob.setNumReduceTasks(1);
       }
       JobHelper.setupClasspath(config, groupByJob);
@@ -194,7 +193,7 @@ public class DetermineHashedPartitionsJob implements Jobby
   {
     private static HashFunction hashFunction = Hashing.murmur3_128();
     private QueryGranularity rollupGranularity = null;
-    private Map<Interval, HyperLogLog> hyperLogLogs;
+    private Map<Interval, HyperLogLogCollector> hyperLogLogs;
     private HadoopDruidIndexerConfig config;
     private boolean determineIntervals;
@@ -208,9 +207,9 @@ public class DetermineHashedPartitionsJob implements Jobby
       Optional<Set<Interval>> intervals = config.getSegmentGranularIntervals();
       if (intervals.isPresent()) {
         determineIntervals = false;
-        final ImmutableMap.Builder<Interval, HyperLogLog> builder = ImmutableMap.builder();
+        final ImmutableMap.Builder<Interval, HyperLogLogCollector> builder = ImmutableMap.builder();
         for (final Interval bucketInterval : intervals.get()) {
-          builder.put(bucketInterval, new HyperLogLog(HYPER_LOG_LOG_BIT_SIZE));
+          builder.put(bucketInterval, HyperLogLogCollector.makeLatestCollector());
         }
         hyperLogLogs = builder.build();
       } else {
@@ -236,7 +235,7 @@ public class DetermineHashedPartitionsJob implements Jobby
         interval = config.getGranularitySpec().getGranularity().bucket(new DateTime(inputRow.getTimestampFromEpoch()));
         if (!hyperLogLogs.containsKey(interval)) {
-          hyperLogLogs.put(interval, new HyperLogLog(HYPER_LOG_LOG_BIT_SIZE));
+          hyperLogLogs.put(interval, HyperLogLogCollector.makeLatestCollector());
         }
       } else {
         final Optional<Interval> maybeInterval = config.getGranularitySpec()
@@ -248,9 +247,9 @@ public class DetermineHashedPartitionsJob implements Jobby
           interval = maybeInterval.get();
         }
         hyperLogLogs.get(interval)
-                    .offerHashed(
+                    .add(
                         hashFunction.hashBytes(HadoopDruidIndexerConfig.jsonMapper.writeValueAsBytes(groupKey))
-                                    .asLong()
+                                    .asBytes()
                     );
       }
@@ -263,10 +262,10 @@ public class DetermineHashedPartitionsJob implements Jobby
         map(context.getCurrentKey(), context.getCurrentValue(), context);
       }
-      for (Map.Entry<Interval, HyperLogLog> entry : hyperLogLogs.entrySet()) {
+      for (Map.Entry<Interval, HyperLogLogCollector> entry : hyperLogLogs.entrySet()) {
         context.write(
             new LongWritable(entry.getKey().getStartMillis()),
-            new BytesWritable(entry.getValue().getBytes())
+            new BytesWritable(entry.getValue().toByteArray())
         );
       }
       cleanup(context);
@@ -294,15 +293,9 @@ public class DetermineHashedPartitionsJob implements Jobby
         Context context
     ) throws IOException, InterruptedException
     {
-      HyperLogLog aggregate = new HyperLogLog(HYPER_LOG_LOG_BIT_SIZE);
+      HyperLogLogCollector aggregate = HyperLogLogCollector.makeLatestCollector();
       for (BytesWritable value : values) {
-        HyperLogLog logValue = HyperLogLog.Builder.build(getDataBytes(value));
-        try {
-          aggregate.addAll(logValue);
-        }
-        catch (CardinalityMergeException e) {
-          e.printStackTrace(); // TODO: check for better handling
-        }
+        aggregate.fold(ByteBuffer.wrap(value.getBytes(), 0, value.getLength()));
       }
       Interval interval = config.getGranularitySpec().getGranularity().bucket(new DateTime(key.get()));
       intervals.add(interval);
@@ -318,7 +311,7 @@ public class DetermineHashedPartitionsJob implements Jobby
         }
         ).writeValue(
             out,
-            aggregate.cardinality()
+            new Double(aggregate.estimateCardinality()).longValue()
         );
       }
       finally {
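
For context on the change above: the job now builds Druid's HyperLogLogCollector instead of the clearspring HyperLogLog, which removes the fixed HYPER_LOG_LOG_BIT_SIZE constant and the checked CardinalityMergeException handling. Below is a minimal, hypothetical sketch (the class name and sample keys are invented here) of the collector flow the diff relies on, using only the calls that appear in it: add hashed group keys, serialize with toByteArray, merge with fold, and read estimateCardinality.

package io.druid.indexer;

import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import io.druid.query.aggregation.hyperloglog.HyperLogLogCollector;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class HyperLogLogCollectorSketch
{
  private static final HashFunction hashFunction = Hashing.murmur3_128();

  public static void main(String[] args)
  {
    // "Mapper" side: one collector per interval, fed with hashed group keys.
    HyperLogLogCollector perInterval = HyperLogLogCollector.makeLatestCollector();
    for (String groupKey : new String[]{"2014-03-06T00|dimA=x", "2014-03-06T00|dimA=y", "2014-03-06T00|dimA=x"}) {
      perInterval.add(hashFunction.hashBytes(groupKey.getBytes(StandardCharsets.UTF_8)).asBytes());
    }

    // The collector is emitted as raw bytes (wrapped in a BytesWritable by the job).
    byte[] emitted = perInterval.toByteArray();

    // "Reducer" side: fold every serialized collector into a single aggregate,
    // then read the cardinality estimate off as a long, as the job does when it
    // writes the per-interval counts out.
    HyperLogLogCollector aggregate = HyperLogLogCollector.makeLatestCollector();
    aggregate.fold(ByteBuffer.wrap(emitted, 0, emitted.length));
    long estimate = new Double(aggregate.estimateCardinality()).longValue();
    System.out.println("estimated distinct group keys: " + estimate);
  }
}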

View File

@@ -128,7 +128,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
     final boolean isBySegment = Boolean.parseBoolean(query.getContextValue("bySegment", "false"));
-    ImmutableMap.Builder<String, String> contextBuilder = new ImmutableMap.Builder<String, String>();
+    ImmutableMap.Builder<String, String> contextBuilder = new ImmutableMap.Builder<>();
     final String priority = query.getContextValue("priority", "0");
     contextBuilder.put("priority", priority);
@@ -176,32 +176,37 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
       queryCacheKey = null;
     }
-    // Pull cached segments from cache and remove from set of segments to query
-    if (useCache && queryCacheKey != null) {
+    if (queryCacheKey != null) {
       Map<Pair<ServerSelector, SegmentDescriptor>, Cache.NamedKey> cacheKeys = Maps.newHashMap();
-      for (Pair<ServerSelector, SegmentDescriptor> e : segments) {
-        cacheKeys.put(e, computeSegmentCacheKey(e.lhs.getSegment().getIdentifier(), e.rhs, queryCacheKey));
+      for (Pair<ServerSelector, SegmentDescriptor> segment : segments) {
+        final Cache.NamedKey segmentCacheKey = computeSegmentCacheKey(
+            segment.lhs.getSegment().getIdentifier(),
+            segment.rhs,
+            queryCacheKey
+        );
+        cacheKeys.put(segment, segmentCacheKey);
       }
-      Map<Cache.NamedKey, byte[]> cachedValues = cache.getBulk(cacheKeys.values());
+      // Pull cached segments from cache and remove from set of segments to query
+      final Map<Cache.NamedKey, byte[]> cachedValues;
+      if (useCache) {
+        cachedValues = cache.getBulk(cacheKeys.values());
+      } else {
+        cachedValues = ImmutableMap.of();
+      }
       for (Map.Entry<Pair<ServerSelector, SegmentDescriptor>, Cache.NamedKey> entry : cacheKeys.entrySet()) {
         Pair<ServerSelector, SegmentDescriptor> segment = entry.getKey();
         Cache.NamedKey segmentCacheKey = entry.getValue();
-        final ServerSelector selector = segment.lhs;
-        final SegmentDescriptor descriptor = segment.rhs;
-        final Interval segmentQueryInterval = descriptor.getInterval();
+        final Interval segmentQueryInterval = segment.rhs.getInterval();
         final byte[] cachedValue = cachedValues.get(segmentCacheKey);
         if (cachedValue != null) {
-          cachedResults.add(Pair.of(segmentQueryInterval.getStart(), cachedValue));
           // remove cached segment from set of segments to query
           segments.remove(segment);
-        } else {
-          final String segmentIdentifier = selector.getSegment().getIdentifier();
+          cachedResults.add(Pair.of(segmentQueryInterval.getStart(), cachedValue));
+        } else if (populateCache) {
+          final String segmentIdentifier = segment.lhs.getSegment().getIdentifier();
           cachePopulatorMap.put(
               String.format("%s_%s", segmentIdentifier, segmentQueryInterval),
               new CachePopulator(cache, objectMapper, segmentCacheKey)
@@ -229,7 +234,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
         }
       }
-      return new LazySequence<T>(
+      return new LazySequence<>(
           new Supplier<Sequence<T>>()
           {
             @Override
@@ -265,7 +270,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
       final TypeReference<Object> cacheObjectClazz = strategy.getCacheObjectClazz();
       for (Pair<DateTime, byte[]> cachedResultPair : cachedResults) {
         final byte[] cachedResult = cachedResultPair.rhs;
-        Sequence<Object> cachedSequence = new BaseSequence<Object, Iterator<Object>>(
+        Sequence<Object> cachedSequence = new BaseSequence<>(
             new BaseSequence.IteratorMaker<Object, Iterator<Object>>()
             {
               @Override
@@ -331,9 +336,12 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
             String segmentIdentifier = value.getSegmentId();
             final Iterable<T> segmentResults = value.getResults();
-            cachePopulatorMap.get(
+            CachePopulator cachePopulator = cachePopulatorMap.get(
                 String.format("%s_%s", segmentIdentifier, value.getInterval())
-            ).populate(Iterables.transform(segmentResults, prepareForCache));
+            );
+            if(cachePopulator != null) {
+              cachePopulator.populate(Iterables.transform(segmentResults, prepareForCache));
+            }
             return Sequences.simple(
                 Iterables.transform(
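
The hunks above decouple cache reads from cache writes: segment cache keys are computed whenever a query cache key exists, cache.getBulk is consulted only when useCache is set, and a CachePopulator is registered only when populateCache is set, which is why the later cachePopulatorMap lookup is now null-checked. Below is a simplified, hypothetical sketch of that control flow using stand-in types (plain Maps and a Runnable) rather than the real Cache and CachePopulator classes.

import com.google.common.collect.ImmutableMap;

import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-ins for Cache / CachePopulator, just to show the flag handling.
public class CacheFlagsSketch
{
  public static void main(String[] args)
  {
    final boolean useCache = false;     // may we read results from the cache?
    final boolean populateCache = true; // may we write results to the cache?

    final Map<String, byte[]> backingCache = new HashMap<>();        // stands in for Cache.getBulk(...)
    final Map<String, Runnable> cachePopulatorMap = new HashMap<>();
    final String segmentKey = "segmentId_interval";

    // Reads happen only when useCache is set; otherwise act as if the cache were empty.
    final Map<String, byte[]> cachedValues;
    if (useCache) {
      cachedValues = backingCache;
    } else {
      cachedValues = ImmutableMap.of();
    }

    final byte[] cachedValue = cachedValues.get(segmentKey);
    if (cachedValue != null) {
      System.out.println("serve " + segmentKey + " from cache and skip querying it");
    } else if (populateCache) {
      // A populator is registered only when writing to the cache is allowed.
      cachePopulatorMap.put(segmentKey, new Runnable()
      {
        @Override
        public void run()
        {
          System.out.println("write fresh results for " + segmentKey + " into the cache");
        }
      });
    }

    // When results stream back, the populator may be absent (cache reads only,
    // or caching disabled entirely), so it has to be null-checked before use.
    final Runnable populator = cachePopulatorMap.get(segmentKey);
    if (populator != null) {
      populator.run();
    }
  }
}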

File diff suppressed because it is too large

View File

@@ -0,0 +1,89 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package io.druid.client;
+
+import java.util.Iterator;
+
+/**
+ */
+public class RangeIterable implements Iterable<Integer>
+{
+  private final int end;
+  private final int start;
+  private final int increment;
+
+  public RangeIterable(
+      int end
+  )
+  {
+    this(0, end);
+  }
+
+  public RangeIterable(
+      int start,
+      int end
+  )
+  {
+    this(start, end, 1);
+  }
+
+  public RangeIterable(
+      int start,
+      int end,
+      final int i
+  )
+  {
+    this.start = start;
+    this.end = end;
+    this.increment = i;
+  }
+
+  @Override
+  public Iterator<Integer> iterator()
+  {
+    return new Iterator<Integer>()
+    {
+      private int curr = start;
+
+      @Override
+      public boolean hasNext()
+      {
+        return curr < end;
+      }
+
+      @Override
+      public Integer next()
+      {
+        try {
+          return curr;
+        }
+        finally {
+          curr += increment;
+        }
+      }
+
+      @Override
+      public void remove()
+      {
+        throw new UnsupportedOperationException();
+      }
+    };
+  }
+}
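
The new RangeIterable yields start, start + increment, and so on up to but excluding end; next() returns the current value and advances it in a finally block. A hypothetical usage example (the wrapper class is invented for illustration):

package io.druid.client;

public class RangeIterableExample
{
  public static void main(String[] args)
  {
    // Prints 0, 2, 4, 6, 8: curr starts at 0 and is bumped by 2 until it reaches 10.
    for (int i : new RangeIterable(0, 10, 2)) {
      System.out.println(i);
    }
  }
}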