Merge pull request #801 from metamx/fix-unordered-merging

fix ordering for partitions with same start time, fixes #796
Fangjin Yang 2014-10-21 16:47:31 -06:00
commit 3d8f33d4ad
2 changed files with 75 additions and 9 deletions

CachingClusteredClient.java

@@ -25,7 +25,6 @@ import com.google.common.base.Function;
 import com.google.common.base.Supplier;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterables;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -261,14 +260,22 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
         Ordering.natural().onResultOf(Pair.<DateTime, Sequence<T>>lhsFn())
     );
-    final Sequence<Sequence<T>> seq = Sequences.simple(
-        Iterables.transform(listOfSequences, Pair.<DateTime, Sequence<T>>rhsFn())
-    );
-    if (strategy == null) {
-      return toolChest.mergeSequences(seq);
-    } else {
-      return strategy.mergeSequences(seq);
-    }
+    final List<Sequence<T>> orderedSequences = Lists.newLinkedList();
+    DateTime unorderedStart = null;
+    List<Sequence<T>> unordered = Lists.newLinkedList();
+    for (Pair<DateTime, Sequence<T>> sequencePair : listOfSequences) {
+      if (unorderedStart != null && unorderedStart.getMillis() != sequencePair.lhs.getMillis()) {
+        orderedSequences.add(toolChest.mergeSequencesUnordered(Sequences.simple(unordered)));
+        unordered = Lists.newLinkedList();
+      }
+      unorderedStart = sequencePair.lhs;
+      unordered.add(sequencePair.rhs);
+    }
+    if (!unordered.isEmpty()) {
+      orderedSequences.add(toolChest.mergeSequencesUnordered(Sequences.simple(unordered)));
+    }
+    return toolChest.mergeSequences(Sequences.simple(orderedSequences));
   }

   private void addSequencesFromCache(ArrayList<Pair<DateTime, Sequence<T>>> listOfSequences)
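The core of the fix is the loop above: listOfSequences is already sorted by partition start time, runs that share a start time are merged with mergeSequencesUnordered (which makes no ordering assumptions), and only the per-group results go through the ordered mergeSequences. Below is a minimal standalone sketch of that pattern for readers skimming the diff. Partition, mergeUnordered, and mergeOrdered are hypothetical stand-ins for the DateTime/Sequence pairs and the tool chest methods, and unlike the real code this materializes results eagerly instead of staying lazy via Druid's Sequence type.

import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Illustrative only: groups a start-time-sorted list of partitions, applies an
// order-insensitive merge within each same-start group, and an ordered merge
// across groups, mirroring the loop in CachingClusteredClient above.
final class GroupedMergeSketch
{
  static final class Partition
  {
    final long startMillis;
    final String rows;

    Partition(long startMillis, String rows)
    {
      this.startMillis = startMillis;
      this.rows = rows;
    }
  }

  static String merge(
      List<Partition> sortedByStart,                     // pre-sorted, may contain ties
      Function<List<Partition>, String> mergeUnordered,  // interleaves same-start partitions
      Function<List<String>, String> mergeOrdered        // ordered merge across groups
  )
  {
    final List<String> orderedResults = new ArrayList<>();
    final List<Partition> sameStart = new ArrayList<>();
    Long currentStart = null;

    for (Partition p : sortedByStart) {
      if (currentStart != null && currentStart != p.startMillis) {
        // start time changed: flush the group sharing the previous start time
        orderedResults.add(mergeUnordered.apply(new ArrayList<>(sameStart)));
        sameStart.clear();
      }
      currentStart = p.startMillis;
      sameStart.add(p);
    }
    if (!sameStart.isEmpty()) {
      orderedResults.add(mergeUnordered.apply(sameStart));  // flush the final group
    }
    return mergeOrdered.apply(orderedResults);
  }
}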
@@ -332,7 +339,9 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
         if (!server.isAssignable() || !populateCache || isBySegment) {
           resultSeqToAdd = clientQueryable.run(query.withQuerySegmentSpec(segmentSpec));
         } else {
-          resultSeqToAdd = toolChest.mergeSequences(
+          // this could be more efficient, since we only need to reorder results
+          // for batches of segments with the same segment start time.
+          resultSeqToAdd = toolChest.mergeSequencesUnordered(
               Sequences.map(
                   clientQueryable.run(rewrittenQuery.withQuerySegmentSpec(segmentSpec)),
                   new Function<Object, Sequence<T>>()
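The per-server path also switches to mergeSequencesUnordered because an ordered merge is only safe when each input sequence's rows wholly precede the next sequence's rows. Two partitions of the same interval are each internally time-ordered but overlap in time, which is the bug tracked in #796. The toy demonstration below uses plain longs in place of Druid's timestamped result rows; it is illustrative code, not Druid's merge implementation.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.PriorityQueue;

// Two partitions covering the same interval: each is internally time-ordered,
// but neither wholly precedes the other.
final class UnorderedMergeDemo
{
  public static void main(String[] args)
  {
    List<Long> partitionA = Arrays.asList(2L, 4L, 6L);
    List<Long> partitionB = Arrays.asList(1L, 3L, 5L);

    // Rough picture of the failure: a merge that trusts the outer ordering
    // drains partition A before consulting partition B.
    List<Long> trusting = new ArrayList<>(partitionA);
    trusting.addAll(partitionB);
    System.out.println(trusting);     // [2, 4, 6, 1, 3, 5]  (out of order)

    // An order-insensitive merge must interleave; a heap makes that explicit.
    PriorityQueue<Long> heap = new PriorityQueue<>();
    heap.addAll(partitionA);
    heap.addAll(partitionB);
    List<Long> interleaved = new ArrayList<>();
    while (!heap.isEmpty()) {
      interleaved.add(heap.poll());
    }
    System.out.println(interleaved);  // [1, 2, 3, 4, 5, 6]
  }
}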

CachingClusteredClientTest.java

@@ -309,6 +309,63 @@ public class CachingClusteredClientTest
     );
   }

+  @Test
+  public void testTimeseriesMergingOutOfOrderPartitions() throws Exception
+  {
+    final Druids.TimeseriesQueryBuilder builder = Druids.newTimeseriesQueryBuilder()
+        .dataSource(DATA_SOURCE)
+        .intervals(SEG_SPEC)
+        .filters(DIM_FILTER)
+        .granularity(GRANULARITY)
+        .aggregators(AGGS)
+        .postAggregators(POST_AGGS)
+        .context(CONTEXT);
+
+    QueryRunner runner = new FinalizeResultsQueryRunner(client, new TimeseriesQueryQueryToolChest(new QueryConfig()));
+
+    testQueryCaching(
+        runner,
+        builder.build(),
+        new Interval("2011-01-05/2011-01-10"),
+        makeTimeResults(
+            new DateTime("2011-01-05T02"), 80, 100,
+            new DateTime("2011-01-06T02"), 420, 520,
+            new DateTime("2011-01-07T02"), 12, 2194,
+            new DateTime("2011-01-08T02"), 59, 201,
+            new DateTime("2011-01-09T02"), 181, 52
+        ),
+        new Interval("2011-01-05/2011-01-10"),
+        makeTimeResults(
+            new DateTime("2011-01-05T00"), 85, 102,
+            new DateTime("2011-01-06T00"), 412, 521,
+            new DateTime("2011-01-07T00"), 122, 21894,
+            new DateTime("2011-01-08T00"), 5, 20,
+            new DateTime("2011-01-09T00"), 18, 521
+        )
+    );
+
+    TestHelper.assertExpectedResults(
+        makeRenamedTimeResults(
+            new DateTime("2011-01-05T00"), 85, 102,
+            new DateTime("2011-01-05T02"), 80, 100,
+            new DateTime("2011-01-06T00"), 412, 521,
+            new DateTime("2011-01-06T02"), 420, 520,
+            new DateTime("2011-01-07T00"), 122, 21894,
+            new DateTime("2011-01-07T02"), 12, 2194,
+            new DateTime("2011-01-08T00"), 5, 20,
+            new DateTime("2011-01-08T02"), 59, 201,
+            new DateTime("2011-01-09T00"), 18, 521,
+            new DateTime("2011-01-09T02"), 181, 52
+        ),
+        runner.run(
+            builder.intervals("2011-01-05/2011-01-10")
+                   .aggregators(RENAMED_AGGS)
+                   .postAggregators(RENAMED_POST_AGGS)
+                   .build()
+        )
+    );
+  }
+
   @Test
   @SuppressWarnings("unchecked")
   public void testTimeseriesCachingTimeZone() throws Exception
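The new test feeds two partition sets over the same five days, one with rows anchored at T00 and one at T02, and asserts that the merged rows interleave day by day. Stated generically, the invariant is that merged timestamps come out non-decreasing. The helper below is hypothetical (it is not part of Druid's TestHelper) and is shown only to make that property explicit.

import java.util.List;
import java.util.function.ToLongFunction;

// Hypothetical helper: asserts that merged rows come out in non-decreasing
// timestamp order, the property testTimeseriesMergingOutOfOrderPartitions
// exercises above with concrete values.
final class OrderingAssertions
{
  static <T> void assertTimeOrdered(List<T> results, ToLongFunction<T> timestamp)
  {
    for (int i = 1; i < results.size(); i++) {
      final long prev = timestamp.applyAsLong(results.get(i - 1));
      final long curr = timestamp.applyAsLong(results.get(i));
      if (prev > curr) {
        throw new AssertionError("out of order at index " + i + ": " + prev + " > " + curr);
      }
    }
  }
}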