mirror of https://github.com/apache/druid.git
Merge pull request #2332 from himanshug/configurable_partial
make populateUncoveredIntervals a configuration in query context
commit 3880f54b87
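The feature is opt-in per query: a client enables it by setting the uncoveredIntervalsLimit key in the query context, and the default of 0 leaves it off. A minimal sketch of an opted-in query, mirroring the functional test below (the datasource, intervals, and limit are placeholder values):

    import com.google.common.collect.ImmutableMap;
    import io.druid.query.Druids;
    import io.druid.query.aggregation.AggregatorFactory;
    import io.druid.query.aggregation.CountAggregatorFactory;
    import io.druid.query.timeseries.TimeseriesQuery;

    import java.util.Arrays;

    // Ask the broker to report up to 3 uncovered intervals (plus an
    // overflow flag) in the response context for this query.
    TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
        .dataSource("test")
        .intervals("2015-01-01/2015-01-30")
        .granularity("day")
        .aggregators(Arrays.<AggregatorFactory>asList(new CountAggregatorFactory("rows")))
        .context(ImmutableMap.<String, Object>of("uncoveredIntervalsLimit", 3))
        .build();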
@@ -62,6 +62,11 @@ public abstract class BaseQuery<T extends Comparable<T>> implements Query<T>
     return parseBoolean(query, "finalize", defaultValue);
   }
 
+  public static <T> int getContextUncoveredIntervalsLimit(Query<T> query, int defaultValue)
+  {
+    return parseInt(query, "uncoveredIntervalsLimit", defaultValue);
+  }
+
   private static <T> int parseInt(Query<T> query, String key, int defaultValue)
   {
     Object val = query.getContextValue(key);
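BaseQuery already exposes typed accessors over the context map (parseBoolean above); the new public accessor reuses the private parseInt helper, of which this hunk shows only the first line. A sketch of the assumed remainder, consistent with the boolean variant; treat the String/Number coercion as an assumption, not verbatim Druid code:

    private static <T> int parseInt(Query<T> query, String key, int defaultValue)
    {
      Object val = query.getContextValue(key);
      if (val == null) {
        return defaultValue;
      }
      // Context values arrive from JSON, so they may be numbers or strings.
      return val instanceof String ? Integer.parseInt((String) val) : ((Number) val).intValue();
    }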
@@ -166,33 +166,54 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
     Set<Pair<ServerSelector, SegmentDescriptor>> segments = Sets.newLinkedHashSet();
 
     List<TimelineObjectHolder<String, ServerSelector>> serversLookup = Lists.newLinkedList();
-    List<Interval> uncoveredIntervals = Lists.newLinkedList();
 
-    for (Interval interval : query.getIntervals()) {
-      Iterable<TimelineObjectHolder<String, ServerSelector>> lookup = timeline.lookup(interval);
-      long startMillis = interval.getStartMillis();
-      long endMillis = interval.getEndMillis();
-      for (TimelineObjectHolder<String, ServerSelector> holder : lookup) {
-        Interval holderInterval = holder.getInterval();
-        long intervalStart = holderInterval.getStartMillis();
-        if (startMillis != intervalStart) {
-          uncoveredIntervals.add(new Interval(startMillis, intervalStart));
-        }
-        startMillis = holderInterval.getEndMillis();
-        serversLookup.add(holder);
-      }
-
-      if (startMillis < endMillis) {
-        uncoveredIntervals.add(new Interval(startMillis, endMillis));
-      }
-    }
-
-    if (!uncoveredIntervals.isEmpty()) {
-      // This returns intervals for which NO segment is present.
-      // Which is not necessarily an indication that the data doesn't exist or is
-      // incomplete. The data could exist and just not be loaded yet. In either
-      // case, though, this query will not include any data from the identified intervals.
-      responseContext.put("uncoveredIntervals", uncoveredIntervals);
+    // Note that enabling this leads to putting uncovered intervals information in the response headers
+    // and might blow up in some cases https://github.com/druid-io/druid/issues/2108
+    int uncoveredIntervalsLimit = BaseQuery.getContextUncoveredIntervalsLimit(query, 0);
+
+    if (uncoveredIntervalsLimit > 0) {
+      List<Interval> uncoveredIntervals = Lists.newArrayListWithCapacity(uncoveredIntervalsLimit);
+      boolean uncoveredIntervalsOverflowed = false;
+
+      for (Interval interval : query.getIntervals()) {
+        Iterable<TimelineObjectHolder<String, ServerSelector>> lookup = timeline.lookup(interval);
+        long startMillis = interval.getStartMillis();
+        long endMillis = interval.getEndMillis();
+        for (TimelineObjectHolder<String, ServerSelector> holder : lookup) {
+          Interval holderInterval = holder.getInterval();
+          long intervalStart = holderInterval.getStartMillis();
+          if (!uncoveredIntervalsOverflowed && startMillis != intervalStart) {
+            if (uncoveredIntervalsLimit > uncoveredIntervals.size()) {
+              uncoveredIntervals.add(new Interval(startMillis, intervalStart));
+            } else {
+              uncoveredIntervalsOverflowed = true;
+            }
+          }
+          startMillis = holderInterval.getEndMillis();
+          serversLookup.add(holder);
+        }
+
+        if (!uncoveredIntervalsOverflowed && startMillis < endMillis) {
+          if (uncoveredIntervalsLimit > uncoveredIntervals.size()) {
+            uncoveredIntervals.add(new Interval(startMillis, endMillis));
+          } else {
+            uncoveredIntervalsOverflowed = true;
+          }
+        }
+      }
+
+      if (!uncoveredIntervals.isEmpty()) {
+        // This returns intervals for which NO segment is present.
+        // Which is not necessarily an indication that the data doesn't exist or is
+        // incomplete. The data could exist and just not be loaded yet. In either
+        // case, though, this query will not include any data from the identified intervals.
+        responseContext.put("uncoveredIntervals", uncoveredIntervals);
+        responseContext.put("uncoveredIntervalsOverflowed", uncoveredIntervalsOverflowed);
+      }
+    } else {
+      for (Interval interval : query.getIntervals()) {
+        Iterables.addAll(serversLookup, timeline.lookup(interval));
+      }
     }
 
     // Let tool chest filter out unneeded segments
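The rewritten block is a bounded gap scan: walk the timeline chunks covering each queried interval in order, record every gap between the scan cursor and the next chunk until the limit is reached, then just flip an overflow flag instead of recording further gaps. A self-contained sketch of the same logic, with plain [start, end) millisecond pairs standing in for Druid's timeline types (all names here are illustrative, not Druid API):

    import java.util.ArrayList;
    import java.util.List;

    public class UncoveredIntervalsSketch
    {
      // covered must be sorted and non-overlapping, as timeline.lookup()
      // yields for a queried interval; overflowed is a one-element
      // out-parameter standing in for the flag in the real code.
      static List<long[]> findUncovered(long start, long end, List<long[]> covered, int limit, boolean[] overflowed)
      {
        List<long[]> uncovered = new ArrayList<>();
        long cursor = start;
        for (long[] chunk : covered) {
          if (!overflowed[0] && cursor != chunk[0]) {
            if (limit > uncovered.size()) {
              uncovered.add(new long[]{cursor, chunk[0]}); // gap before this chunk
            } else {
              overflowed[0] = true; // limit reached; report the flag, not the gap
            }
          }
          cursor = chunk[1];
        }
        if (!overflowed[0] && cursor < end) {
          if (limit > uncovered.size()) {
            uncovered.add(new long[]{cursor, end}); // trailing gap after the last chunk
          } else {
            overflowed[0] = true;
          }
        }
        return uncovered;
      }
    }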
@@ -18,6 +18,7 @@
  */
 package io.druid.client;
 
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Ordering;
 import com.google.common.util.concurrent.ListeningExecutorService;
@@ -76,12 +77,14 @@ public class CachingClusteredClientFunctionalityTest {
   public void testUncoveredInterval() throws Exception {
     addToTimeline(new Interval("2015-01-02/2015-01-03"), "1");
     addToTimeline(new Interval("2015-01-04/2015-01-05"), "1");
+    addToTimeline(new Interval("2015-02-04/2015-02-05"), "1");
 
     final Druids.TimeseriesQueryBuilder builder = Druids.newTimeseriesQueryBuilder()
         .dataSource("test")
         .intervals("2015-01-02/2015-01-03")
         .granularity("day")
-        .aggregators(Arrays.<AggregatorFactory>asList(new CountAggregatorFactory("rows")));
+        .aggregators(Arrays.<AggregatorFactory>asList(new CountAggregatorFactory("rows")))
+        .context(ImmutableMap.<String, Object>of("uncoveredIntervalsLimit", 3));
 
     Map<String, Object> responseContext = new HashMap<>();
     client.run(builder.build(), responseContext);
@@ -90,45 +93,51 @@ public class CachingClusteredClientFunctionalityTest {
     builder.intervals("2015-01-01/2015-01-03");
     responseContext = new HashMap<>();
     client.run(builder.build(), responseContext);
-    assertUncovered(responseContext, "2015-01-01/2015-01-02");
+    assertUncovered(responseContext, false, "2015-01-01/2015-01-02");
 
     builder.intervals("2015-01-01/2015-01-04");
     responseContext = new HashMap<>();
     client.run(builder.build(), responseContext);
-    assertUncovered(responseContext, "2015-01-01/2015-01-02", "2015-01-03/2015-01-04");
+    assertUncovered(responseContext, false, "2015-01-01/2015-01-02", "2015-01-03/2015-01-04");
 
     builder.intervals("2015-01-02/2015-01-04");
     responseContext = new HashMap<>();
     client.run(builder.build(), responseContext);
-    assertUncovered(responseContext, "2015-01-03/2015-01-04");
+    assertUncovered(responseContext, false, "2015-01-03/2015-01-04");
 
     builder.intervals("2015-01-01/2015-01-30");
     responseContext = new HashMap<>();
     client.run(builder.build(), responseContext);
-    assertUncovered(responseContext, "2015-01-01/2015-01-02", "2015-01-03/2015-01-04", "2015-01-05/2015-01-30");
+    assertUncovered(responseContext, false, "2015-01-01/2015-01-02", "2015-01-03/2015-01-04", "2015-01-05/2015-01-30");
 
     builder.intervals("2015-01-02/2015-01-30");
     responseContext = new HashMap<>();
     client.run(builder.build(), responseContext);
-    assertUncovered(responseContext, "2015-01-03/2015-01-04", "2015-01-05/2015-01-30");
+    assertUncovered(responseContext, false, "2015-01-03/2015-01-04", "2015-01-05/2015-01-30");
 
     builder.intervals("2015-01-04/2015-01-30");
     responseContext = new HashMap<>();
     client.run(builder.build(), responseContext);
-    assertUncovered(responseContext, "2015-01-05/2015-01-30");
+    assertUncovered(responseContext, false, "2015-01-05/2015-01-30");
 
     builder.intervals("2015-01-10/2015-01-30");
     responseContext = new HashMap<>();
     client.run(builder.build(), responseContext);
-    assertUncovered(responseContext, "2015-01-10/2015-01-30");
+    assertUncovered(responseContext, false, "2015-01-10/2015-01-30");
+
+    builder.intervals("2015-01-01/2015-02-25");
+    responseContext = new HashMap<>();
+    client.run(builder.build(), responseContext);
+    assertUncovered(responseContext, true, "2015-01-01/2015-01-02", "2015-01-03/2015-01-04", "2015-01-05/2015-02-04");
   }
 
-  private void assertUncovered(Map<String, Object> context, String... intervals) {
+  private void assertUncovered(Map<String, Object> context, boolean uncoveredIntervalsOverflowed, String... intervals) {
     List<Interval> expectedList = Lists.newArrayListWithExpectedSize(intervals.length);
     for (String interval : intervals) {
       expectedList.add(new Interval(interval));
     }
     Assert.assertEquals((Object) expectedList, context.get("uncoveredIntervals"));
+    Assert.assertEquals(uncoveredIntervalsOverflowed, context.get("uncoveredIntervalsOverflowed"));
   }
 
   private void addToTimeline(Interval interval, String version) {
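On the consumer side, the two keys come back through the response context map passed to QueryRunner.run. A hypothetical read, assuming client and builder are set up as in the test above:

    Map<String, Object> responseContext = new HashMap<>();
    client.run(builder.build(), responseContext);

    @SuppressWarnings("unchecked")
    List<Interval> uncovered = (List<Interval>) responseContext.get("uncoveredIntervals");
    Boolean overflowed = (Boolean) responseContext.get("uncoveredIntervalsOverflowed");
    if (uncovered != null) {
      // These intervals had no segments at query time. If overflowed is true,
      // more gaps existed than uncoveredIntervalsLimit allowed to be reported.
    }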