fix results ordering

This commit is contained in:
Xavier Léauté 2014-11-13 14:59:56 -08:00
parent d19b895fb9
commit 716cc2cff2
1 changed file with 38 additions and 17 deletions

View File

@@ -25,6 +25,7 @@ import com.google.common.base.Function;
import com.google.common.base.Supplier; import com.google.common.base.Supplier;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators; import com.google.common.collect.Iterators;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
@@ -35,6 +36,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.metamx.common.Pair; import com.metamx.common.Pair;
import com.metamx.common.guava.BaseSequence; import com.metamx.common.guava.BaseSequence;
import com.metamx.common.guava.Comparators;
import com.metamx.common.guava.LazySequence; import com.metamx.common.guava.LazySequence;
import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences; import com.metamx.common.guava.Sequences;
@@ -43,6 +45,7 @@ import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig; import io.druid.client.cache.CacheConfig;
import io.druid.client.selector.QueryableDruidServer; import io.druid.client.selector.QueryableDruidServer;
import io.druid.client.selector.ServerSelector; import io.druid.client.selector.ServerSelector;
import io.druid.common.utils.JodaUtils;
import io.druid.guice.annotations.Smile; import io.druid.guice.annotations.Smile;
import io.druid.query.BySegmentResultValueClass; import io.druid.query.BySegmentResultValueClass;
import io.druid.query.CacheStrategy; import io.druid.query.CacheStrategy;
@@ -121,7 +124,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
final Map<DruidServer, List<SegmentDescriptor>> serverSegments = Maps.newTreeMap(); final Map<DruidServer, List<SegmentDescriptor>> serverSegments = Maps.newTreeMap();
final List<Pair<DateTime, byte[]>> cachedResults = Lists.newArrayList(); final List<Pair<Interval, byte[]>> cachedResults = Lists.newArrayList();
final Map<String, CachePopulator> cachePopulatorMap = Maps.newHashMap(); final Map<String, CachePopulator> cachePopulatorMap = Maps.newHashMap();
final boolean useCache = query.getContextUseCache(true) final boolean useCache = query.getContextUseCache(true)
@@ -214,7 +217,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
if (cachedValue != null) { if (cachedValue != null) {
// remove cached segment from set of segments to query // remove cached segment from set of segments to query
segments.remove(segment); segments.remove(segment);
cachedResults.add(Pair.of(segmentQueryInterval.getStart(), cachedValue)); cachedResults.add(Pair.of(segmentQueryInterval, cachedValue));
} else if (populateCache) { } else if (populateCache) {
final String segmentIdentifier = segment.lhs.getSegment().getIdentifier(); final String segmentIdentifier = segment.lhs.getSegment().getIdentifier();
cachePopulatorMap.put( cachePopulatorMap.put(
@@ -250,35 +253,47 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
@Override @Override
public Sequence<T> get() public Sequence<T> get()
{ {
ArrayList<Pair<DateTime, Sequence<T>>> listOfSequences = Lists.newArrayList(); ArrayList<Pair<Interval, Sequence<T>>> sequencesByInterval = Lists.newArrayList();
addSequencesFromCache(sequencesByInterval);
addSequencesFromServer(sequencesByInterval);
addSequencesFromServer(listOfSequences); if(sequencesByInterval.isEmpty()) {
addSequencesFromCache(listOfSequences); return Sequences.empty();
}
Collections.sort( Collections.sort(
listOfSequences, sequencesByInterval,
Ordering.natural().onResultOf(Pair.<DateTime, Sequence<T>>lhsFn()) Ordering.from(Comparators.intervalsByStartThenEnd()).onResultOf(Pair.<Interval, Sequence<T>>lhsFn())
); );
// result sequences from overlapping intervals could start anywhere within that interval
// therefore we cannot assume any ordering with respect to the first result from each
// and must resort to calling toolchest.mergeSequencesUnordered for those.
Iterator<Pair<Interval, Sequence<T>>> iterator = sequencesByInterval.iterator();
Pair<Interval, Sequence<T>> current = iterator.next();
final List<Sequence<T>> orderedSequences = Lists.newLinkedList(); final List<Sequence<T>> orderedSequences = Lists.newLinkedList();
DateTime unorderedStart = null;
List<Sequence<T>> unordered = Lists.newLinkedList(); List<Sequence<T>> unordered = Lists.newLinkedList();
for (Pair<DateTime, Sequence<T>> sequencePair : listOfSequences) {
if (unorderedStart != null && unorderedStart.getMillis() != sequencePair.lhs.getMillis()) { unordered.add(current.rhs);
while(iterator.hasNext()) {
Pair<Interval, Sequence<T>> next = iterator.next();
if(!next.lhs.overlaps(current.lhs)) {
orderedSequences.add(toolChest.mergeSequencesUnordered(Sequences.simple(unordered))); orderedSequences.add(toolChest.mergeSequencesUnordered(Sequences.simple(unordered)));
unordered = Lists.newLinkedList(); unordered = Lists.newLinkedList();
} }
unorderedStart = sequencePair.lhs; unordered.add(next.rhs);
unordered.add(sequencePair.rhs); current = next;
} }
if(!unordered.isEmpty()) { if(!unordered.isEmpty()) {
orderedSequences.add(toolChest.mergeSequencesUnordered(Sequences.simple(unordered))); orderedSequences.add(toolChest.mergeSequencesUnordered(Sequences.simple(unordered)));
} }
return toolChest.mergeSequences(Sequences.simple(orderedSequences)); return toolChest.mergeSequencesUnordered(Sequences.simple(orderedSequences));
} }
private void addSequencesFromCache(ArrayList<Pair<DateTime, Sequence<T>>> listOfSequences) private void addSequencesFromCache(ArrayList<Pair<Interval, Sequence<T>>> listOfSequences)
{ {
if (strategy == null) { if (strategy == null) {
return; return;
@@ -286,7 +301,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
final Function<Object, T> pullFromCacheFunction = strategy.pullFromCache(); final Function<Object, T> pullFromCacheFunction = strategy.pullFromCache();
final TypeReference<Object> cacheObjectClazz = strategy.getCacheObjectClazz(); final TypeReference<Object> cacheObjectClazz = strategy.getCacheObjectClazz();
for (Pair<DateTime, byte[]> cachedResultPair : cachedResults) { for (Pair<Interval, byte[]> cachedResultPair : cachedResults) {
final byte[] cachedResult = cachedResultPair.rhs; final byte[] cachedResult = cachedResultPair.rhs;
Sequence<Object> cachedSequence = new BaseSequence<>( Sequence<Object> cachedSequence = new BaseSequence<>(
new BaseSequence.IteratorMaker<Object, Iterator<Object>>() new BaseSequence.IteratorMaker<Object, Iterator<Object>>()
@@ -320,7 +335,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
private void addSequencesFromServer(ArrayList<Pair<DateTime, Sequence<T>>> listOfSequences) private void addSequencesFromServer(ArrayList<Pair<Interval, Sequence<T>>> listOfSequences)
{ {
for (Map.Entry<DruidServer, List<SegmentDescriptor>> entry : serverSegments.entrySet()) { for (Map.Entry<DruidServer, List<SegmentDescriptor>> entry : serverSegments.entrySet()) {
final DruidServer server = entry.getKey(); final DruidServer server = entry.getKey();
@@ -396,7 +411,13 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
); );
} }
listOfSequences.add(Pair.of(intervals.get(0).getStart(), resultSeqToAdd));
listOfSequences.add(
Pair.of(
new Interval(intervals.get(0).getStart(), intervals.get(intervals.size() - 1).getEnd()),
resultSeqToAdd
)
);
} }
} }
} }