Merge pull request #950 from metamx/fix-bysegment-serde

Fix bySegment metrics de-serialization
This commit is contained in:
Charles Allen 2014-12-10 12:42:40 -08:00
commit 6f6cc463b0
3 changed files with 69 additions and 24 deletions

View File

@ -68,7 +68,6 @@ import org.joda.time.Interval;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
@ -152,6 +151,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
contextBuilder.put("priority", priority);
if (populateCache) {
// prevent down-stream nodes from caching results as well if we are populating the cache
contextBuilder.put(CacheConfig.POPULATE_CACHE, false);
contextBuilder.put("bySegment", true);
}
@ -187,14 +187,17 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
}
final byte[] queryCacheKey;
if (strategy != null) {
if ( (populateCache || useCache) // implies strategy != null
&& !isBySegment ) // explicit bySegment queries are never cached
{
queryCacheKey = strategy.computeCacheKey(query);
} else {
queryCacheKey = null;
}
if (queryCacheKey != null) {
// cache keys must preserve segment ordering, in order for shards to always be combined in the same order
// cachKeys map must preserve segment ordering, in order for shards to always be combined in the same order
Map<Pair<ServerSelector, SegmentDescriptor>, Cache.NamedKey> cacheKeys = Maps.newLinkedHashMap();
for (Pair<ServerSelector, SegmentDescriptor> segment : segments) {
final Cache.NamedKey segmentCacheKey = CacheUtil.computeSegmentCacheKey(
@ -224,6 +227,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
segments.remove(segment);
cachedResults.add(Pair.of(segmentQueryInterval, cachedValue));
} else if (populateCache) {
// otherwise, if populating cache, add segment to list of segments to cache
final String segmentIdentifier = segment.lhs.getSegment().getIdentifier();
cachePopulatorMap.put(
String.format("%s_%s", segmentIdentifier, segmentQueryInterval),
@ -326,18 +330,54 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
continue;
}
final Sequence<T> resultSeqToAdd;
final MultipleSpecificSegmentSpec segmentSpec = new MultipleSpecificSegmentSpec(descriptors);
List<Interval> intervals = segmentSpec.getIntervals();
final List<Interval> intervals = segmentSpec.getIntervals();
final Sequence<T> resultSeqToAdd;
if (!server.isAssignable() || !populateCache || isBySegment) { // Direct server queryable
resultSeqToAdd = clientQueryable.run(query.withQuerySegmentSpec(segmentSpec), responseContext);
if (!isBySegment) {
resultSeqToAdd = clientQueryable.run(query.withQuerySegmentSpec(segmentSpec), responseContext);
} else {
// bySegment queries need to be de-serialized, see DirectDruidClient.run()
@SuppressWarnings("unchecked")
final Query<Result<BySegmentResultValueClass<T>>> bySegmentQuery = (Query<Result<BySegmentResultValueClass<T>>>) query;
@SuppressWarnings("unchecked")
final Sequence<Result<BySegmentResultValueClass<T>>> resultSequence = clientQueryable.run(
bySegmentQuery.withQuerySegmentSpec(segmentSpec),
responseContext
);
resultSeqToAdd = (Sequence) Sequences.map(
resultSequence,
new Function<Result<BySegmentResultValueClass<T>>, Result<BySegmentResultValueClass<T>>>()
{
@Override
public Result<BySegmentResultValueClass<T>> apply(Result<BySegmentResultValueClass<T>> input)
{
final BySegmentResultValueClass<T> bySegmentValue = input.getValue();
return new Result<>(
input.getTimestamp(),
new BySegmentResultValueClass<T>(
Lists.transform(
bySegmentValue.getResults(),
toolChest.makePreComputeManipulatorFn(
query,
MetricManipulatorFns.deserializing()
)
),
bySegmentValue.getSegmentId(),
bySegmentValue.getInterval()
)
);
}
}
);
}
} else { // Requires some manipulation on broker side
final QueryRunner<Result<BySegmentResultValueClass<T>>> clientQueryableWithSegments = clientQueryable;
final Sequence<Result<BySegmentResultValueClass<T>>> runningSequence = clientQueryableWithSegments.run(
@SuppressWarnings("unchecked")
final Sequence<Result<BySegmentResultValueClass<T>>> runningSequence = clientQueryable.run(
rewrittenQuery.withQuerySegmentSpec(segmentSpec),
responseContext
);
@ -353,6 +393,9 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
public Sequence<T> apply(Result<BySegmentResultValueClass<T>> input)
{
final BySegmentResultValueClass<T> value = input.getValue();
final CachePopulator cachePopulator = cachePopulatorMap.get(
String.format("%s_%s", value.getSegmentId(), value.getInterval())
);
final List<Object> cacheData = Lists.newLinkedList();
@ -365,18 +408,21 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
@Override
public T apply(final T input)
{
cacheFutures.add(
backgroundExecutorService.submit(
new Runnable()
{
@Override
public void run()
if(cachePopulator != null) {
// only compute cache data if populating cache
cacheFutures.add(
backgroundExecutorService.submit(
new Runnable()
{
cacheData.add(cacheFn.apply(input));
@Override
public void run()
{
cacheData.add(cacheFn.apply(input));
}
}
}
)
);
)
);
}
return input;
}
}
@ -393,9 +439,6 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
@Override
public void run()
{
CachePopulator cachePopulator = cachePopulatorMap.get(
String.format("%s_%s", value.getSegmentId(), value.getInterval())
);
if (cachePopulator != null) {
try {
Futures.allAsList(cacheFutures).get();
@ -416,7 +459,6 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
);
}
listOfSequences.add(
Pair.of(
new Interval(intervals.get(0).getStart(), intervals.get(intervals.size() - 1).getEnd()),

View File

@ -268,6 +268,8 @@ public class DirectDruidClient<T> implements QueryRunner<T>
}
);
// bySegment queries are de-serialized after caching results in order to
// avoid the cost of de-serializing and then re-serializing again when adding to cache
if (!isBySegment) {
retVal = Sequences.map(
retVal,

View File

@ -54,6 +54,7 @@ public class CacheConfig
{
return useCache;
}
public int getNumBackgroundThreads(){
return numBackgroundThreads;
}