Merge pull request #603 from metamx/max-min-time

enable tb to return just max or min time
This commit is contained in:
xvrl 2014-06-16 14:31:35 -07:00
commit 1b99553394
6 changed files with 139 additions and 50 deletions

View File

@ -692,12 +692,14 @@ public class Druids
{ {
private DataSource dataSource; private DataSource dataSource;
private QuerySegmentSpec querySegmentSpec; private QuerySegmentSpec querySegmentSpec;
private String bound;
private Map<String, Object> context; private Map<String, Object> context;
public TimeBoundaryQueryBuilder() public TimeBoundaryQueryBuilder()
{ {
dataSource = null; dataSource = null;
querySegmentSpec = null; querySegmentSpec = null;
bound = null;
context = null; context = null;
} }
@ -706,6 +708,7 @@ public class Druids
return new TimeBoundaryQuery( return new TimeBoundaryQuery(
dataSource, dataSource,
querySegmentSpec, querySegmentSpec,
bound,
context context
); );
} }
@ -715,6 +718,7 @@ public class Druids
return new TimeBoundaryQueryBuilder() return new TimeBoundaryQueryBuilder()
.dataSource(builder.dataSource) .dataSource(builder.dataSource)
.intervals(builder.querySegmentSpec) .intervals(builder.querySegmentSpec)
.bound(builder.bound)
.context(builder.context); .context(builder.context);
} }
@ -748,6 +752,12 @@ public class Druids
return this; return this;
} }
public TimeBoundaryQueryBuilder bound(String b)
{
bound = b;
return this;
}
public TimeBoundaryQueryBuilder context(Map<String, Object> c) public TimeBoundaryQueryBuilder context(Map<String, Object> c)
{ {
context = c; context = c;

View File

@ -21,6 +21,7 @@ package io.druid.query.timeboundary;
import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
@ -48,12 +49,34 @@ public class TimeBoundaryQuery extends BaseQuery<Result<TimeBoundaryResultValue>
); );
public static final String MAX_TIME = "maxTime"; public static final String MAX_TIME = "maxTime";
public static final String MIN_TIME = "minTime"; public static final String MIN_TIME = "minTime";
private static final byte CACHE_TYPE_ID = 0x0; private static final byte CACHE_TYPE_ID = 0x0;
public static Iterable<Result<TimeBoundaryResultValue>> buildResult(DateTime timestamp, DateTime min, DateTime max)
{
List<Result<TimeBoundaryResultValue>> results = Lists.newArrayList();
Map<String, Object> result = Maps.newHashMap();
if (min != null) {
result.put(MIN_TIME, min);
}
if (max != null) {
result.put(MAX_TIME, max);
}
if (!result.isEmpty()) {
results.add(new Result<>(timestamp, new TimeBoundaryResultValue(result)));
}
return results;
}
private final String bound;
@JsonCreator @JsonCreator
public TimeBoundaryQuery( public TimeBoundaryQuery(
@JsonProperty("dataSource") DataSource dataSource, @JsonProperty("dataSource") DataSource dataSource,
@JsonProperty("intervals") QuerySegmentSpec querySegmentSpec, @JsonProperty("intervals") QuerySegmentSpec querySegmentSpec,
@JsonProperty("bound") String bound,
@JsonProperty("context") Map<String, Object> context @JsonProperty("context") Map<String, Object> context
) )
{ {
@ -63,6 +86,8 @@ public class TimeBoundaryQuery extends BaseQuery<Result<TimeBoundaryResultValue>
: querySegmentSpec, : querySegmentSpec,
context context
); );
this.bound = bound == null ? "" : bound;
} }
@Override @Override
@ -77,12 +102,19 @@ public class TimeBoundaryQuery extends BaseQuery<Result<TimeBoundaryResultValue>
return Query.TIME_BOUNDARY; return Query.TIME_BOUNDARY;
} }
@JsonProperty
public String getBound()
{
return bound;
}
@Override @Override
public TimeBoundaryQuery withOverriddenContext(Map<String, Object> contextOverrides) public TimeBoundaryQuery withOverriddenContext(Map<String, Object> contextOverrides)
{ {
return new TimeBoundaryQuery( return new TimeBoundaryQuery(
getDataSource(), getDataSource(),
getQuerySegmentSpec(), getQuerySegmentSpec(),
bound,
computeOverridenContext(contextOverrides) computeOverridenContext(contextOverrides)
); );
} }
@ -93,6 +125,7 @@ public class TimeBoundaryQuery extends BaseQuery<Result<TimeBoundaryResultValue>
return new TimeBoundaryQuery( return new TimeBoundaryQuery(
getDataSource(), getDataSource(),
spec, spec,
bound,
getContext() getContext()
); );
} }
@ -103,14 +136,17 @@ public class TimeBoundaryQuery extends BaseQuery<Result<TimeBoundaryResultValue>
return new TimeBoundaryQuery( return new TimeBoundaryQuery(
dataSource, dataSource,
getQuerySegmentSpec(), getQuerySegmentSpec(),
bound,
getContext() getContext()
); );
} }
public byte[] getCacheKey() public byte[] getCacheKey()
{ {
return ByteBuffer.allocate(1) final byte[] boundBytes = bound.getBytes(Charsets.UTF_8);
return ByteBuffer.allocate(1 + boundBytes.length)
.put(CACHE_TYPE_ID) .put(CACHE_TYPE_ID)
.put(boundBytes)
.array(); .array();
} }
@ -121,27 +157,10 @@ public class TimeBoundaryQuery extends BaseQuery<Result<TimeBoundaryResultValue>
"dataSource='" + getDataSource() + '\'' + "dataSource='" + getDataSource() + '\'' +
", querySegmentSpec=" + getQuerySegmentSpec() + ", querySegmentSpec=" + getQuerySegmentSpec() +
", duration=" + getDuration() + ", duration=" + getDuration() +
", bound" + bound +
'}'; '}';
} }
public Iterable<Result<TimeBoundaryResultValue>> buildResult(DateTime timestamp, DateTime min, DateTime max)
{
List<Result<TimeBoundaryResultValue>> results = Lists.newArrayList();
Map<String, Object> result = Maps.newHashMap();
if (min != null) {
result.put(TimeBoundaryQuery.MIN_TIME, min);
}
if (max != null) {
result.put(TimeBoundaryQuery.MAX_TIME, max);
}
if (!result.isEmpty()) {
results.add(new Result<TimeBoundaryResultValue>(timestamp, new TimeBoundaryResultValue(result)));
}
return results;
}
public Iterable<Result<TimeBoundaryResultValue>> mergeResults(List<Result<TimeBoundaryResultValue>> results) public Iterable<Result<TimeBoundaryResultValue>> mergeResults(List<Result<TimeBoundaryResultValue>> results)
{ {
if (results == null || results.isEmpty()) { if (results == null || results.isEmpty()) {
@ -154,25 +173,33 @@ public class TimeBoundaryQuery extends BaseQuery<Result<TimeBoundaryResultValue>
TimeBoundaryResultValue val = result.getValue(); TimeBoundaryResultValue val = result.getValue();
DateTime currMinTime = val.getMinTime(); DateTime currMinTime = val.getMinTime();
if (currMinTime.isBefore(min)) { if (currMinTime != null && currMinTime.isBefore(min)) {
min = currMinTime; min = currMinTime;
} }
DateTime currMaxTime = val.getMaxTime(); DateTime currMaxTime = val.getMaxTime();
if (currMaxTime.isAfter(max)) { if (currMaxTime != null && currMaxTime.isAfter(max)) {
max = currMaxTime; max = currMaxTime;
} }
} }
return Arrays.asList( final DateTime ts;
new Result<TimeBoundaryResultValue>( final DateTime minTime;
min, final DateTime maxTime;
new TimeBoundaryResultValue(
ImmutableMap.<String, Object>of( if (bound.equalsIgnoreCase(MIN_TIME)) {
TimeBoundaryQuery.MIN_TIME, min, ts = min;
TimeBoundaryQuery.MAX_TIME, max minTime = min;
) maxTime = null;
) } else if (bound.equalsIgnoreCase(MAX_TIME)) {
) ts = max;
); minTime = null;
maxTime = max;
} else {
ts = min;
minTime = min;
maxTime = max;
}
return buildResult(ts, minTime, maxTime);
} }
} }

View File

@ -67,8 +67,8 @@ public class TimeBoundaryQueryQueryToolChest
return segments; return segments;
} }
final T first = segments.get(0); final T min = segments.get(0);
final T second = segments.get(segments.size() - 1); final T max = segments.get(segments.size() - 1);
return Lists.newArrayList( return Lists.newArrayList(
Iterables.filter( Iterables.filter(
@ -78,8 +78,8 @@ public class TimeBoundaryQueryQueryToolChest
@Override @Override
public boolean apply(T input) public boolean apply(T input)
{ {
return input.getInterval().overlaps(first.getInterval()) || input.getInterval() return (min != null && input.getInterval().overlaps(min.getInterval())) ||
.overlaps(second.getInterval()); (max != null && input.getInterval().overlaps(max.getInterval()));
} }
} }
) )
@ -111,7 +111,7 @@ public class TimeBoundaryQueryQueryToolChest
@Override @Override
public Sequence<Result<TimeBoundaryResultValue>> mergeSequences(Sequence<Sequence<Result<TimeBoundaryResultValue>>> seqOfSequences) public Sequence<Result<TimeBoundaryResultValue>> mergeSequences(Sequence<Sequence<Result<TimeBoundaryResultValue>>> seqOfSequences)
{ {
return new OrderedMergeSequence<Result<TimeBoundaryResultValue>>(getOrdering(), seqOfSequences); return new OrderedMergeSequence<>(getOrdering(), seqOfSequences);
} }
@Override @Override
@ -177,11 +177,11 @@ public class TimeBoundaryQueryQueryToolChest
{ {
@Override @Override
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public Result<TimeBoundaryResultValue> apply(@Nullable Object input) public Result<TimeBoundaryResultValue> apply(Object input)
{ {
List<Object> result = (List<Object>) input; List<Object> result = (List<Object>) input;
return new Result<TimeBoundaryResultValue>( return new Result<>(
new DateTime(result.get(0)), new DateTime(result.get(0)),
new TimeBoundaryResultValue(result.get(1)) new TimeBoundaryResultValue(result.get(1))
); );
@ -192,7 +192,7 @@ public class TimeBoundaryQueryQueryToolChest
@Override @Override
public Sequence<Result<TimeBoundaryResultValue>> mergeSequences(Sequence<Sequence<Result<TimeBoundaryResultValue>>> seqOfSequences) public Sequence<Result<TimeBoundaryResultValue>> mergeSequences(Sequence<Sequence<Result<TimeBoundaryResultValue>>> seqOfSequences)
{ {
return new MergeSequence<Result<TimeBoundaryResultValue>>(getOrdering(), seqOfSequences); return new MergeSequence<>(getOrdering(), seqOfSequences);
} }
}; };
} }

View File

@ -32,6 +32,7 @@ import io.druid.query.QueryWatcher;
import io.druid.query.Result; import io.druid.query.Result;
import io.druid.segment.Segment; import io.druid.segment.Segment;
import io.druid.segment.StorageAdapter; import io.druid.segment.StorageAdapter;
import org.joda.time.DateTime;
import java.util.Iterator; import java.util.Iterator;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
@ -61,7 +62,7 @@ public class TimeBoundaryQueryRunnerFactory
ExecutorService queryExecutor, Iterable<QueryRunner<Result<TimeBoundaryResultValue>>> queryRunners ExecutorService queryExecutor, Iterable<QueryRunner<Result<TimeBoundaryResultValue>>> queryRunners
) )
{ {
return new ChainedExecutionQueryRunner<Result<TimeBoundaryResultValue>>( return new ChainedExecutionQueryRunner<>(
queryExecutor, toolChest.getOrdering(), queryWatcher, queryRunners queryExecutor, toolChest.getOrdering(), queryWatcher, queryRunners
); );
} }
@ -90,7 +91,7 @@ public class TimeBoundaryQueryRunnerFactory
final TimeBoundaryQuery legacyQuery = (TimeBoundaryQuery) input; final TimeBoundaryQuery legacyQuery = (TimeBoundaryQuery) input;
return new BaseSequence<Result<TimeBoundaryResultValue>, Iterator<Result<TimeBoundaryResultValue>>>( return new BaseSequence<>(
new BaseSequence.IteratorMaker<Result<TimeBoundaryResultValue>, Iterator<Result<TimeBoundaryResultValue>>>() new BaseSequence.IteratorMaker<Result<TimeBoundaryResultValue>, Iterator<Result<TimeBoundaryResultValue>>>()
{ {
@Override @Override
@ -102,10 +103,18 @@ public class TimeBoundaryQueryRunnerFactory
); );
} }
final DateTime minTime = legacyQuery.getBound().equalsIgnoreCase(TimeBoundaryQuery.MAX_TIME)
? null
: adapter.getMinTime();
final DateTime maxTime = legacyQuery.getBound().equalsIgnoreCase(TimeBoundaryQuery.MIN_TIME)
? null
: adapter.getMaxTime();
return legacyQuery.buildResult( return legacyQuery.buildResult(
adapter.getInterval().getStart(), adapter.getInterval().getStart(),
adapter.getMinTime(), minTime,
adapter.getMaxTime() maxTime
).iterator(); ).iterator();
} }

View File

@ -99,6 +99,10 @@ public class TimeBoundaryResultValue
private DateTime getDateTimeValue(Object val) private DateTime getDateTimeValue(Object val)
{ {
if (val == null) {
return null;
}
if (val instanceof DateTime) { if (val instanceof DateTime) {
return (DateTime) val; return (DateTime) val;
} else if (val instanceof String) { } else if (val instanceof String) {

View File

@ -20,13 +20,10 @@
package io.druid.query.timeboundary; package io.druid.query.timeboundary;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import com.metamx.common.guava.Sequences; import com.metamx.common.guava.Sequences;
import io.druid.query.Druids; import io.druid.query.Druids;
import io.druid.query.Query;
import io.druid.query.QueryRunner; import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerTestHelper; import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.QueryWatcher;
import io.druid.query.Result; import io.druid.query.Result;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import org.junit.Assert; import org.junit.Assert;
@ -78,4 +75,46 @@ public class TimeBoundaryQueryRunnerTest
Assert.assertEquals(new DateTime("2011-01-12T00:00:00.000Z"), minTime); Assert.assertEquals(new DateTime("2011-01-12T00:00:00.000Z"), minTime);
Assert.assertEquals(new DateTime("2011-04-15T00:00:00.000Z"), maxTime); Assert.assertEquals(new DateTime("2011-04-15T00:00:00.000Z"), maxTime);
} }
@Test
@SuppressWarnings("unchecked")
public void testTimeBoundaryMax()
{
TimeBoundaryQuery timeBoundaryQuery = Druids.newTimeBoundaryQueryBuilder()
.dataSource("testing")
.bound(TimeBoundaryQuery.MAX_TIME)
.build();
Iterable<Result<TimeBoundaryResultValue>> results = Sequences.toList(
runner.run(timeBoundaryQuery),
Lists.<Result<TimeBoundaryResultValue>>newArrayList()
);
TimeBoundaryResultValue val = results.iterator().next().getValue();
DateTime minTime = val.getMinTime();
DateTime maxTime = val.getMaxTime();
Assert.assertNull(minTime);
Assert.assertEquals(new DateTime("2011-04-15T00:00:00.000Z"), maxTime);
}
@Test
@SuppressWarnings("unchecked")
public void testTimeBoundaryMin()
{
TimeBoundaryQuery timeBoundaryQuery = Druids.newTimeBoundaryQueryBuilder()
.dataSource("testing")
.bound(TimeBoundaryQuery.MIN_TIME)
.build();
Iterable<Result<TimeBoundaryResultValue>> results = Sequences.toList(
runner.run(timeBoundaryQuery),
Lists.<Result<TimeBoundaryResultValue>>newArrayList()
);
TimeBoundaryResultValue val = results.iterator().next().getValue();
DateTime minTime = val.getMinTime();
DateTime maxTime = val.getMaxTime();
Assert.assertEquals(new DateTime("2011-01-12T00:00:00.000Z"), minTime);
Assert.assertNull(maxTime);
}
} }