From b0b39d6ec030271459e0d01b6d07c661c8cbe377 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Thu, 28 Aug 2014 17:14:25 -0700 Subject: [PATCH 1/4] fix maxTime caching for individual bounds --- .../TimeBoundaryQueryQueryToolChest.java | 5 +- .../client/CachingClusteredClientTest.java | 71 ++++++++++++++++--- 2 files changed, 66 insertions(+), 10 deletions(-) diff --git a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java index fdde44b8581..088ec0b6eb8 100644 --- a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java @@ -145,9 +145,10 @@ public class TimeBoundaryQueryQueryToolChest @Override public byte[] computeCacheKey(TimeBoundaryQuery query) { - return ByteBuffer.allocate(2) + final byte[] cacheKey = query.getCacheKey(); + return ByteBuffer.allocate(1 + cacheKey.length) .put(TIMEBOUNDARY_QUERY) - .put(query.getCacheKey()) + .put(cacheKey) .array(); } diff --git a/server/src/test/java/io/druid/client/CachingClusteredClientTest.java b/server/src/test/java/io/druid/client/CachingClusteredClientTest.java index 8437261afef..f99f3d4346b 100644 --- a/server/src/test/java/io/druid/client/CachingClusteredClientTest.java +++ b/server/src/test/java/io/druid/client/CachingClusteredClientTest.java @@ -929,6 +929,48 @@ public class CachingClusteredClientTest new Interval("2011-01-01/2011-01-10"), makeTimeBoundaryResult(new DateTime("2011-01-05T01"), new DateTime("2011-01-05T01"), new DateTime("2011-01-10")) ); + + testQueryCaching( + client, + Druids.newTimeBoundaryQueryBuilder() + .dataSource(CachingClusteredClientTest.DATA_SOURCE) + .intervals(CachingClusteredClientTest.SEG_SPEC) + .context(CachingClusteredClientTest.CONTEXT) + .bound(TimeBoundaryQuery.MAX_TIME) + .build(), + new Interval("2011-01-01/2011-01-02"), + makeTimeBoundaryResult(new DateTime("2011-01-01"), null, new DateTime("2011-01-02")), + + new Interval("2011-01-01/2011-01-03"), + makeTimeBoundaryResult(new DateTime("2011-01-02"), null, new DateTime("2011-01-03")), + + new Interval("2011-01-01/2011-01-10"), + makeTimeBoundaryResult(new DateTime("2011-01-05"), null, new DateTime("2011-01-10")), + + new Interval("2011-01-01/2011-01-10"), + makeTimeBoundaryResult(new DateTime("2011-01-05T01"), null, new DateTime("2011-01-10")) + ); + + testQueryCaching( + client, + Druids.newTimeBoundaryQueryBuilder() + .dataSource(CachingClusteredClientTest.DATA_SOURCE) + .intervals(CachingClusteredClientTest.SEG_SPEC) + .context(CachingClusteredClientTest.CONTEXT) + .bound(TimeBoundaryQuery.MIN_TIME) + .build(), + new Interval("2011-01-01/2011-01-02"), + makeTimeBoundaryResult(new DateTime("2011-01-01"), new DateTime("2011-01-01"), null), + + new Interval("2011-01-01/2011-01-03"), + makeTimeBoundaryResult(new DateTime("2011-01-02"), new DateTime("2011-01-02"), null), + + new Interval("2011-01-01/2011-01-10"), + makeTimeBoundaryResult(new DateTime("2011-01-05"), new DateTime("2011-01-05"), null), + + new Interval("2011-01-01/2011-01-10"), + makeTimeBoundaryResult(new DateTime("2011-01-05T01"), new DateTime("2011-01-05T01"), null) + ); } private Iterable> makeTimeBoundaryResult( @@ -937,17 +979,30 @@ public class CachingClusteredClientTest DateTime maxTime ) { + final Object value; + if (minTime != null && maxTime != null) { + value = ImmutableMap.of( + TimeBoundaryQuery.MIN_TIME, + minTime.toString(), + TimeBoundaryQuery.MAX_TIME, + maxTime.toString() + ); + } else if (maxTime != null) { + value = ImmutableMap.of( + TimeBoundaryQuery.MAX_TIME, + maxTime.toString() + ); + } else { + value = ImmutableMap.of( + TimeBoundaryQuery.MIN_TIME, + minTime.toString() + ); + } + return Arrays.asList( new Result<>( timestamp, - new TimeBoundaryResultValue( - ImmutableMap.of( - TimeBoundaryQuery.MIN_TIME, - minTime.toString(), - TimeBoundaryQuery.MAX_TIME, - maxTime.toString() - ) - ) + new TimeBoundaryResultValue(value) ) ); } From 9f742a87a6afeaeaffc31058e482b7a231cef4de Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Thu, 28 Aug 2014 17:15:45 -0700 Subject: [PATCH 2/4] fix timewarp with different bounds --- .../java/io/druid/query/TimewarpOperator.java | 23 +++++++++++-------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/processing/src/main/java/io/druid/query/TimewarpOperator.java b/processing/src/main/java/io/druid/query/TimewarpOperator.java index ac2a5f8c930..4f9bc597f74 100644 --- a/processing/src/main/java/io/druid/query/TimewarpOperator.java +++ b/processing/src/main/java/io/druid/query/TimewarpOperator.java @@ -22,7 +22,6 @@ package io.druid.query; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Function; -import com.google.common.collect.ImmutableMap; import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; import io.druid.data.input.MapBasedRow; @@ -80,7 +79,7 @@ public class TimewarpOperator implements PostProcessingOperator return new QueryRunner() { @Override - public Sequence run(Query query) + public Sequence run(final Query query) { final long offset = computeOffset(now); @@ -101,16 +100,22 @@ public class TimewarpOperator implements PostProcessingOperator if (input instanceof Result) { Result res = (Result) input; Object value = res.getValue(); + final DateTime timestamp = res.getTimestamp().minus(offset); if (value instanceof TimeBoundaryResultValue) { TimeBoundaryResultValue boundary = (TimeBoundaryResultValue) value; - value = new TimeBoundaryResultValue( - ImmutableMap.of( - TimeBoundaryQuery.MIN_TIME, boundary.getMinTime().minus(offset), - TimeBoundaryQuery.MAX_TIME, new DateTime(Math.min(boundary.getMaxTime().getMillis() - offset, now)) - ) - ); + + DateTime minTime = null; + try{ + minTime = boundary.getMinTime(); + } catch(IllegalArgumentException e) {} + + return (T) ((TimeBoundaryQuery) query).buildResult( + timestamp, + minTime, + boundary.getMaxTime() + ).iterator().next(); } - return (T) new Result(res.getTimestamp().minus(offset), value); + return (T) new Result(timestamp, value); } else if (input instanceof MapBasedRow) { MapBasedRow row = (MapBasedRow) input; return (T) new MapBasedRow(row.getTimestamp().minus(offset), row.getEvent()); From 743a9bfdac594ddbaddb8ce67e2852ff537e8bf1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Thu, 28 Aug 2014 17:35:26 -0700 Subject: [PATCH 3/4] fix offset and nulls --- .../src/main/java/io/druid/query/TimewarpOperator.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/processing/src/main/java/io/druid/query/TimewarpOperator.java b/processing/src/main/java/io/druid/query/TimewarpOperator.java index 4f9bc597f74..b81c823759b 100644 --- a/processing/src/main/java/io/druid/query/TimewarpOperator.java +++ b/processing/src/main/java/io/druid/query/TimewarpOperator.java @@ -109,10 +109,12 @@ public class TimewarpOperator implements PostProcessingOperator minTime = boundary.getMinTime(); } catch(IllegalArgumentException e) {} + final DateTime maxTime = boundary.getMaxTime(); + return (T) ((TimeBoundaryQuery) query).buildResult( timestamp, - minTime, - boundary.getMaxTime() + minTime != null ? minTime.minus(offset) : null, + maxTime != null ? maxTime.minus(offset) : null ).iterator().next(); } return (T) new Result(timestamp, value); From 674e7b04478bee993e4ccd087a27325a21283eb4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Thu, 28 Aug 2014 17:59:56 -0700 Subject: [PATCH 4/4] add tests for timeboundary timewarp --- .../java/io/druid/query/TimewarpOperator.java | 7 ++-- .../io/druid/query/TimewarpOperatorTest.java | 42 +++++++++++++++++++ 2 files changed, 45 insertions(+), 4 deletions(-) diff --git a/processing/src/main/java/io/druid/query/TimewarpOperator.java b/processing/src/main/java/io/druid/query/TimewarpOperator.java index b81c823759b..49a8fb1ed85 100644 --- a/processing/src/main/java/io/druid/query/TimewarpOperator.java +++ b/processing/src/main/java/io/druid/query/TimewarpOperator.java @@ -100,7 +100,6 @@ public class TimewarpOperator implements PostProcessingOperator if (input instanceof Result) { Result res = (Result) input; Object value = res.getValue(); - final DateTime timestamp = res.getTimestamp().minus(offset); if (value instanceof TimeBoundaryResultValue) { TimeBoundaryResultValue boundary = (TimeBoundaryResultValue) value; @@ -112,12 +111,12 @@ public class TimewarpOperator implements PostProcessingOperator final DateTime maxTime = boundary.getMaxTime(); return (T) ((TimeBoundaryQuery) query).buildResult( - timestamp, + new DateTime(Math.min(res.getTimestamp().getMillis() - offset, now)), minTime != null ? minTime.minus(offset) : null, - maxTime != null ? maxTime.minus(offset) : null + maxTime != null ? new DateTime(Math.min(maxTime.getMillis() - offset, now)) : null ).iterator().next(); } - return (T) new Result(timestamp, value); + return (T) new Result(res.getTimestamp().minus(offset), value); } else if (input instanceof MapBasedRow) { MapBasedRow row = (MapBasedRow) input; return (T) new MapBasedRow(row.getTimestamp().minus(offset), row.getEvent()); diff --git a/processing/src/test/java/io/druid/query/TimewarpOperatorTest.java b/processing/src/test/java/io/druid/query/TimewarpOperatorTest.java index 4dfb8bbe26c..f1703e8099c 100644 --- a/processing/src/test/java/io/druid/query/TimewarpOperatorTest.java +++ b/processing/src/test/java/io/druid/query/TimewarpOperatorTest.java @@ -26,6 +26,7 @@ import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; +import io.druid.query.timeboundary.TimeBoundaryResultValue; import io.druid.query.timeseries.TimeseriesResultValue; import org.joda.time.DateTime; import org.joda.time.Interval; @@ -122,5 +123,46 @@ public class TimewarpOperatorTest Sequences.toList(queryRunner.run(query), Lists.>newArrayList()) ); + + TimewarpOperator> timeBoundaryOperator = new TimewarpOperator<>( + new Interval(new DateTime("2014-01-01"), new DateTime("2014-01-15")), + new Period("P1W"), + new DateTime("2014-01-06") // align on Monday + ); + + QueryRunner> timeBoundaryRunner = timeBoundaryOperator.postProcess( + new QueryRunner>() + { + @Override + public Sequence> run(Query> query) + { + return Sequences.simple( + ImmutableList.of( + new Result<>( + new DateTime("2014-01-12"), + new TimeBoundaryResultValue(ImmutableMap.of("maxTime", new DateTime("2014-01-12"))) + ) + ) + ); + } + }, + new DateTime("2014-08-02").getMillis() + ); + + final Query> timeBoundaryQuery = + Druids.newTimeBoundaryQueryBuilder() + .dataSource("dummy") + .build(); + + Assert.assertEquals( + Lists.newArrayList( + new Result<>( + new DateTime("2014-08-02"), + new TimeBoundaryResultValue(ImmutableMap.of("maxTime", new DateTime("2014-08-02"))) + ) + ), + Sequences.toList(timeBoundaryRunner.run(timeBoundaryQuery), Lists.>newArrayList()) + ); + } }