From 23a59db566bc054b3231f7c027239f0239bdaebb Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?=
Date: Mon, 20 Oct 2014 16:23:45 -0700
Subject: [PATCH] fix ordering for partitions with same start time, fixes #796

Partitions with same start time may produce results out of order,
especially in the presence of data gaps. Results for those segments must
be re-ordered prior to being merged
---
 .../druid/client/CachingClusteredClient.java  | 27 ++++++---
 .../client/CachingClusteredClientTest.java    | 57 +++++++++++++++++++
 2 files changed, 75 insertions(+), 9 deletions(-)

diff --git a/server/src/main/java/io/druid/client/CachingClusteredClient.java b/server/src/main/java/io/druid/client/CachingClusteredClient.java
index d0135eb7f4a..f189e060670 100644
--- a/server/src/main/java/io/druid/client/CachingClusteredClient.java
+++ b/server/src/main/java/io/druid/client/CachingClusteredClient.java
@@ -25,7 +25,6 @@ import com.google.common.base.Function;
 import com.google.common.base.Supplier;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterables;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -261,14 +260,22 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
         Ordering.natural().onResultOf(Pair.<DateTime, Sequence<T>>lhsFn())
     );
 
-    final Sequence<Sequence<T>> seq = Sequences.simple(
-        Iterables.transform(listOfSequences, Pair.<DateTime, Sequence<T>>rhsFn())
-    );
-    if (strategy == null) {
-      return toolChest.mergeSequences(seq);
-    } else {
-      return strategy.mergeSequences(seq);
+    final List<Sequence<T>> orderedSequences = Lists.newLinkedList();
+    DateTime unorderedStart = null;
+    List<Sequence<T>> unordered = Lists.newLinkedList();
+    for (Pair<DateTime, Sequence<T>> sequencePair : listOfSequences) {
+      if (unorderedStart != null && unorderedStart.getMillis() != sequencePair.lhs.getMillis()) {
+        orderedSequences.add(toolChest.mergeSequencesUnordered(Sequences.simple(unordered)));
+        unordered = Lists.newLinkedList();
+      }
+      unorderedStart = sequencePair.lhs;
+      unordered.add(sequencePair.rhs);
     }
+    if(!unordered.isEmpty()) {
+      orderedSequences.add(toolChest.mergeSequencesUnordered(Sequences.simple(unordered)));
+    }
+
+    return toolChest.mergeSequences(Sequences.simple(orderedSequences));
   }
 
   private void addSequencesFromCache(ArrayList<Pair<DateTime, Sequence<T>>> listOfSequences)
@@ -332,7 +339,9 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
           if (!server.isAssignable() || !populateCache || isBySegment) {
             resultSeqToAdd = clientQueryable.run(query.withQuerySegmentSpec(segmentSpec));
           } else {
-            resultSeqToAdd = toolChest.mergeSequences(
+            // this could be more efficient, since we only need to reorder results
+            // for batches of segments with the same segment start time.
+            resultSeqToAdd = toolChest.mergeSequencesUnordered(
                 Sequences.map(
                     clientQueryable.run(rewrittenQuery.withQuerySegmentSpec(segmentSpec)),
                     new Function<T, Object>()
diff --git a/server/src/test/java/io/druid/client/CachingClusteredClientTest.java b/server/src/test/java/io/druid/client/CachingClusteredClientTest.java
index f99f3d4346b..c59da0c9906 100644
--- a/server/src/test/java/io/druid/client/CachingClusteredClientTest.java
+++ b/server/src/test/java/io/druid/client/CachingClusteredClientTest.java
@@ -309,6 +309,63 @@ public class CachingClusteredClientTest
     );
   }
 
+  @Test
+  public void testTimeseriesMergingOutOfOrderPartitions() throws Exception
+  {
+    final Druids.TimeseriesQueryBuilder builder = Druids.newTimeseriesQueryBuilder()
+        .dataSource(DATA_SOURCE)
+        .intervals(SEG_SPEC)
+        .filters(DIM_FILTER)
+        .granularity(GRANULARITY)
+        .aggregators(AGGS)
+        .postAggregators(POST_AGGS)
+        .context(CONTEXT);
+
+    QueryRunner runner = new FinalizeResultsQueryRunner(client, new TimeseriesQueryQueryToolChest(new QueryConfig()));
+
+    testQueryCaching(
+        runner,
+        builder.build(),
+        new Interval("2011-01-05/2011-01-10"),
+        makeTimeResults(
+            new DateTime("2011-01-05T02"), 80, 100,
+            new DateTime("2011-01-06T02"), 420, 520,
+            new DateTime("2011-01-07T02"), 12, 2194,
+            new DateTime("2011-01-08T02"), 59, 201,
+            new DateTime("2011-01-09T02"), 181, 52
+        ),
+        new Interval("2011-01-05/2011-01-10"),
+        makeTimeResults(
+            new DateTime("2011-01-05T00"), 85, 102,
+            new DateTime("2011-01-06T00"), 412, 521,
+            new DateTime("2011-01-07T00"), 122, 21894,
+            new DateTime("2011-01-08T00"), 5, 20,
+            new DateTime("2011-01-09T00"), 18, 521
+        )
+    );
+
+    TestHelper.assertExpectedResults(
+        makeRenamedTimeResults(
+            new DateTime("2011-01-05T00"), 85, 102,
+            new DateTime("2011-01-05T02"), 80, 100,
+            new DateTime("2011-01-06T00"), 412, 521,
+            new DateTime("2011-01-06T02"), 420, 520,
+            new DateTime("2011-01-07T00"), 122, 21894,
+            new DateTime("2011-01-07T02"), 12, 2194,
+            new DateTime("2011-01-08T00"), 5, 20,
+            new DateTime("2011-01-08T02"), 59, 201,
+            new DateTime("2011-01-09T00"), 18, 521,
+            new DateTime("2011-01-09T02"), 181, 52
+        ),
+        runner.run(
+            builder.intervals("2011-01-05/2011-01-10")
+                   .aggregators(RENAMED_AGGS)
+                   .postAggregators(RENAMED_POST_AGGS)
+                   .build()
+        )
+    );
+  }
+
   @Test
   @SuppressWarnings("unchecked")
   public void testTimeseriesCachingTimeZone() throws Exception
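
Note for reviewers: the control flow of the new batching loop can be illustrated outside of Druid's Sequence machinery. The following is a minimal, hypothetical sketch in plain Java; the class GroupedMergeSketch and method mergeGroupedByStart are invented for illustration. Input pairs arrive sorted by partition start time, consecutive pairs sharing a start time are batched, an in-place sort stands in for toolChest.mergeSequencesUnordered within a batch, and straight concatenation stands in for the ordered mergeSequences across batches, since the batches are already in start-time order.

import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class GroupedMergeSketch
{
  // Batches consecutive entries sharing a start time, "merges" each batch by
  // sorting it (standing in for mergeSequencesUnordered), then concatenates
  // the batches, which are already ordered by start time (standing in for the
  // ordered mergeSequences). Input must be sorted by start time, as in the patch.
  static List<Integer> mergeGroupedByStart(List<SimpleEntry<Long, List<Integer>>> partitions)
  {
    final List<Integer> out = new ArrayList<>();
    Long batchStart = null;
    List<Integer> batch = new ArrayList<>();
    for (SimpleEntry<Long, List<Integer>> partition : partitions) {
      if (batchStart != null && !batchStart.equals(partition.getKey())) {
        batch.sort(Comparator.naturalOrder()); // unordered merge within one start-time batch
        out.addAll(batch);
        batch = new ArrayList<>();
      }
      batchStart = partition.getKey();
      batch.addAll(partition.getValue());
    }
    if (!batch.isEmpty()) {
      batch.sort(Comparator.naturalOrder()); // flush the trailing batch
      out.addAll(batch);
    }
    return out;
  }

  public static void main(String[] args)
  {
    // Two partitions share start time 0 but interleave because of a data gap;
    // concatenating them in list order would wrongly emit 5 before 2 and 3.
    List<SimpleEntry<Long, List<Integer>>> partitions = List.of(
        new SimpleEntry<>(0L, List.of(1, 5)),
        new SimpleEntry<>(0L, List.of(2, 3)),
        new SimpleEntry<>(10L, List.of(11, 12))
    );
    System.out.println(mergeGroupedByStart(partitions)); // prints [1, 2, 3, 5, 11, 12]
  }
}

The trailing flush after the loop corresponds to the if(!unordered.isEmpty()) block in the patch; without it, the results for the final group of same-start partitions would be dropped.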