Adding filters for TimeBoundary on backend (#3168)

* Adding filters for TimeBoundary on backend

Signed-off-by: Balachandar Kesavan <raj.ksvn@gmail.com>

* updating TimeBoundaryQuery constructor in QueryHostFinderTest

* add filter helpers

* update filterSegments + test

* Conditional filterSegment depending on whether a filter exists

* Style changes

* Trigger rebuild

* Adding documentation for timeboundaryquery filtering

* added filter serialization to timeboundaryquery cache

* code style changes
Authored by rajk-tetration on 2016-08-15 10:25:24 -07:00; committed by Fangjin Yang
parent 70d99fe3c6
commit 362b9266f8
8 changed files with 286 additions and 14 deletions

View File (time boundary query documentation)

@@ -9,6 +9,7 @@ Time boundary queries return the earliest and latest data points of a data set.
"queryType" : "timeBoundary",
"dataSource": "sample_datasource",
"bound" : < "maxTime" | "minTime" > # optional, defaults to returning both timestamps if not set
"filter" : { "type": "and", "fields": [<filter>, <filter>, ...] } # optional
}
```
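As an illustrative sketch (not part of this commit), the documented JSON can be produced from the `Druids` builder this change extends; the dimension `page` and value `some_page` are hypothetical, while `SelectorDimFilter` and `DefaultObjectMapper` are classes the diff itself uses:

```java
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.Druids;
import io.druid.query.filter.SelectorDimFilter;
import io.druid.query.timeboundary.TimeBoundaryQuery;

public class FilteredTimeBoundaryExample
{
  public static void main(String[] args) throws Exception
  {
    // Build the query documented above: a timeBoundary with an optional filter.
    TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder()
        .dataSource("sample_datasource")
        .filters(new SelectorDimFilter("page", "some_page", null))
        .build();

    // The filter round-trips under the "filter" key shown in the JSON example.
    System.out.println(new DefaultObjectMapper().writeValueAsString(query));
  }
}
```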
@@ -19,6 +20,7 @@ There are 3 main parts to a time boundary query:
|queryType|This String should always be "timeBoundary"; this is the first thing Druid looks at to figure out how to interpret the query|yes|
|dataSource|A String or Object defining the data source to query, very similar to a table in a relational database. See [DataSource](../querying/datasource.html) for more information.|yes|
|bound|Optional, set to `maxTime` or `minTime` to return only the latest or earliest timestamp. Defaults to returning both if not set|no|
|filter|See [Filters](../querying/filters.html)|no|
|context|See [Context](../querying/query-context.html)|no|
The format of the result is:

View File (Druids.java)

@@ -775,6 +775,7 @@ public class Druids
private DataSource dataSource;
private QuerySegmentSpec querySegmentSpec;
private String bound;
private DimFilter dimFilter;
private Map<String, Object> context;
public TimeBoundaryQueryBuilder()
@@ -782,6 +783,7 @@ public class Druids
dataSource = null;
querySegmentSpec = null;
bound = null;
dimFilter = null;
context = null;
}
@@ -791,6 +793,7 @@ public class Druids
dataSource,
querySegmentSpec,
bound,
dimFilter,
context
);
}
@@ -801,6 +804,7 @@ public class Druids
.dataSource(builder.dataSource)
.intervals(builder.querySegmentSpec)
.bound(builder.bound)
.filters(builder.dimFilter)
.context(builder.context);
}
@@ -840,6 +844,24 @@ public class Druids
return this;
}
public TimeBoundaryQueryBuilder filters(String dimensionName, String value)
{
dimFilter = new SelectorDimFilter(dimensionName, value, null);
return this;
}
public TimeBoundaryQueryBuilder filters(String dimensionName, String value, String... values)
{
dimFilter = new InDimFilter(dimensionName, Lists.asList(value, values), null);
return this;
}
public TimeBoundaryQueryBuilder filters(DimFilter f)
{
dimFilter = f;
return this;
}
public TimeBoundaryQueryBuilder context(Map<String, Object> c)
{
context = c;

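A short sketch of how the three `filters(...)` overloads above resolve (hypothetical class; behavior read off the builder code in this hunk): one value yields a `SelectorDimFilter`, several values yield an `InDimFilter` (Guava's `Lists.asList(value, values)` guarantees at least one entry), and a ready-made `DimFilter` is passed through unchanged:

```java
import io.druid.query.Druids;
import io.druid.query.timeboundary.TimeBoundaryQuery;

public class FilterOverloadSketch
{
  public static void main(String[] args)
  {
    // Single value -> SelectorDimFilter("quality", "automotive", null)
    TimeBoundaryQuery selector = Druids.newTimeBoundaryQueryBuilder()
        .dataSource("testing")
        .filters("quality", "automotive")
        .build();

    // Multiple values -> InDimFilter("quality", ["automotive", "business"], null)
    TimeBoundaryQuery in = Druids.newTimeBoundaryQueryBuilder()
        .dataSource("testing")
        .filters("quality", "automotive", "business")
        .build();

    System.out.println(selector.getDimensionsFilter());
    System.out.println(in.getDimensionsFilter());
  }
}
```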
View File (TimeBoundaryQuery.java)

@@ -32,6 +32,7 @@ import io.druid.query.Result;
import io.druid.query.filter.DimFilter;
import io.druid.query.spec.MultipleIntervalSegmentSpec;
import io.druid.query.spec.QuerySegmentSpec;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@@ -53,6 +54,7 @@ public class TimeBoundaryQuery extends BaseQuery<Result<TimeBoundaryResultValue>>
private static final byte CACHE_TYPE_ID = 0x0;
private final DimFilter dimFilter;
private final String bound;
@JsonCreator
@@ -60,6 +62,7 @@ public class TimeBoundaryQuery extends BaseQuery<Result<TimeBoundaryResultValue>>
@JsonProperty("dataSource") DataSource dataSource,
@JsonProperty("intervals") QuerySegmentSpec querySegmentSpec,
@JsonProperty("bound") String bound,
@JsonProperty("filter") DimFilter dimFilter,
@JsonProperty("context") Map<String, Object> context
)
{
@@ -71,13 +74,13 @@ public class TimeBoundaryQuery extends BaseQuery<Result<TimeBoundaryResultValue>>
context
);
this.dimFilter = dimFilter;
this.bound = bound == null ? "" : bound;
}
@Override
public boolean hasFilters()
{
return dimFilter != null;  // previously hard-coded to false
}
@Override
@@ -92,6 +95,12 @@ public class TimeBoundaryQuery extends BaseQuery<Result<TimeBoundaryResultValue>>
return Query.TIME_BOUNDARY;
}
@JsonProperty("filter")
public DimFilter getDimensionsFilter()
{
return dimFilter;
}
@JsonProperty
public String getBound()
{
@@ -105,6 +114,7 @@ public class TimeBoundaryQuery extends BaseQuery<Result<TimeBoundaryResultValue>>
getDataSource(),
getQuerySegmentSpec(),
bound,
dimFilter,
computeOverridenContext(contextOverrides)
);
}
@@ -116,6 +126,7 @@ public class TimeBoundaryQuery extends BaseQuery<Result<TimeBoundaryResultValue>>
getDataSource(),
spec,
bound,
dimFilter,
getContext()
);
}
@@ -127,16 +138,21 @@ public class TimeBoundaryQuery extends BaseQuery<Result<TimeBoundaryResultValue>>
dataSource,
getQuerySegmentSpec(),
bound,
dimFilter,
getContext()
);
}
public byte[] getCacheKey()
{
final byte[] filterBytes = dimFilter == null ? new byte[]{} : dimFilter.getCacheKey();
final byte[] boundBytes = StringUtils.toUtf8(bound);
final byte delimiter = (byte) 0xff;
// the filter's cache key is now appended after a 0xff delimiter (previously the
// buffer held only the type ID and the bound bytes)
return ByteBuffer.allocate(2 + boundBytes.length + filterBytes.length)
.put(CACHE_TYPE_ID)
.put(boundBytes)
.put(delimiter)
.put(filterBytes)
.array();
}
@@ -218,6 +234,7 @@ public class TimeBoundaryQuery extends BaseQuery<Result<TimeBoundaryResultValue>>
", querySegmentSpec=" + getQuerySegmentSpec() +
", duration=" + getDuration() +
", bound=" + bound +
", dimFilter=" + dimFilter +
'}';
}
@@ -240,6 +257,10 @@ public class TimeBoundaryQuery extends BaseQuery<Result<TimeBoundaryResultValue>>
return false;
}
if (dimFilter != null ? !dimFilter.equals(that.dimFilter) : that.dimFilter != null) {
return false;
}
return true;
}
@@ -248,6 +269,7 @@ public class TimeBoundaryQuery extends BaseQuery<Result<TimeBoundaryResultValue>>
{
int result = super.hashCode();
result = 31 * result + bound.hashCode();
result = 31 * result + (dimFilter != null ? dimFilter.hashCode() : 0);
return result;
}
}

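The cache-key change above is easiest to read as a byte layout: `[CACHE_TYPE_ID][bound UTF-8][0xff][filter cache key]`. Because `0xff` never occurs in UTF-8 output, the delimiter unambiguously marks where the bound ends, so two different bound/filter combinations cannot produce the same key. A standalone sketch of the same assembly (hypothetical helper, mirroring the hunk):

```java
import java.nio.ByteBuffer;

public class CacheKeySketch
{
  // Layout: [type id (1 byte)][bound bytes][0xff delimiter][filter bytes]
  static byte[] cacheKey(byte cacheTypeId, byte[] boundBytes, byte[] filterBytes)
  {
    final byte delimiter = (byte) 0xff;
    return ByteBuffer.allocate(2 + boundBytes.length + filterBytes.length)
        .put(cacheTypeId)
        .put(boundBytes)
        .put(delimiter)
        .put(filterBytes)
        .array();
  }
}
```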
View File (TimeBoundaryQueryQueryToolChest.java)

@@ -61,7 +61,7 @@ public class TimeBoundaryQueryQueryToolChest
@Override
public <T extends LogicalSegment> List<T> filterSegments(TimeBoundaryQuery query, List<T> segments)
{
// when a filter is present, any segment may contain the boundary rows, so no pruning
if (segments.size() <= 1 || query.hasFilters()) {
return segments;
}

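Why the extra condition: without a filter, only the segments overlapping the earliest and latest intervals can contain the min/max timestamp, so the tool chest can drop the rest; once a filter is set, the first matching row may live in any segment, so every segment has to be scanned. A simplified, assumed version of that pruning rule over plain intervals (not Druid's exact code):

```java
import java.util.ArrayList;
import java.util.List;
import org.joda.time.Interval;

public class PruningSketch
{
  // Keep only segments that could hold the earliest or latest timestamp.
  static List<Interval> filterSegments(boolean hasFilters, List<Interval> sorted)
  {
    if (sorted.size() <= 1 || hasFilters) {
      return sorted; // a filtered boundary row may sit in any segment
    }
    final Interval first = sorted.get(0);
    final Interval last = sorted.get(sorted.size() - 1);
    final List<Interval> kept = new ArrayList<>();
    for (Interval candidate : sorted) {
      if (candidate.overlaps(first) || candidate.overlaps(last)) {
        kept.add(candidate);
      }
    }
    return kept;
  }
}
```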
View File (TimeBoundaryQueryRunnerFactory.java)

@@ -20,9 +20,12 @@
package io.druid.query.timeboundary;
import com.google.inject.Inject;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
import com.metamx.common.ISE;
import com.metamx.common.guava.BaseSequence;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import io.druid.query.ChainedExecutionQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
@@ -30,14 +33,22 @@ import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryToolChest;
import io.druid.query.QueryWatcher;
import io.druid.query.Result;
import io.druid.granularity.AllGranularity;
import io.druid.segment.Segment;
import io.druid.segment.StorageAdapter;
import io.druid.segment.filter.Filters;
import io.druid.query.QueryRunnerHelper;
import io.druid.segment.Cursor;
import io.druid.segment.LongColumnSelector;
import io.druid.segment.column.Column;
import org.joda.time.DateTime;
import java.util.Iterator;
import java.util.Map;
import java.util.List;
import java.util.concurrent.ExecutorService;
/**
*/
public class TimeBoundaryQueryRunnerFactory
@@ -75,10 +86,45 @@ public class TimeBoundaryQueryRunnerFactory
private static class TimeBoundaryQueryRunner implements QueryRunner<Result<TimeBoundaryResultValue>>
{
private final StorageAdapter adapter;
private final Function<Cursor, Result<DateTime>> skipToFirstMatching;
public TimeBoundaryQueryRunner(Segment segment)
{
this.adapter = segment.asStorageAdapter();
this.skipToFirstMatching = new Function<Cursor, Result<DateTime>>()
{
@Override
public Result<DateTime> apply(Cursor cursor)
{
if (cursor.isDone()) {
return null;
}
final LongColumnSelector timestampColumnSelector = cursor.makeLongColumnSelector(Column.TIME_COLUMN_NAME);
final DateTime timestamp = new DateTime(timestampColumnSelector.get());
return new Result<>(adapter.getInterval().getStart(), timestamp);
}
};
}
private DateTime getTimeBoundary(StorageAdapter adapter, TimeBoundaryQuery legacyQuery, boolean descending)
{
final Sequence<Result<DateTime>> resultSequence = QueryRunnerHelper.makeCursorBasedQuery(
adapter,
legacyQuery.getQuerySegmentSpec().getIntervals(),
Filters.toFilter(legacyQuery.getDimensionsFilter()),
descending,
new AllGranularity(),
this.skipToFirstMatching
);
final List<Result<DateTime>> resultList = Sequences.toList(
Sequences.limit(resultSequence, 1),
Lists.<Result<DateTime>>newArrayList()
);
if (resultList.size() > 0) {
return resultList.get(0).getValue();
}
return null;
}
@Override
@@ -104,14 +150,24 @@ public class TimeBoundaryQueryRunnerFactory
"Null storage adapter found. Probably trying to issue a query against a segment being memory unmapped."
);
}
final DateTime minTime;
final DateTime maxTime;
// unfiltered queries still read the boundaries straight off the storage adapter;
// filtered queries scan for the first matching row from each end of the segment
if (legacyQuery.getDimensionsFilter() != null) {
minTime = getTimeBoundary(adapter, legacyQuery, false);
if (minTime == null) {
maxTime = null;
} else {
maxTime = getTimeBoundary(adapter, legacyQuery, true);
}
} else {
minTime = legacyQuery.getBound().equalsIgnoreCase(TimeBoundaryQuery.MAX_TIME)
? null
: adapter.getMinTime();
maxTime = legacyQuery.getBound().equalsIgnoreCase(TimeBoundaryQuery.MIN_TIME)
? null
: adapter.getMaxTime();
}
return legacyQuery.buildResult(
adapter.getInterval().getStart(),

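Conceptually, `getTimeBoundary()` runs the same cursor-based scan twice: ascending to find the first row the filter accepts (the min), then descending for the last (the max), with `Sequences.limit(resultSequence, 1)` stopping each scan at the first hit. A plain-Java analogue over rows shaped like the test data further down (hypothetical class, illustration only):

```java
import java.util.function.Predicate;

public class FirstMatchSketch
{
  // Scan rows (ascending by time) from one end; return the first match's timestamp.
  static String boundary(String[] rowsAscendingByTime, Predicate<String> filter, boolean descending)
  {
    final int n = rowsAscendingByTime.length;
    for (int i = 0; i < n; i++) {
      final String row = rowsAscendingByTime[descending ? n - 1 - i : i];
      if (filter.test(row)) {
        return row.split(" ")[0]; // timestamp column, as in the V_0112/V_0113 rows below
      }
    }
    return null; // no match: the runner reports no boundary for this segment
  }

  public static void main(String[] args)
  {
    final String[] rows = {
        "2011-01-12T01:00:00.000Z business",
        "2011-01-13T00:00:00.000Z automotive",
        "2011-01-16T00:00:00.000Z automotive",
        "2011-01-17T02:00:00.000Z entertainment",
    };
    final Predicate<String> automotive = row -> row.endsWith("automotive");
    System.out.println(boundary(rows, automotive, false)); // 2011-01-13... (min)
    System.out.println(boundary(rows, automotive, true));  // 2011-01-16... (max)
  }
}
```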
View File (TimeBoundaryQueryQueryToolChestTest.java)

@@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.Druids;
import io.druid.query.CacheStrategy;
import io.druid.query.Result;
import io.druid.query.TableDataSource;
@@ -45,6 +46,7 @@ public class TimeBoundaryQueryQueryToolChestTest
new TableDataSource("test"),
null,
null,
null,
null
);
@@ -52,6 +54,7 @@
new TableDataSource("test"),
null,
TimeBoundaryQuery.MAX_TIME,
null,
null
);
@@ -59,9 +62,14 @@
new TableDataSource("test"),
null,
TimeBoundaryQuery.MIN_TIME,
null,
null
);
private static final TimeBoundaryQuery FILTERED_BOUNDARY_QUERY = Druids.newTimeBoundaryQueryBuilder()
.dataSource("testing")
.filters("foo", "bar")
.build();
private static LogicalSegment createLogicalSegment(final Interval interval)
{
@@ -165,6 +173,24 @@
}
}
@Test
public void testFilteredFilterSegments() throws Exception
{
List<LogicalSegment> segments = new TimeBoundaryQueryQueryToolChest().filterSegments(
FILTERED_BOUNDARY_QUERY,
Arrays.asList(
createLogicalSegment(new Interval("2013-01-01/P1D")),
createLogicalSegment(new Interval("2013-01-01T01/PT1H")),
createLogicalSegment(new Interval("2013-01-01T02/PT1H")),
createLogicalSegment(new Interval("2013-01-02/P1D")),
createLogicalSegment(new Interval("2013-01-03T01/PT1H")),
createLogicalSegment(new Interval("2013-01-03T02/PT1H")),
createLogicalSegment(new Interval("2013-01-03/P1D"))
)
);
Assert.assertEquals(7, segments.size());
}
@Test
public void testCacheStrategy() throws Exception
{
@@ -180,6 +206,7 @@
)
),
null,
null,
null
)
);

View File (TimeBoundaryQueryRunnerTest.java)

@@ -22,13 +22,31 @@ package io.druid.query.timeboundary;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.MapMaker;
import com.google.common.collect.Iterables;
import com.google.common.io.CharSource;
import com.metamx.common.guava.Sequences;
import io.druid.granularity.QueryGranularities;
import io.druid.query.Druids;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.Result;
import io.druid.query.TableDataSource;
import io.druid.query.ordering.StringComparators;
import io.druid.segment.IncrementalIndexSegment;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import io.druid.segment.Segment;
import io.druid.segment.TestIndex;
import io.druid.timeline.DataSegment;
import io.druid.timeline.TimelineObjectHolder;
import io.druid.timeline.VersionedIntervalTimeline;
import io.druid.timeline.partition.NoneShardSpec;
import io.druid.timeline.partition.SingleElementPartitionChunk;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.apache.commons.lang.StringUtils;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -56,6 +74,12 @@ public class TimeBoundaryQueryRunnerTest
}
private final QueryRunner runner;
private static final QueryRunnerFactory factory = new TimeBoundaryQueryRunnerFactory(
QueryRunnerTestHelper.NOOP_QUERYWATCHER
);
private static Segment segment0;
private static Segment segment1;
private static List<String> segmentIdentifiers;
public TimeBoundaryQueryRunnerTest(
QueryRunner runner
@@ -64,6 +88,123 @@
this.runner = runner;
}
// Adapted from MultiSegmentSelectQueryTest, with modifications to make filtering meaningful
public static final String[] V_0112 = {
"2011-01-12T01:00:00.000Z spot business preferred bpreferred 100.000000",
"2011-01-12T02:00:00.000Z spot entertainment preferred epreferred 100.000000",
"2011-01-13T00:00:00.000Z spot automotive preferred apreferred 100.000000",
"2011-01-13T01:00:00.000Z spot business preferred bpreferred 100.000000",
};
public static final String[] V_0113 = {
"2011-01-14T00:00:00.000Z spot automotive preferred apreferred 94.874713",
"2011-01-14T02:00:00.000Z spot entertainment preferred epreferred 110.087299",
"2011-01-15T00:00:00.000Z spot automotive preferred apreferred 94.874713",
"2011-01-15T01:00:00.000Z spot business preferred bpreferred 103.629399",
"2011-01-16T00:00:00.000Z spot automotive preferred apreferred 94.874713",
"2011-01-16T01:00:00.000Z spot business preferred bpreferred 103.629399",
"2011-01-16T02:00:00.000Z spot entertainment preferred epreferred 110.087299",
"2011-01-17T01:00:00.000Z spot business preferred bpreferred 103.629399",
"2011-01-17T02:00:00.000Z spot entertainment preferred epreferred 110.087299",
};
private static IncrementalIndex newIndex(String minTimeStamp)
{
return newIndex(minTimeStamp, 10000);
}
private static IncrementalIndex newIndex(String minTimeStamp, int maxRowCount)
{
final IncrementalIndexSchema schema = new IncrementalIndexSchema.Builder()
.withMinTimestamp(new DateTime(minTimeStamp).getMillis())
.withQueryGranularity(QueryGranularities.HOUR)
.withMetrics(TestIndex.METRIC_AGGS)
.build();
return new OnheapIncrementalIndex(schema, true, maxRowCount);
}
private static String makeIdentifier(IncrementalIndex index, String version)
{
return makeIdentifier(index.getInterval(), version);
}
private static String makeIdentifier(Interval interval, String version)
{
return DataSegment.makeDataSegmentIdentifier(
QueryRunnerTestHelper.dataSource,
interval.getStart(),
interval.getEnd(),
version,
new NoneShardSpec()
);
}
private QueryRunner getCustomRunner() throws IOException {
CharSource v_0112 = CharSource.wrap(StringUtils.join(V_0112, "\n"));
CharSource v_0113 = CharSource.wrap(StringUtils.join(V_0113, "\n"));
IncrementalIndex index0 = TestIndex.loadIncrementalIndex(newIndex("2011-01-12T00:00:00.000Z"), v_0112);
IncrementalIndex index1 = TestIndex.loadIncrementalIndex(newIndex("2011-01-14T00:00:00.000Z"), v_0113);
segment0 = new IncrementalIndexSegment(index0, makeIdentifier(index0, "v1"));
segment1 = new IncrementalIndexSegment(index1, makeIdentifier(index1, "v1"));
VersionedIntervalTimeline<String, Segment> timeline = new VersionedIntervalTimeline(StringComparators.LEXICOGRAPHIC);
timeline.add(index0.getInterval(), "v1", new SingleElementPartitionChunk(segment0));
timeline.add(index1.getInterval(), "v1", new SingleElementPartitionChunk(segment1));
segmentIdentifiers = Lists.newArrayList();
for (TimelineObjectHolder<String, ?> holder : timeline.lookup(new Interval("2011-01-12/2011-01-17"))) {
segmentIdentifiers.add(makeIdentifier(holder.getInterval(), holder.getVersion()));
}
return QueryRunnerTestHelper.makeFilteringQueryRunner(timeline, factory);
}
@Test
@SuppressWarnings("unchecked")
public void testFilteredTimeBoundaryQuery() throws IOException
{
QueryRunner customRunner = getCustomRunner();
TimeBoundaryQuery timeBoundaryQuery = Druids.newTimeBoundaryQueryBuilder()
.dataSource("testing")
.filters("quality", "automotive")
.build();
Assert.assertTrue(timeBoundaryQuery.hasFilters());
HashMap<String,Object> context = new HashMap<String, Object>();
Iterable<Result<TimeBoundaryResultValue>> results = Sequences.toList(
customRunner.run(timeBoundaryQuery, context),
Lists.<Result<TimeBoundaryResultValue>>newArrayList()
);
Assert.assertTrue(Iterables.size(results) > 0);
TimeBoundaryResultValue val = results.iterator().next().getValue();
DateTime minTime = val.getMinTime();
DateTime maxTime = val.getMaxTime();
Assert.assertEquals(new DateTime("2011-01-13T00:00:00.000Z"), minTime);
Assert.assertEquals(new DateTime("2011-01-16T00:00:00.000Z"), maxTime);
}
@Test
@SuppressWarnings("unchecked")
public void testFilteredTimeBoundaryQueryNoMatches() throws IOException
{
QueryRunner customRunner = getCustomRunner();
TimeBoundaryQuery timeBoundaryQuery = Druids.newTimeBoundaryQueryBuilder()
.dataSource("testing")
.filters("quality", "foobar") // foobar dimension does not exist
.build();
Assert.assertTrue(timeBoundaryQuery.hasFilters());
HashMap<String,Object> context = new HashMap<String, Object>();
Iterable<Result<TimeBoundaryResultValue>> results = Sequences.toList(
customRunner.run(timeBoundaryQuery, context),
Lists.<Result<TimeBoundaryResultValue>>newArrayList()
);
Assert.assertTrue(Iterables.size(results) == 0);
}
@Test
@SuppressWarnings("unchecked")
public void testTimeBoundary()
@@ -71,6 +212,7 @@
TimeBoundaryQuery timeBoundaryQuery = Druids.newTimeBoundaryQueryBuilder()
.dataSource("testing")
.build();
Assert.assertFalse(timeBoundaryQuery.hasFilters());
HashMap<String,Object> context = new HashMap<String, Object>();
Iterable<Result<TimeBoundaryResultValue>> results = Sequences.toList(
runner.run(timeBoundaryQuery, context),
@@ -152,7 +294,7 @@
)
);
TimeBoundaryQuery query = new TimeBoundaryQuery(new TableDataSource("test"), null, null, null, null);
Iterable<Result<TimeBoundaryResultValue>> actual = query.mergeResults(results);
Assert.assertTrue(actual.iterator().next().getValue().getMaxTime().equals(new DateTime("2012-02-01")));
@@ -163,7 +305,7 @@
{
List<Result<TimeBoundaryResultValue>> results = Lists.newArrayList();
TimeBoundaryQuery query = new TimeBoundaryQuery(new TableDataSource("test"), null, null, null, null);
Iterable<Result<TimeBoundaryResultValue>> actual = query.mergeResults(results);
Assert.assertFalse(actual.iterator().hasNext());

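A quick cross-check of the assertions in `testFilteredTimeBoundaryQuery`: the `quality=automotive` rows in `V_0112`/`V_0113` carry exactly four timestamps, and the filtered boundaries are the first and last of them (hypothetical class, illustration only):

```java
import java.util.Arrays;
import java.util.List;

public class ExpectedBoundaries
{
  public static void main(String[] args)
  {
    // Timestamps of the "automotive" rows above, already in ascending order.
    final List<String> automotive = Arrays.asList(
        "2011-01-13T00:00:00.000Z",
        "2011-01-14T00:00:00.000Z",
        "2011-01-15T00:00:00.000Z",
        "2011-01-16T00:00:00.000Z"
    );
    // Equal-precision ISO-8601 strings sort chronologically, so the ends of the
    // list are the minTime/maxTime the test expects.
    System.out.println("minTime = " + automotive.get(0));
    System.out.println("maxTime = " + automotive.get(automotive.size() - 1));
  }
}
```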
View File (QueryHostFinderTest.java)

@@ -127,6 +127,7 @@ public class QueryHostFinderTest
new TableDataSource("test"),
new MultipleIntervalSegmentSpec(Arrays.<Interval>asList(new Interval("2011-08-31/2011-09-01"))),
null,
null,
null
)
);