diff --git a/processing/src/main/java/io/druid/query/Druids.java b/processing/src/main/java/io/druid/query/Druids.java index 084652b322f..932af432ad1 100644 --- a/processing/src/main/java/io/druid/query/Druids.java +++ b/processing/src/main/java/io/druid/query/Druids.java @@ -692,12 +692,14 @@ public class Druids { private DataSource dataSource; private QuerySegmentSpec querySegmentSpec; + private String bound; private Map context; public TimeBoundaryQueryBuilder() { dataSource = null; querySegmentSpec = null; + bound = null; context = null; } @@ -706,6 +708,7 @@ public class Druids return new TimeBoundaryQuery( dataSource, querySegmentSpec, + bound, context ); } @@ -715,6 +718,7 @@ public class Druids return new TimeBoundaryQueryBuilder() .dataSource(builder.dataSource) .intervals(builder.querySegmentSpec) + .bound(builder.bound) .context(builder.context); } @@ -748,6 +752,12 @@ public class Druids return this; } + public TimeBoundaryQueryBuilder bound(String b) + { + bound = b; + return this; + } + public TimeBoundaryQueryBuilder context(Map c) { context = c; diff --git a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQuery.java b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQuery.java index 6f3e70b9851..574d0704602 100644 --- a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQuery.java +++ b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQuery.java @@ -21,6 +21,7 @@ package io.druid.query.timeboundary; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Charsets; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -48,12 +49,34 @@ public class TimeBoundaryQuery extends BaseQuery ); public static final String MAX_TIME = "maxTime"; public static final String MIN_TIME = "minTime"; + private static final byte CACHE_TYPE_ID = 0x0; + public static Iterable> buildResult(DateTime timestamp, DateTime min, DateTime max) + { + List> results = Lists.newArrayList(); + Map result = Maps.newHashMap(); + + if (min != null) { + result.put(MIN_TIME, min); + } + if (max != null) { + result.put(MAX_TIME, max); + } + if (!result.isEmpty()) { + results.add(new Result<>(timestamp, new TimeBoundaryResultValue(result))); + } + + return results; + } + + private final String bound; + @JsonCreator public TimeBoundaryQuery( @JsonProperty("dataSource") DataSource dataSource, @JsonProperty("intervals") QuerySegmentSpec querySegmentSpec, + @JsonProperty("bound") String bound, @JsonProperty("context") Map context ) { @@ -63,6 +86,8 @@ public class TimeBoundaryQuery extends BaseQuery : querySegmentSpec, context ); + + this.bound = bound == null ? "" : bound; } @Override @@ -77,12 +102,19 @@ public class TimeBoundaryQuery extends BaseQuery return Query.TIME_BOUNDARY; } + @JsonProperty + public String getBound() + { + return bound; + } + @Override public TimeBoundaryQuery withOverriddenContext(Map contextOverrides) { return new TimeBoundaryQuery( getDataSource(), getQuerySegmentSpec(), + bound, computeOverridenContext(contextOverrides) ); } @@ -93,6 +125,7 @@ public class TimeBoundaryQuery extends BaseQuery return new TimeBoundaryQuery( getDataSource(), spec, + bound, getContext() ); } @@ -103,14 +136,17 @@ public class TimeBoundaryQuery extends BaseQuery return new TimeBoundaryQuery( dataSource, getQuerySegmentSpec(), + bound, getContext() ); } public byte[] getCacheKey() { - return ByteBuffer.allocate(1) + final byte[] boundBytes = bound.getBytes(Charsets.UTF_8); + return ByteBuffer.allocate(1 + boundBytes.length) .put(CACHE_TYPE_ID) + .put(boundBytes) .array(); } @@ -121,27 +157,10 @@ public class TimeBoundaryQuery extends BaseQuery "dataSource='" + getDataSource() + '\'' + ", querySegmentSpec=" + getQuerySegmentSpec() + ", duration=" + getDuration() + + ", bound" + bound + '}'; } - public Iterable> buildResult(DateTime timestamp, DateTime min, DateTime max) - { - List> results = Lists.newArrayList(); - Map result = Maps.newHashMap(); - - if (min != null) { - result.put(TimeBoundaryQuery.MIN_TIME, min); - } - if (max != null) { - result.put(TimeBoundaryQuery.MAX_TIME, max); - } - if (!result.isEmpty()) { - results.add(new Result(timestamp, new TimeBoundaryResultValue(result))); - } - - return results; - } - public Iterable> mergeResults(List> results) { if (results == null || results.isEmpty()) { @@ -154,25 +173,33 @@ public class TimeBoundaryQuery extends BaseQuery TimeBoundaryResultValue val = result.getValue(); DateTime currMinTime = val.getMinTime(); - if (currMinTime.isBefore(min)) { + if (currMinTime != null && currMinTime.isBefore(min)) { min = currMinTime; } DateTime currMaxTime = val.getMaxTime(); - if (currMaxTime.isAfter(max)) { + if (currMaxTime != null && currMaxTime.isAfter(max)) { max = currMaxTime; } } - return Arrays.asList( - new Result( - min, - new TimeBoundaryResultValue( - ImmutableMap.of( - TimeBoundaryQuery.MIN_TIME, min, - TimeBoundaryQuery.MAX_TIME, max - ) - ) - ) - ); + final DateTime ts; + final DateTime minTime; + final DateTime maxTime; + + if (bound.equalsIgnoreCase(MIN_TIME)) { + ts = min; + minTime = min; + maxTime = null; + } else if (bound.equalsIgnoreCase(MAX_TIME)) { + ts = max; + minTime = null; + maxTime = max; + } else { + ts = min; + minTime = min; + maxTime = max; + } + + return buildResult(ts, minTime, maxTime); } } diff --git a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java index cc4aaa46579..fdde44b8581 100644 --- a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java @@ -67,8 +67,8 @@ public class TimeBoundaryQueryQueryToolChest return segments; } - final T first = segments.get(0); - final T second = segments.get(segments.size() - 1); + final T min = segments.get(0); + final T max = segments.get(segments.size() - 1); return Lists.newArrayList( Iterables.filter( @@ -78,8 +78,8 @@ public class TimeBoundaryQueryQueryToolChest @Override public boolean apply(T input) { - return input.getInterval().overlaps(first.getInterval()) || input.getInterval() - .overlaps(second.getInterval()); + return (min != null && input.getInterval().overlaps(min.getInterval())) || + (max != null && input.getInterval().overlaps(max.getInterval())); } } ) @@ -111,7 +111,7 @@ public class TimeBoundaryQueryQueryToolChest @Override public Sequence> mergeSequences(Sequence>> seqOfSequences) { - return new OrderedMergeSequence>(getOrdering(), seqOfSequences); + return new OrderedMergeSequence<>(getOrdering(), seqOfSequences); } @Override @@ -146,9 +146,9 @@ public class TimeBoundaryQueryQueryToolChest public byte[] computeCacheKey(TimeBoundaryQuery query) { return ByteBuffer.allocate(2) - .put(TIMEBOUNDARY_QUERY) - .put(query.getCacheKey()) - .array(); + .put(TIMEBOUNDARY_QUERY) + .put(query.getCacheKey()) + .array(); } @Override @@ -177,11 +177,11 @@ public class TimeBoundaryQueryQueryToolChest { @Override @SuppressWarnings("unchecked") - public Result apply(@Nullable Object input) + public Result apply(Object input) { List result = (List) input; - return new Result( + return new Result<>( new DateTime(result.get(0)), new TimeBoundaryResultValue(result.get(1)) ); @@ -192,7 +192,7 @@ public class TimeBoundaryQueryQueryToolChest @Override public Sequence> mergeSequences(Sequence>> seqOfSequences) { - return new MergeSequence>(getOrdering(), seqOfSequences); + return new MergeSequence<>(getOrdering(), seqOfSequences); } }; } diff --git a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryRunnerFactory.java b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryRunnerFactory.java index 1f78429ead3..bdb17694346 100644 --- a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryRunnerFactory.java +++ b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryRunnerFactory.java @@ -32,6 +32,7 @@ import io.druid.query.QueryWatcher; import io.druid.query.Result; import io.druid.segment.Segment; import io.druid.segment.StorageAdapter; +import org.joda.time.DateTime; import java.util.Iterator; import java.util.concurrent.ExecutorService; @@ -61,7 +62,7 @@ public class TimeBoundaryQueryRunnerFactory ExecutorService queryExecutor, Iterable>> queryRunners ) { - return new ChainedExecutionQueryRunner>( + return new ChainedExecutionQueryRunner<>( queryExecutor, toolChest.getOrdering(), queryWatcher, queryRunners ); } @@ -90,7 +91,7 @@ public class TimeBoundaryQueryRunnerFactory final TimeBoundaryQuery legacyQuery = (TimeBoundaryQuery) input; - return new BaseSequence, Iterator>>( + return new BaseSequence<>( new BaseSequence.IteratorMaker, Iterator>>() { @Override @@ -102,10 +103,18 @@ public class TimeBoundaryQueryRunnerFactory ); } + final DateTime minTime = legacyQuery.getBound().equalsIgnoreCase(TimeBoundaryQuery.MAX_TIME) + ? null + : adapter.getMinTime(); + final DateTime maxTime = legacyQuery.getBound().equalsIgnoreCase(TimeBoundaryQuery.MIN_TIME) + ? null + : adapter.getMaxTime(); + + return legacyQuery.buildResult( adapter.getInterval().getStart(), - adapter.getMinTime(), - adapter.getMaxTime() + minTime, + maxTime ).iterator(); } diff --git a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryResultValue.java b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryResultValue.java index 1210e95af2c..ba5777879e6 100644 --- a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryResultValue.java +++ b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryResultValue.java @@ -99,6 +99,10 @@ public class TimeBoundaryResultValue private DateTime getDateTimeValue(Object val) { + if (val == null) { + return null; + } + if (val instanceof DateTime) { return (DateTime) val; } else if (val instanceof String) { diff --git a/processing/src/test/java/io/druid/query/timeboundary/TimeBoundaryQueryRunnerTest.java b/processing/src/test/java/io/druid/query/timeboundary/TimeBoundaryQueryRunnerTest.java index 7bc499dca80..32ff68ab350 100644 --- a/processing/src/test/java/io/druid/query/timeboundary/TimeBoundaryQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/timeboundary/TimeBoundaryQueryRunnerTest.java @@ -20,13 +20,10 @@ package io.druid.query.timeboundary; import com.google.common.collect.Lists; -import com.google.common.util.concurrent.ListenableFuture; import com.metamx.common.guava.Sequences; import io.druid.query.Druids; -import io.druid.query.Query; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerTestHelper; -import io.druid.query.QueryWatcher; import io.druid.query.Result; import org.joda.time.DateTime; import org.junit.Assert; @@ -78,4 +75,46 @@ public class TimeBoundaryQueryRunnerTest Assert.assertEquals(new DateTime("2011-01-12T00:00:00.000Z"), minTime); Assert.assertEquals(new DateTime("2011-04-15T00:00:00.000Z"), maxTime); } + + @Test + @SuppressWarnings("unchecked") + public void testTimeBoundaryMax() + { + TimeBoundaryQuery timeBoundaryQuery = Druids.newTimeBoundaryQueryBuilder() + .dataSource("testing") + .bound(TimeBoundaryQuery.MAX_TIME) + .build(); + + Iterable> results = Sequences.toList( + runner.run(timeBoundaryQuery), + Lists.>newArrayList() + ); + TimeBoundaryResultValue val = results.iterator().next().getValue(); + DateTime minTime = val.getMinTime(); + DateTime maxTime = val.getMaxTime(); + + Assert.assertNull(minTime); + Assert.assertEquals(new DateTime("2011-04-15T00:00:00.000Z"), maxTime); + } + + @Test + @SuppressWarnings("unchecked") + public void testTimeBoundaryMin() + { + TimeBoundaryQuery timeBoundaryQuery = Druids.newTimeBoundaryQueryBuilder() + .dataSource("testing") + .bound(TimeBoundaryQuery.MIN_TIME) + .build(); + + Iterable> results = Sequences.toList( + runner.run(timeBoundaryQuery), + Lists.>newArrayList() + ); + TimeBoundaryResultValue val = results.iterator().next().getValue(); + DateTime minTime = val.getMinTime(); + DateTime maxTime = val.getMaxTime(); + + Assert.assertEquals(new DateTime("2011-01-12T00:00:00.000Z"), minTime); + Assert.assertNull(maxTime); + } }