Make populateUncoveredIntervals a configurable option in the query context

This commit is contained in:
Himanshu Gupta 2016-01-25 14:09:16 -06:00
parent 1b2a568285
commit 3719b6e3c8
3 changed files with 66 additions and 31 deletions

View File

@ -62,6 +62,11 @@ public abstract class BaseQuery<T extends Comparable<T>> implements Query<T>
return parseBoolean(query, "finalize", defaultValue); return parseBoolean(query, "finalize", defaultValue);
} }
/**
 * Returns the {@code "uncoveredIntervalsLimit"} value from the query context, or
 * {@code defaultValue} when the context does not provide one.
 *
 * <p>A value greater than zero enables reporting of uncovered intervals (intervals
 * for which no segment is present) in the query response context, capped at this
 * many intervals. NOTE(review): parsing is delegated to {@code parseInt}, which
 * reads {@code query.getContextValue(key)} — confirm its handling of non-numeric
 * context values before relying on it.
 *
 * @param query        the query whose context is consulted
 * @param defaultValue value returned when the context key is absent
 * @return the configured uncovered-intervals limit, or {@code defaultValue}
 */
public static <T> int getContextUncoveredIntervalsLimit(Query<T> query, int defaultValue)
{
return parseInt(query, "uncoveredIntervalsLimit", defaultValue);
}
private static <T> int parseInt(Query<T> query, String key, int defaultValue) private static <T> int parseInt(Query<T> query, String key, int defaultValue)
{ {
Object val = query.getContextValue(key); Object val = query.getContextValue(key);

View File

@ -166,33 +166,54 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
Set<Pair<ServerSelector, SegmentDescriptor>> segments = Sets.newLinkedHashSet(); Set<Pair<ServerSelector, SegmentDescriptor>> segments = Sets.newLinkedHashSet();
List<TimelineObjectHolder<String, ServerSelector>> serversLookup = Lists.newLinkedList(); List<TimelineObjectHolder<String, ServerSelector>> serversLookup = Lists.newLinkedList();
List<Interval> uncoveredIntervals = Lists.newLinkedList();
for (Interval interval : query.getIntervals()) { // Note that enabling this leads to putting uncovered intervals information in the response headers
Iterable<TimelineObjectHolder<String, ServerSelector>> lookup = timeline.lookup(interval); // and might blow up in some cases https://github.com/druid-io/druid/issues/2108
long startMillis = interval.getStartMillis(); int uncoveredIntervalsLimit = BaseQuery.getContextUncoveredIntervalsLimit(query, 0);
long endMillis = interval.getEndMillis();
for (TimelineObjectHolder<String, ServerSelector> holder : lookup) { if (uncoveredIntervalsLimit > 0) {
Interval holderInterval = holder.getInterval(); List<Interval> uncoveredIntervals = Lists.newArrayListWithCapacity(uncoveredIntervalsLimit);
long intervalStart = holderInterval.getStartMillis(); boolean uncoveredIntervalsOverflowed = false;
if (startMillis != intervalStart) {
uncoveredIntervals.add(new Interval(startMillis, intervalStart)); for (Interval interval : query.getIntervals()) {
Iterable<TimelineObjectHolder<String, ServerSelector>> lookup = timeline.lookup(interval);
long startMillis = interval.getStartMillis();
long endMillis = interval.getEndMillis();
for (TimelineObjectHolder<String, ServerSelector> holder : lookup) {
Interval holderInterval = holder.getInterval();
long intervalStart = holderInterval.getStartMillis();
if (!uncoveredIntervalsOverflowed && startMillis != intervalStart) {
if (uncoveredIntervalsLimit > uncoveredIntervals.size()) {
uncoveredIntervals.add(new Interval(startMillis, intervalStart));
} else {
uncoveredIntervalsOverflowed = true;
}
}
startMillis = holderInterval.getEndMillis();
serversLookup.add(holder);
}
if (!uncoveredIntervalsOverflowed && startMillis < endMillis) {
if (uncoveredIntervalsLimit > uncoveredIntervals.size()) {
uncoveredIntervals.add(new Interval(startMillis, endMillis));
} else {
uncoveredIntervalsOverflowed = true;
}
} }
startMillis = holderInterval.getEndMillis();
serversLookup.add(holder);
} }
if (startMillis < endMillis) { if (!uncoveredIntervals.isEmpty()) {
uncoveredIntervals.add(new Interval(startMillis, endMillis)); // This returns intervals for which NO segment is present.
// Which is not necessarily an indication that the data doesn't exist or is
// incomplete. The data could exist and just not be loaded yet. In either
// case, though, this query will not include any data from the identified intervals.
responseContext.put("uncoveredIntervals", uncoveredIntervals);
responseContext.put("uncoveredIntervalsOverflowed", uncoveredIntervalsOverflowed);
}
} else {
for (Interval interval : query.getIntervals()) {
Iterables.addAll(serversLookup, timeline.lookup(interval));
} }
}
if (!uncoveredIntervals.isEmpty()) {
// This returns intervals for which NO segment is present.
// Which is not necessarily an indication that the data doesn't exist or is
// incomplete. The data could exist and just not be loaded yet. In either
// case, though, this query will not include any data from the identified intervals.
responseContext.put("uncoveredIntervals", uncoveredIntervals);
} }
// Let tool chest filter out unneeded segments // Let tool chest filter out unneeded segments

View File

@ -18,6 +18,7 @@
*/ */
package io.druid.client; package io.druid.client;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Ordering; import com.google.common.collect.Ordering;
import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.ListeningExecutorService;
@ -76,12 +77,14 @@ public class CachingClusteredClientFunctionalityTest {
public void testUncoveredInterval() throws Exception { public void testUncoveredInterval() throws Exception {
addToTimeline(new Interval("2015-01-02/2015-01-03"), "1"); addToTimeline(new Interval("2015-01-02/2015-01-03"), "1");
addToTimeline(new Interval("2015-01-04/2015-01-05"), "1"); addToTimeline(new Interval("2015-01-04/2015-01-05"), "1");
addToTimeline(new Interval("2015-02-04/2015-02-05"), "1");
final Druids.TimeseriesQueryBuilder builder = Druids.newTimeseriesQueryBuilder() final Druids.TimeseriesQueryBuilder builder = Druids.newTimeseriesQueryBuilder()
.dataSource("test") .dataSource("test")
.intervals("2015-01-02/2015-01-03") .intervals("2015-01-02/2015-01-03")
.granularity("day") .granularity("day")
.aggregators(Arrays.<AggregatorFactory>asList(new CountAggregatorFactory("rows"))); .aggregators(Arrays.<AggregatorFactory>asList(new CountAggregatorFactory("rows")))
.context(ImmutableMap.<String, Object>of("uncoveredIntervalsLimit", 3));
Map<String, Object> responseContext = new HashMap<>(); Map<String, Object> responseContext = new HashMap<>();
client.run(builder.build(), responseContext); client.run(builder.build(), responseContext);
@ -90,45 +93,51 @@ public class CachingClusteredClientFunctionalityTest {
builder.intervals("2015-01-01/2015-01-03"); builder.intervals("2015-01-01/2015-01-03");
responseContext = new HashMap<>(); responseContext = new HashMap<>();
client.run(builder.build(), responseContext); client.run(builder.build(), responseContext);
assertUncovered(responseContext, "2015-01-01/2015-01-02"); assertUncovered(responseContext, false, "2015-01-01/2015-01-02");
builder.intervals("2015-01-01/2015-01-04"); builder.intervals("2015-01-01/2015-01-04");
responseContext = new HashMap<>(); responseContext = new HashMap<>();
client.run(builder.build(), responseContext); client.run(builder.build(), responseContext);
assertUncovered(responseContext, "2015-01-01/2015-01-02", "2015-01-03/2015-01-04"); assertUncovered(responseContext, false, "2015-01-01/2015-01-02", "2015-01-03/2015-01-04");
builder.intervals("2015-01-02/2015-01-04"); builder.intervals("2015-01-02/2015-01-04");
responseContext = new HashMap<>(); responseContext = new HashMap<>();
client.run(builder.build(), responseContext); client.run(builder.build(), responseContext);
assertUncovered(responseContext, "2015-01-03/2015-01-04"); assertUncovered(responseContext, false, "2015-01-03/2015-01-04");
builder.intervals("2015-01-01/2015-01-30"); builder.intervals("2015-01-01/2015-01-30");
responseContext = new HashMap<>(); responseContext = new HashMap<>();
client.run(builder.build(), responseContext); client.run(builder.build(), responseContext);
assertUncovered(responseContext, "2015-01-01/2015-01-02", "2015-01-03/2015-01-04", "2015-01-05/2015-01-30"); assertUncovered(responseContext, false, "2015-01-01/2015-01-02", "2015-01-03/2015-01-04", "2015-01-05/2015-01-30");
builder.intervals("2015-01-02/2015-01-30"); builder.intervals("2015-01-02/2015-01-30");
responseContext = new HashMap<>(); responseContext = new HashMap<>();
client.run(builder.build(), responseContext); client.run(builder.build(), responseContext);
assertUncovered(responseContext, "2015-01-03/2015-01-04", "2015-01-05/2015-01-30"); assertUncovered(responseContext, false, "2015-01-03/2015-01-04", "2015-01-05/2015-01-30");
builder.intervals("2015-01-04/2015-01-30"); builder.intervals("2015-01-04/2015-01-30");
responseContext = new HashMap<>(); responseContext = new HashMap<>();
client.run(builder.build(), responseContext); client.run(builder.build(), responseContext);
assertUncovered(responseContext, "2015-01-05/2015-01-30"); assertUncovered(responseContext, false, "2015-01-05/2015-01-30");
builder.intervals("2015-01-10/2015-01-30"); builder.intervals("2015-01-10/2015-01-30");
responseContext = new HashMap<>(); responseContext = new HashMap<>();
client.run(builder.build(), responseContext); client.run(builder.build(), responseContext);
assertUncovered(responseContext, "2015-01-10/2015-01-30"); assertUncovered(responseContext, false, "2015-01-10/2015-01-30");
builder.intervals("2015-01-01/2015-02-25");
responseContext = new HashMap<>();
client.run(builder.build(), responseContext);
assertUncovered(responseContext, true, "2015-01-01/2015-01-02", "2015-01-03/2015-01-04", "2015-01-05/2015-02-04");
} }
private void assertUncovered(Map<String, Object> context, String... intervals) { private void assertUncovered(Map<String, Object> context, boolean uncoveredIntervalsOverflowed, String... intervals) {
List<Interval> expectedList = Lists.newArrayListWithExpectedSize(intervals.length); List<Interval> expectedList = Lists.newArrayListWithExpectedSize(intervals.length);
for (String interval : intervals) { for (String interval : intervals) {
expectedList.add(new Interval(interval)); expectedList.add(new Interval(interval));
} }
Assert.assertEquals((Object) expectedList, context.get("uncoveredIntervals")); Assert.assertEquals((Object) expectedList, context.get("uncoveredIntervals"));
Assert.assertEquals(uncoveredIntervalsOverflowed, context.get("uncoveredIntervalsOverflowed"));
} }
private void addToTimeline(Interval interval, String version) { private void addToTimeline(Interval interval, String version) {