From 3719b6e3c8e57b55847f230952230d8201d4c614 Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Mon, 25 Jan 2016 14:09:16 -0600 Subject: [PATCH] make populateUncoveredIntervals a configuration in query context --- .../main/java/io/druid/query/BaseQuery.java | 5 ++ .../druid/client/CachingClusteredClient.java | 65 ++++++++++++------- ...chingClusteredClientFunctionalityTest.java | 27 +++++--- 3 files changed, 66 insertions(+), 31 deletions(-) diff --git a/processing/src/main/java/io/druid/query/BaseQuery.java b/processing/src/main/java/io/druid/query/BaseQuery.java index 0f95e1bdbba..9a8629b50ad 100644 --- a/processing/src/main/java/io/druid/query/BaseQuery.java +++ b/processing/src/main/java/io/druid/query/BaseQuery.java @@ -62,6 +62,11 @@ public abstract class BaseQuery> implements Query return parseBoolean(query, "finalize", defaultValue); } + public static int getContextUncoveredIntervalsLimit(Query query, int defaultValue) + { + return parseInt(query, "uncoveredIntervalsLimit", defaultValue); + } + private static int parseInt(Query query, String key, int defaultValue) { Object val = query.getContextValue(key); diff --git a/server/src/main/java/io/druid/client/CachingClusteredClient.java b/server/src/main/java/io/druid/client/CachingClusteredClient.java index fddc482dfe1..4e962a2f777 100644 --- a/server/src/main/java/io/druid/client/CachingClusteredClient.java +++ b/server/src/main/java/io/druid/client/CachingClusteredClient.java @@ -166,33 +166,54 @@ public class CachingClusteredClient implements QueryRunner Set> segments = Sets.newLinkedHashSet(); List> serversLookup = Lists.newLinkedList(); - List uncoveredIntervals = Lists.newLinkedList(); - for (Interval interval : query.getIntervals()) { - Iterable> lookup = timeline.lookup(interval); - long startMillis = interval.getStartMillis(); - long endMillis = interval.getEndMillis(); - for (TimelineObjectHolder holder : lookup) { - Interval holderInterval = holder.getInterval(); - long intervalStart = holderInterval.getStartMillis(); - if (startMillis != intervalStart) { - uncoveredIntervals.add(new Interval(startMillis, intervalStart)); + // Note that enabling this leads to putting uncovered intervals information in the response headers + // and might blow up in some cases https://github.com/druid-io/druid/issues/2108 + int uncoveredIntervalsLimit = BaseQuery.getContextUncoveredIntervalsLimit(query, 0); + + if (uncoveredIntervalsLimit > 0) { + List uncoveredIntervals = Lists.newArrayListWithCapacity(uncoveredIntervalsLimit); + boolean uncoveredIntervalsOverflowed = false; + + for (Interval interval : query.getIntervals()) { + Iterable> lookup = timeline.lookup(interval); + long startMillis = interval.getStartMillis(); + long endMillis = interval.getEndMillis(); + for (TimelineObjectHolder holder : lookup) { + Interval holderInterval = holder.getInterval(); + long intervalStart = holderInterval.getStartMillis(); + if (!uncoveredIntervalsOverflowed && startMillis != intervalStart) { + if (uncoveredIntervalsLimit > uncoveredIntervals.size()) { + uncoveredIntervals.add(new Interval(startMillis, intervalStart)); + } else { + uncoveredIntervalsOverflowed = true; + } + } + startMillis = holderInterval.getEndMillis(); + serversLookup.add(holder); + } + + if (!uncoveredIntervalsOverflowed && startMillis < endMillis) { + if (uncoveredIntervalsLimit > uncoveredIntervals.size()) { + uncoveredIntervals.add(new Interval(startMillis, endMillis)); + } else { + uncoveredIntervalsOverflowed = true; + } } - startMillis = holderInterval.getEndMillis(); - serversLookup.add(holder); } - if (startMillis < endMillis) { - uncoveredIntervals.add(new Interval(startMillis, endMillis)); + if (!uncoveredIntervals.isEmpty()) { + // This returns intervals for which NO segment is present. + // Which is not necessarily an indication that the data doesn't exist or is + // incomplete. The data could exist and just not be loaded yet. In either + // case, though, this query will not include any data from the identified intervals. + responseContext.put("uncoveredIntervals", uncoveredIntervals); + responseContext.put("uncoveredIntervalsOverflowed", uncoveredIntervalsOverflowed); + } + } else { + for (Interval interval : query.getIntervals()) { + Iterables.addAll(serversLookup, timeline.lookup(interval)); } - } - - if (!uncoveredIntervals.isEmpty()) { - // This returns intervals for which NO segment is present. - // Which is not necessarily an indication that the data doesn't exist or is - // incomplete. The data could exist and just not be loaded yet. In either - // case, though, this query will not include any data from the identified intervals. - responseContext.put("uncoveredIntervals", uncoveredIntervals); } // Let tool chest filter out unneeded segments diff --git a/server/src/test/java/io/druid/client/CachingClusteredClientFunctionalityTest.java b/server/src/test/java/io/druid/client/CachingClusteredClientFunctionalityTest.java index 7688514f3e5..99d30caec6b 100644 --- a/server/src/test/java/io/druid/client/CachingClusteredClientFunctionalityTest.java +++ b/server/src/test/java/io/druid/client/CachingClusteredClientFunctionalityTest.java @@ -18,6 +18,7 @@ */ package io.druid.client; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Ordering; import com.google.common.util.concurrent.ListeningExecutorService; @@ -76,12 +77,14 @@ public class CachingClusteredClientFunctionalityTest { public void testUncoveredInterval() throws Exception { addToTimeline(new Interval("2015-01-02/2015-01-03"), "1"); addToTimeline(new Interval("2015-01-04/2015-01-05"), "1"); + addToTimeline(new Interval("2015-02-04/2015-02-05"), "1"); final Druids.TimeseriesQueryBuilder builder = Druids.newTimeseriesQueryBuilder() .dataSource("test") .intervals("2015-01-02/2015-01-03") .granularity("day") - .aggregators(Arrays.asList(new CountAggregatorFactory("rows"))); + .aggregators(Arrays.asList(new CountAggregatorFactory("rows"))) + .context(ImmutableMap.of("uncoveredIntervalsLimit", 3)); Map responseContext = new HashMap<>(); client.run(builder.build(), responseContext); @@ -90,45 +93,51 @@ public class CachingClusteredClientFunctionalityTest { builder.intervals("2015-01-01/2015-01-03"); responseContext = new HashMap<>(); client.run(builder.build(), responseContext); - assertUncovered(responseContext, "2015-01-01/2015-01-02"); + assertUncovered(responseContext, false, "2015-01-01/2015-01-02"); builder.intervals("2015-01-01/2015-01-04"); responseContext = new HashMap<>(); client.run(builder.build(), responseContext); - assertUncovered(responseContext, "2015-01-01/2015-01-02", "2015-01-03/2015-01-04"); + assertUncovered(responseContext, false, "2015-01-01/2015-01-02", "2015-01-03/2015-01-04"); builder.intervals("2015-01-02/2015-01-04"); responseContext = new HashMap<>(); client.run(builder.build(), responseContext); - assertUncovered(responseContext, "2015-01-03/2015-01-04"); + assertUncovered(responseContext, false, "2015-01-03/2015-01-04"); builder.intervals("2015-01-01/2015-01-30"); responseContext = new HashMap<>(); client.run(builder.build(), responseContext); - assertUncovered(responseContext, "2015-01-01/2015-01-02", "2015-01-03/2015-01-04", "2015-01-05/2015-01-30"); + assertUncovered(responseContext, false, "2015-01-01/2015-01-02", "2015-01-03/2015-01-04", "2015-01-05/2015-01-30"); builder.intervals("2015-01-02/2015-01-30"); responseContext = new HashMap<>(); client.run(builder.build(), responseContext); - assertUncovered(responseContext, "2015-01-03/2015-01-04", "2015-01-05/2015-01-30"); + assertUncovered(responseContext, false, "2015-01-03/2015-01-04", "2015-01-05/2015-01-30"); builder.intervals("2015-01-04/2015-01-30"); responseContext = new HashMap<>(); client.run(builder.build(), responseContext); - assertUncovered(responseContext, "2015-01-05/2015-01-30"); + assertUncovered(responseContext, false, "2015-01-05/2015-01-30"); builder.intervals("2015-01-10/2015-01-30"); responseContext = new HashMap<>(); client.run(builder.build(), responseContext); - assertUncovered(responseContext, "2015-01-10/2015-01-30"); + assertUncovered(responseContext, false, "2015-01-10/2015-01-30"); + + builder.intervals("2015-01-01/2015-02-25"); + responseContext = new HashMap<>(); + client.run(builder.build(), responseContext); + assertUncovered(responseContext, true, "2015-01-01/2015-01-02", "2015-01-03/2015-01-04", "2015-01-05/2015-02-04"); } - private void assertUncovered(Map context, String... intervals) { + private void assertUncovered(Map context, boolean uncoveredIntervalsOverflowed, String... intervals) { List expectedList = Lists.newArrayListWithExpectedSize(intervals.length); for (String interval : intervals) { expectedList.add(new Interval(interval)); } Assert.assertEquals((Object) expectedList, context.get("uncoveredIntervals")); + Assert.assertEquals(uncoveredIntervalsOverflowed, context.get("uncoveredIntervalsOverflowed")); } private void addToTimeline(Interval interval, String version) {