mirror of https://github.com/apache/druid.git
Merge pull request #869 from metamx/fix-results-ordering
Fix results ordering on broker
This commit is contained in:
commit
ac04f1790d
|
@ -20,6 +20,7 @@
|
|||
package io.druid.collections;
|
||||
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Ordering;
|
||||
import com.metamx.common.guava.BaseSequence;
|
||||
|
@ -28,7 +29,7 @@ import com.metamx.common.guava.Sequence;
|
|||
import com.metamx.common.guava.SequenceTestHelper;
|
||||
import com.metamx.common.guava.Sequences;
|
||||
import com.metamx.common.guava.TestSequence;
|
||||
import junit.framework.Assert;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
|
@ -59,6 +60,61 @@ public class OrderedMergeSequenceTest
|
|||
}
|
||||
}
|
||||
|
||||
// Merges three sequences where the FIRST one is empty and checks, via
// SequenceTestHelper.testAll, that the merged output is 2, 4, 6, 8, 8.
// Afterwards every input TestSequence must report isClosed().
@Test
|
||||
public void testMergeEmptySequence() throws Exception
|
||||
{
|
||||
final ArrayList<TestSequence<Integer>> testSequences = Lists.newArrayList(
|
||||
TestSequence.create(ImmutableList.<Integer>of()),
|
||||
TestSequence.create(2, 8),
|
||||
TestSequence.create(4, 6, 8)
|
||||
);
|
||||
|
||||
OrderedMergeSequence<Integer> seq = makeMergedSequence(Ordering.<Integer>natural(), testSequences);
|
||||
|
||||
SequenceTestHelper.testAll(seq, Arrays.asList(2, 4, 6, 8, 8));
|
||||
|
||||
for (TestSequence<Integer> sequence : testSequences) {
|
||||
Assert.assertTrue(sequence.isClosed());
|
||||
}
|
||||
}
|
||||
|
||||
// Merges three sequences where the LAST one is empty and checks, via
// SequenceTestHelper.testAll, that the merged output is 2, 4, 6, 8, 8.
// Afterwards every input TestSequence must report isClosed().
@Test
|
||||
public void testMergeEmptySequenceAtEnd() throws Exception
|
||||
{
|
||||
final ArrayList<TestSequence<Integer>> testSequences = Lists.newArrayList(
|
||||
TestSequence.create(2, 8),
|
||||
TestSequence.create(4, 6, 8),
|
||||
TestSequence.create(ImmutableList.<Integer>of())
|
||||
);
|
||||
|
||||
OrderedMergeSequence<Integer> seq = makeMergedSequence(Ordering.<Integer>natural(), testSequences);
|
||||
|
||||
SequenceTestHelper.testAll(seq, Arrays.asList(2, 4, 6, 8, 8));
|
||||
|
||||
for (TestSequence<Integer> sequence : testSequences) {
|
||||
Assert.assertTrue(sequence.isClosed());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// Merges three sequences where the MIDDLE one is empty and checks, via
// SequenceTestHelper.testAll, that the merged output is 2, 4, 6, 8, 8.
// Afterwards every input TestSequence must report isClosed().
@Test
|
||||
public void testMergeEmptySequenceMiddle() throws Exception
|
||||
{
|
||||
final ArrayList<TestSequence<Integer>> testSequences = Lists.newArrayList(
|
||||
TestSequence.create(2, 8),
|
||||
TestSequence.create(ImmutableList.<Integer>of()),
|
||||
TestSequence.create(4, 6, 8)
|
||||
);
|
||||
|
||||
OrderedMergeSequence<Integer> seq = makeMergedSequence(Ordering.<Integer>natural(), testSequences);
|
||||
|
||||
SequenceTestHelper.testAll(seq, Arrays.asList(2, 4, 6, 8, 8));
|
||||
|
||||
for (TestSequence<Integer> sequence : testSequences) {
|
||||
Assert.assertTrue(sequence.isClosed());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testScrewsUpOnOutOfOrderBeginningOfList() throws Exception
|
||||
{
|
||||
|
|
|
@ -25,6 +25,7 @@ import com.google.common.base.Function;
|
|||
import com.google.common.base.Supplier;
|
||||
import com.google.common.base.Throwables;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Iterables;
|
||||
import com.google.common.collect.Iterators;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
|
@ -35,6 +36,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
|||
import com.google.inject.Inject;
|
||||
import com.metamx.common.Pair;
|
||||
import com.metamx.common.guava.BaseSequence;
|
||||
import com.metamx.common.guava.Comparators;
|
||||
import com.metamx.common.guava.LazySequence;
|
||||
import com.metamx.common.guava.Sequence;
|
||||
import com.metamx.common.guava.Sequences;
|
||||
|
@ -43,6 +45,7 @@ import io.druid.client.cache.Cache;
|
|||
import io.druid.client.cache.CacheConfig;
|
||||
import io.druid.client.selector.QueryableDruidServer;
|
||||
import io.druid.client.selector.ServerSelector;
|
||||
import io.druid.common.utils.JodaUtils;
|
||||
import io.druid.guice.annotations.Smile;
|
||||
import io.druid.query.BySegmentResultValueClass;
|
||||
import io.druid.query.CacheStrategy;
|
||||
|
@ -121,7 +124,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
|
|||
|
||||
final Map<DruidServer, List<SegmentDescriptor>> serverSegments = Maps.newTreeMap();
|
||||
|
||||
final List<Pair<DateTime, byte[]>> cachedResults = Lists.newArrayList();
|
||||
final List<Pair<Interval, byte[]>> cachedResults = Lists.newArrayList();
|
||||
final Map<String, CachePopulator> cachePopulatorMap = Maps.newHashMap();
|
||||
|
||||
final boolean useCache = query.getContextUseCache(true)
|
||||
|
@ -214,7 +217,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
|
|||
if (cachedValue != null) {
|
||||
// remove cached segment from set of segments to query
|
||||
segments.remove(segment);
|
||||
cachedResults.add(Pair.of(segmentQueryInterval.getStart(), cachedValue));
|
||||
cachedResults.add(Pair.of(segmentQueryInterval, cachedValue));
|
||||
} else if (populateCache) {
|
||||
final String segmentIdentifier = segment.lhs.getSegment().getIdentifier();
|
||||
cachePopulatorMap.put(
|
||||
|
@ -250,35 +253,14 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
|
|||
@Override
|
||||
public Sequence<T> get()
|
||||
{
|
||||
ArrayList<Pair<DateTime, Sequence<T>>> listOfSequences = Lists.newArrayList();
|
||||
ArrayList<Pair<Interval, Sequence<T>>> sequencesByInterval = Lists.newArrayList();
|
||||
addSequencesFromCache(sequencesByInterval);
|
||||
addSequencesFromServer(sequencesByInterval);
|
||||
|
||||
addSequencesFromServer(listOfSequences);
|
||||
addSequencesFromCache(listOfSequences);
|
||||
|
||||
Collections.sort(
|
||||
listOfSequences,
|
||||
Ordering.natural().onResultOf(Pair.<DateTime, Sequence<T>>lhsFn())
|
||||
);
|
||||
|
||||
final List<Sequence<T>> orderedSequences = Lists.newLinkedList();
|
||||
DateTime unorderedStart = null;
|
||||
List<Sequence<T>> unordered = Lists.newLinkedList();
|
||||
for (Pair<DateTime, Sequence<T>> sequencePair : listOfSequences) {
|
||||
if (unorderedStart != null && unorderedStart.getMillis() != sequencePair.lhs.getMillis()) {
|
||||
orderedSequences.add(toolChest.mergeSequencesUnordered(Sequences.simple(unordered)));
|
||||
unordered = Lists.newLinkedList();
|
||||
}
|
||||
unorderedStart = sequencePair.lhs;
|
||||
unordered.add(sequencePair.rhs);
|
||||
}
|
||||
if(!unordered.isEmpty()) {
|
||||
orderedSequences.add(toolChest.mergeSequencesUnordered(Sequences.simple(unordered)));
|
||||
}
|
||||
|
||||
return toolChest.mergeSequences(Sequences.simple(orderedSequences));
|
||||
return mergeCachedAndUncachedSequences(sequencesByInterval, toolChest);
|
||||
}
|
||||
|
||||
private void addSequencesFromCache(ArrayList<Pair<DateTime, Sequence<T>>> listOfSequences)
|
||||
private void addSequencesFromCache(ArrayList<Pair<Interval, Sequence<T>>> listOfSequences)
|
||||
{
|
||||
if (strategy == null) {
|
||||
return;
|
||||
|
@ -286,7 +268,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
|
|||
|
||||
final Function<Object, T> pullFromCacheFunction = strategy.pullFromCache();
|
||||
final TypeReference<Object> cacheObjectClazz = strategy.getCacheObjectClazz();
|
||||
for (Pair<DateTime, byte[]> cachedResultPair : cachedResults) {
|
||||
for (Pair<Interval, byte[]> cachedResultPair : cachedResults) {
|
||||
final byte[] cachedResult = cachedResultPair.rhs;
|
||||
Sequence<Object> cachedSequence = new BaseSequence<>(
|
||||
new BaseSequence.IteratorMaker<Object, Iterator<Object>>()
|
||||
|
@ -320,7 +302,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
|
|||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private void addSequencesFromServer(ArrayList<Pair<DateTime, Sequence<T>>> listOfSequences)
|
||||
private void addSequencesFromServer(ArrayList<Pair<Interval, Sequence<T>>> listOfSequences)
|
||||
{
|
||||
for (Map.Entry<DruidServer, List<SegmentDescriptor>> entry : serverSegments.entrySet()) {
|
||||
final DruidServer server = entry.getKey();
|
||||
|
@ -396,13 +378,60 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
|
|||
);
|
||||
}
|
||||
|
||||
listOfSequences.add(Pair.of(intervals.get(0).getStart(), resultSeqToAdd));
|
||||
|
||||
listOfSequences.add(
|
||||
Pair.of(
|
||||
new Interval(intervals.get(0).getStart(), intervals.get(intervals.size() - 1).getEnd()),
|
||||
resultSeqToAdd
|
||||
)
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
/**
 * Merges the per-interval result sequences (collected from the cache and from the servers)
 * into a single sequence.
 *
 * The given list is sorted IN PLACE by interval, using Comparators.intervalsByStartThenEnd().
 * Consecutive entries are then grouped: a new group starts whenever an entry's interval does
 * not overlap the interval of the entry immediately before it. Each group is combined with
 * toolChest.mergeSequencesUnordered, and the per-group sequences are combined with
 * toolChest.mergeSequencesUnordered once more.
 *
 * @param sequencesByInterval pairs of (interval, result sequence); reordered by this call
 * @param toolChest           tool chest whose mergeSequencesUnordered performs the merging
 * @return Sequences.empty() when the list is empty, otherwise the merged sequence
 */
protected Sequence<T> mergeCachedAndUncachedSequences(
|
||||
List<Pair<Interval, Sequence<T>>> sequencesByInterval,
|
||||
QueryToolChest<T, Query<T>> toolChest
|
||||
)
|
||||
{
|
||||
// guard: iterator.next() below would fail on an empty list
if(sequencesByInterval.isEmpty()) {
|
||||
return Sequences.empty();
|
||||
}
|
||||
|
||||
Collections.sort(
|
||||
sequencesByInterval,
|
||||
Ordering.from(Comparators.intervalsByStartThenEnd()).onResultOf(Pair.<Interval, Sequence<T>>lhsFn())
|
||||
);
|
||||
|
||||
// result sequences from overlapping intervals could start anywhere within that interval
|
||||
// therefore we cannot assume any ordering with respect to the first result from each
|
||||
// and must resort to calling toolchest.mergeSequencesUnordered for those.
|
||||
Iterator<Pair<Interval, Sequence<T>>> iterator = sequencesByInterval.iterator();
|
||||
Pair<Interval, Sequence<T>> current = iterator.next();
|
||||
|
||||
final List<Sequence<T>> orderedSequences = Lists.newLinkedList();
|
||||
List<Sequence<T>> unordered = Lists.newLinkedList();
|
||||
|
||||
unordered.add(current.rhs);
|
||||
|
||||
while(iterator.hasNext()) {
|
||||
Pair<Interval, Sequence<T>> next = iterator.next();
|
||||
// NOTE(review): overlap is tested only against the immediately preceding entry, not against
// every interval already in the current group; the final merge below is also unordered.
if(!next.lhs.overlaps(current.lhs)) {
|
||||
// close the current group and start a fresh one
orderedSequences.add(toolChest.mergeSequencesUnordered(Sequences.simple(unordered)));
|
||||
unordered = Lists.newLinkedList();
|
||||
}
|
||||
unordered.add(next.rhs);
|
||||
current = next;
|
||||
}
|
||||
// flush the last group
if(!unordered.isEmpty()) {
|
||||
orderedSequences.add(toolChest.mergeSequencesUnordered(Sequences.simple(unordered)));
|
||||
}
|
||||
|
||||
return toolChest.mergeSequencesUnordered(Sequences.simple(orderedSequences));
|
||||
}
|
||||
|
||||
private static class CachePopulator
|
||||
{
|
||||
private final Cache cache;
|
||||
|
|
|
@ -35,6 +35,7 @@ import com.google.common.collect.Ordering;
|
|||
import com.google.common.hash.HashFunction;
|
||||
import com.google.common.hash.Hashing;
|
||||
import com.metamx.common.ISE;
|
||||
import com.metamx.common.Pair;
|
||||
import com.metamx.common.guava.FunctionalIterable;
|
||||
import com.metamx.common.guava.MergeIterable;
|
||||
import com.metamx.common.guava.Sequence;
|
||||
|
@ -598,6 +599,56 @@ public class CachingClusteredClientTest
|
|||
);
|
||||
}
|
||||
|
||||
// Feeds mergeCachedAndUncachedSequences two sequences whose intervals overlap
// (2011-01-02/2011-01-10 and 2011-01-05/2011-01-10). The entry with the earlier interval
// start has the LATER first result (2011-01-07 vs 2011-01-06T01), and the test expects the
// merged output to be interleaved in timestamp order.
@Test
|
||||
public void testOutOfOrderSequenceMerging() throws Exception
|
||||
{
|
||||
List<Pair<Interval, Sequence<Result<TopNResultValue>>>> sequences =
|
||||
Lists.newArrayList(
|
||||
Pair.of(
|
||||
// this could be the result of a historical node returning the merged result of
|
||||
// a) an empty result for segment 2011-01-02/2011-01-05
|
||||
// and b) result for a second partition for 2011-01-05/2011-01-10
|
||||
new Interval("2011-01-02/2011-01-10"),
|
||||
Sequences.simple(
|
||||
makeTopNResults(
|
||||
new DateTime("2011-01-07"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989,
|
||||
new DateTime("2011-01-08"), "a", 50, 4988, "b", 50, 4987, "c", 50, 4986,
|
||||
new DateTime("2011-01-09"), "a", 50, 4985, "b", 50, 4984, "c", 50, 4983
|
||||
)
|
||||
)
|
||||
),
|
||||
|
||||
Pair.of(
|
||||
new Interval("2011-01-05/2011-01-10"),
|
||||
Sequences.simple(
|
||||
makeTopNResults(
|
||||
new DateTime("2011-01-06T01"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989,
|
||||
new DateTime("2011-01-07T01"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989,
|
||||
new DateTime("2011-01-08T01"), "a", 50, 4988, "b", 50, 4987, "c", 50, 4986,
|
||||
new DateTime("2011-01-09T01"), "a", 50, 4985, "b", 50, 4984, "c", 50, 4983
|
||||
)
|
||||
)
|
||||
)
|
||||
);
|
||||
|
||||
TestHelper.assertExpectedResults(
|
||||
makeTopNResults(
|
||||
new DateTime("2011-01-06T01"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989,
|
||||
new DateTime("2011-01-07"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989,
|
||||
new DateTime("2011-01-07T01"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989,
|
||||
new DateTime("2011-01-08"), "a", 50, 4988, "b", 50, 4987, "c", 50, 4986,
|
||||
new DateTime("2011-01-08T01"), "a", 50, 4988, "b", 50, 4987, "c", 50, 4986,
|
||||
new DateTime("2011-01-09"), "a", 50, 4985, "b", 50, 4984, "c", 50, 4983,
|
||||
new DateTime("2011-01-09T01"), "a", 50, 4985, "b", 50, 4984, "c", 50, 4983
|
||||
),
|
||||
client.mergeCachedAndUncachedSequences(
|
||||
sequences,
|
||||
new TopNQueryQueryToolChest(new TopNQueryConfig())
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testTopNCachingEmptyResults() throws Exception
|
||||
|
|
Loading…
Reference in New Issue