fix metrics de-serialization for bySegment results

This commit is contained in:
Xavier Léauté 2014-12-09 16:58:35 -08:00
parent 123db3da4d
commit a7da9aed9f
1 changed files with 40 additions and 8 deletions

View File

@ -68,7 +68,6 @@ import org.joda.time.Interval;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
@ -326,18 +325,52 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
continue; continue;
} }
final Sequence<T> resultSeqToAdd;
final MultipleSpecificSegmentSpec segmentSpec = new MultipleSpecificSegmentSpec(descriptors); final MultipleSpecificSegmentSpec segmentSpec = new MultipleSpecificSegmentSpec(descriptors);
List<Interval> intervals = segmentSpec.getIntervals(); List<Interval> intervals = segmentSpec.getIntervals();
final Sequence<T> resultSeqToAdd;
if (!server.isAssignable() || !populateCache || isBySegment) { // Direct server queryable if (!server.isAssignable() || !populateCache || isBySegment) { // Direct server queryable
resultSeqToAdd = clientQueryable.run(query.withQuerySegmentSpec(segmentSpec), responseContext); if (!isBySegment) {
resultSeqToAdd = clientQueryable.run(query.withQuerySegmentSpec(segmentSpec), responseContext);
} else {
@SuppressWarnings("unchecked")
final Query<Result<BySegmentResultValueClass<T>>> bySegmentQuery = (Query<Result<BySegmentResultValueClass<T>>>) query;
@SuppressWarnings("unchecked")
final Sequence<Result<BySegmentResultValueClass<T>>> resultSequence = clientQueryable.run(
bySegmentQuery.withQuerySegmentSpec(segmentSpec),
responseContext
);
resultSeqToAdd = (Sequence) Sequences.map(
resultSequence,
new Function<Result<BySegmentResultValueClass<T>>, Result<BySegmentResultValueClass<T>>>()
{
@Override
public Result<BySegmentResultValueClass<T>> apply(Result<BySegmentResultValueClass<T>> input)
{
final BySegmentResultValueClass<T> bySegmentValue = input.getValue();
return new Result<>(
input.getTimestamp(),
new BySegmentResultValueClass<T>(
Lists.transform(
bySegmentValue.getResults(),
toolChest.makePreComputeManipulatorFn(
query,
MetricManipulatorFns.deserializing()
)
),
bySegmentValue.getSegmentId(),
bySegmentValue.getInterval()
)
);
}
}
);
}
} else { // Requires some manipulation on broker side } else { // Requires some manipulation on broker side
@SuppressWarnings("unchecked")
final QueryRunner<Result<BySegmentResultValueClass<T>>> clientQueryableWithSegments = clientQueryable; final Sequence<Result<BySegmentResultValueClass<T>>> runningSequence = clientQueryable.run(
final Sequence<Result<BySegmentResultValueClass<T>>> runningSequence = clientQueryableWithSegments.run(
rewrittenQuery.withQuerySegmentSpec(segmentSpec), rewrittenQuery.withQuerySegmentSpec(segmentSpec),
responseContext responseContext
); );
@ -416,7 +449,6 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
); );
} }
listOfSequences.add( listOfSequences.add(
Pair.of( Pair.of(
new Interval(intervals.get(0).getStart(), intervals.get(intervals.size() - 1).getEnd()), new Interval(intervals.get(0).getStart(), intervals.get(intervals.size() - 1).getEnd()),