diff --git a/processing/src/main/java/io/druid/query/TimewarpOperator.java b/processing/src/main/java/io/druid/query/TimewarpOperator.java index ac2a5f8c930..49a8fb1ed85 100644 --- a/processing/src/main/java/io/druid/query/TimewarpOperator.java +++ b/processing/src/main/java/io/druid/query/TimewarpOperator.java @@ -22,7 +22,6 @@ package io.druid.query; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Function; -import com.google.common.collect.ImmutableMap; import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; import io.druid.data.input.MapBasedRow; @@ -80,7 +79,7 @@ public class TimewarpOperator implements PostProcessingOperator return new QueryRunner() { @Override - public Sequence run(Query query) + public Sequence run(final Query query) { final long offset = computeOffset(now); @@ -103,12 +102,19 @@ public class TimewarpOperator implements PostProcessingOperator Object value = res.getValue(); if (value instanceof TimeBoundaryResultValue) { TimeBoundaryResultValue boundary = (TimeBoundaryResultValue) value; - value = new TimeBoundaryResultValue( - ImmutableMap.of( - TimeBoundaryQuery.MIN_TIME, boundary.getMinTime().minus(offset), - TimeBoundaryQuery.MAX_TIME, new DateTime(Math.min(boundary.getMaxTime().getMillis() - offset, now)) - ) - ); + + DateTime minTime = null; + try{ + minTime = boundary.getMinTime(); + } catch(IllegalArgumentException e) {} + + final DateTime maxTime = boundary.getMaxTime(); + + return (T) ((TimeBoundaryQuery) query).buildResult( + new DateTime(Math.min(res.getTimestamp().getMillis() - offset, now)), + minTime != null ? minTime.minus(offset) : null, + maxTime != null ? new DateTime(Math.min(maxTime.getMillis() - offset, now)) : null + ).iterator().next(); } return (T) new Result(res.getTimestamp().minus(offset), value); } else if (input instanceof MapBasedRow) { diff --git a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java index fdde44b8581..088ec0b6eb8 100644 --- a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java @@ -145,9 +145,10 @@ public class TimeBoundaryQueryQueryToolChest @Override public byte[] computeCacheKey(TimeBoundaryQuery query) { - return ByteBuffer.allocate(2) + final byte[] cacheKey = query.getCacheKey(); + return ByteBuffer.allocate(1 + cacheKey.length) .put(TIMEBOUNDARY_QUERY) - .put(query.getCacheKey()) + .put(cacheKey) .array(); } diff --git a/processing/src/test/java/io/druid/query/TimewarpOperatorTest.java b/processing/src/test/java/io/druid/query/TimewarpOperatorTest.java index 4dfb8bbe26c..f1703e8099c 100644 --- a/processing/src/test/java/io/druid/query/TimewarpOperatorTest.java +++ b/processing/src/test/java/io/druid/query/TimewarpOperatorTest.java @@ -26,6 +26,7 @@ import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; +import io.druid.query.timeboundary.TimeBoundaryResultValue; import io.druid.query.timeseries.TimeseriesResultValue; import org.joda.time.DateTime; import org.joda.time.Interval; @@ -122,5 +123,46 @@ public class TimewarpOperatorTest Sequences.toList(queryRunner.run(query), Lists.>newArrayList()) ); + + TimewarpOperator> timeBoundaryOperator = new TimewarpOperator<>( + new Interval(new DateTime("2014-01-01"), new DateTime("2014-01-15")), + new Period("P1W"), + new DateTime("2014-01-06") // align on Monday + ); + + QueryRunner> timeBoundaryRunner = timeBoundaryOperator.postProcess( + new QueryRunner>() + { + @Override + public Sequence> run(Query> query) + { + return Sequences.simple( + ImmutableList.of( + new Result<>( + new DateTime("2014-01-12"), + new TimeBoundaryResultValue(ImmutableMap.of("maxTime", new DateTime("2014-01-12"))) + ) + ) + ); + } + }, + new DateTime("2014-08-02").getMillis() + ); + + final Query> timeBoundaryQuery = + Druids.newTimeBoundaryQueryBuilder() + .dataSource("dummy") + .build(); + + Assert.assertEquals( + Lists.newArrayList( + new Result<>( + new DateTime("2014-08-02"), + new TimeBoundaryResultValue(ImmutableMap.of("maxTime", new DateTime("2014-08-02"))) + ) + ), + Sequences.toList(timeBoundaryRunner.run(timeBoundaryQuery), Lists.>newArrayList()) + ); + } } diff --git a/server/src/test/java/io/druid/client/CachingClusteredClientTest.java b/server/src/test/java/io/druid/client/CachingClusteredClientTest.java index 8437261afef..f99f3d4346b 100644 --- a/server/src/test/java/io/druid/client/CachingClusteredClientTest.java +++ b/server/src/test/java/io/druid/client/CachingClusteredClientTest.java @@ -929,6 +929,48 @@ public class CachingClusteredClientTest new Interval("2011-01-01/2011-01-10"), makeTimeBoundaryResult(new DateTime("2011-01-05T01"), new DateTime("2011-01-05T01"), new DateTime("2011-01-10")) ); + + testQueryCaching( + client, + Druids.newTimeBoundaryQueryBuilder() + .dataSource(CachingClusteredClientTest.DATA_SOURCE) + .intervals(CachingClusteredClientTest.SEG_SPEC) + .context(CachingClusteredClientTest.CONTEXT) + .bound(TimeBoundaryQuery.MAX_TIME) + .build(), + new Interval("2011-01-01/2011-01-02"), + makeTimeBoundaryResult(new DateTime("2011-01-01"), null, new DateTime("2011-01-02")), + + new Interval("2011-01-01/2011-01-03"), + makeTimeBoundaryResult(new DateTime("2011-01-02"), null, new DateTime("2011-01-03")), + + new Interval("2011-01-01/2011-01-10"), + makeTimeBoundaryResult(new DateTime("2011-01-05"), null, new DateTime("2011-01-10")), + + new Interval("2011-01-01/2011-01-10"), + makeTimeBoundaryResult(new DateTime("2011-01-05T01"), null, new DateTime("2011-01-10")) + ); + + testQueryCaching( + client, + Druids.newTimeBoundaryQueryBuilder() + .dataSource(CachingClusteredClientTest.DATA_SOURCE) + .intervals(CachingClusteredClientTest.SEG_SPEC) + .context(CachingClusteredClientTest.CONTEXT) + .bound(TimeBoundaryQuery.MIN_TIME) + .build(), + new Interval("2011-01-01/2011-01-02"), + makeTimeBoundaryResult(new DateTime("2011-01-01"), new DateTime("2011-01-01"), null), + + new Interval("2011-01-01/2011-01-03"), + makeTimeBoundaryResult(new DateTime("2011-01-02"), new DateTime("2011-01-02"), null), + + new Interval("2011-01-01/2011-01-10"), + makeTimeBoundaryResult(new DateTime("2011-01-05"), new DateTime("2011-01-05"), null), + + new Interval("2011-01-01/2011-01-10"), + makeTimeBoundaryResult(new DateTime("2011-01-05T01"), new DateTime("2011-01-05T01"), null) + ); } private Iterable> makeTimeBoundaryResult( @@ -937,17 +979,30 @@ public class CachingClusteredClientTest DateTime maxTime ) { + final Object value; + if (minTime != null && maxTime != null) { + value = ImmutableMap.of( + TimeBoundaryQuery.MIN_TIME, + minTime.toString(), + TimeBoundaryQuery.MAX_TIME, + maxTime.toString() + ); + } else if (maxTime != null) { + value = ImmutableMap.of( + TimeBoundaryQuery.MAX_TIME, + maxTime.toString() + ); + } else { + value = ImmutableMap.of( + TimeBoundaryQuery.MIN_TIME, + minTime.toString() + ); + } + return Arrays.asList( new Result<>( timestamp, - new TimeBoundaryResultValue( - ImmutableMap.of( - TimeBoundaryQuery.MIN_TIME, - minTime.toString(), - TimeBoundaryQuery.MAX_TIME, - maxTime.toString() - ) - ) + new TimeBoundaryResultValue(value) ) ); }