fix ordering for partitions with same start time, fixes #796

Partitions with same start time may produce results out of order,
           especially in the presence of data gaps.

Results for those segments must be re-ordered prior to being merged
This commit is contained in:
Xavier Léauté 2014-10-20 16:23:45 -07:00
parent 8ec8952993
commit 23a59db566
2 changed files with 75 additions and 9 deletions

View File

@ -25,7 +25,6 @@ import com.google.common.base.Function;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@ -261,14 +260,22 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
Ordering.natural().onResultOf(Pair.<DateTime, Sequence<T>>lhsFn())
);
final Sequence<Sequence<T>> seq = Sequences.simple(
Iterables.transform(listOfSequences, Pair.<DateTime, Sequence<T>>rhsFn())
);
if (strategy == null) {
return toolChest.mergeSequences(seq);
} else {
return strategy.mergeSequences(seq);
final List<Sequence<T>> orderedSequences = Lists.newLinkedList();
DateTime unorderedStart = null;
List<Sequence<T>> unordered = Lists.newLinkedList();
for (Pair<DateTime, Sequence<T>> sequencePair : listOfSequences) {
if (unorderedStart != null && unorderedStart.getMillis() != sequencePair.lhs.getMillis()) {
orderedSequences.add(toolChest.mergeSequencesUnordered(Sequences.simple(unordered)));
unordered = Lists.newLinkedList();
}
unorderedStart = sequencePair.lhs;
unordered.add(sequencePair.rhs);
}
if(!unordered.isEmpty()) {
orderedSequences.add(toolChest.mergeSequencesUnordered(Sequences.simple(unordered)));
}
return toolChest.mergeSequences(Sequences.simple(orderedSequences));
}
private void addSequencesFromCache(ArrayList<Pair<DateTime, Sequence<T>>> listOfSequences)
@ -332,7 +339,9 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
if (!server.isAssignable() || !populateCache || isBySegment) {
resultSeqToAdd = clientQueryable.run(query.withQuerySegmentSpec(segmentSpec));
} else {
resultSeqToAdd = toolChest.mergeSequences(
// this could be more efficient, since we only need to reorder results
// for batches of segments with the same segment start time.
resultSeqToAdd = toolChest.mergeSequencesUnordered(
Sequences.map(
clientQueryable.run(rewrittenQuery.withQuerySegmentSpec(segmentSpec)),
new Function<Object, Sequence<T>>()

View File

@ -309,6 +309,63 @@ public class CachingClusteredClientTest
);
}
@Test
public void testTimeseriesMergingOutOfOrderPartitions() throws Exception
{
final Druids.TimeseriesQueryBuilder builder = Druids.newTimeseriesQueryBuilder()
.dataSource(DATA_SOURCE)
.intervals(SEG_SPEC)
.filters(DIM_FILTER)
.granularity(GRANULARITY)
.aggregators(AGGS)
.postAggregators(POST_AGGS)
.context(CONTEXT);
QueryRunner runner = new FinalizeResultsQueryRunner(client, new TimeseriesQueryQueryToolChest(new QueryConfig()));
testQueryCaching(
runner,
builder.build(),
new Interval("2011-01-05/2011-01-10"),
makeTimeResults(
new DateTime("2011-01-05T02"), 80, 100,
new DateTime("2011-01-06T02"), 420, 520,
new DateTime("2011-01-07T02"), 12, 2194,
new DateTime("2011-01-08T02"), 59, 201,
new DateTime("2011-01-09T02"), 181, 52
),
new Interval("2011-01-05/2011-01-10"),
makeTimeResults(
new DateTime("2011-01-05T00"), 85, 102,
new DateTime("2011-01-06T00"), 412, 521,
new DateTime("2011-01-07T00"), 122, 21894,
new DateTime("2011-01-08T00"), 5, 20,
new DateTime("2011-01-09T00"), 18, 521
)
);
TestHelper.assertExpectedResults(
makeRenamedTimeResults(
new DateTime("2011-01-05T00"), 85, 102,
new DateTime("2011-01-05T02"), 80, 100,
new DateTime("2011-01-06T00"), 412, 521,
new DateTime("2011-01-06T02"), 420, 520,
new DateTime("2011-01-07T00"), 122, 21894,
new DateTime("2011-01-07T02"), 12, 2194,
new DateTime("2011-01-08T00"), 5, 20,
new DateTime("2011-01-08T02"), 59, 201,
new DateTime("2011-01-09T00"), 18, 521,
new DateTime("2011-01-09T02"), 181, 52
),
runner.run(
builder.intervals("2011-01-05/2011-01-10")
.aggregators(RENAMED_AGGS)
.postAggregators(RENAMED_POST_AGGS)
.build()
)
);
}
@Test
@SuppressWarnings("unchecked")
public void testTimeseriesCachingTimeZone() throws Exception