From d4a47fe6e8f86d3bceb825935960ffaa38c92de5 Mon Sep 17 00:00:00 2001 From: fjy Date: Mon, 16 Jun 2014 13:45:32 -0700 Subject: [PATCH 1/5] enable tb to return just max or min time --- .../src/main/java/io/druid/query/Druids.java | 10 +++ .../query/timeboundary/TimeBoundaryQuery.java | 69 ++++++++++++++----- .../TimeBoundaryQueryQueryToolChest.java | 52 ++++++++++---- .../timeboundary/TimeBoundaryResultValue.java | 4 ++ .../TimeBoundaryQueryRunnerTest.java | 42 +++++++++++ 5 files changed, 145 insertions(+), 32 deletions(-) diff --git a/processing/src/main/java/io/druid/query/Druids.java b/processing/src/main/java/io/druid/query/Druids.java index 084652b322f..4f94b8caf40 100644 --- a/processing/src/main/java/io/druid/query/Druids.java +++ b/processing/src/main/java/io/druid/query/Druids.java @@ -692,12 +692,14 @@ public class Druids { private DataSource dataSource; private QuerySegmentSpec querySegmentSpec; + private String exclude; private Map context; public TimeBoundaryQueryBuilder() { dataSource = null; querySegmentSpec = null; + exclude = null; context = null; } @@ -706,6 +708,7 @@ public class Druids return new TimeBoundaryQuery( dataSource, querySegmentSpec, + exclude, context ); } @@ -715,6 +718,7 @@ public class Druids return new TimeBoundaryQueryBuilder() .dataSource(builder.dataSource) .intervals(builder.querySegmentSpec) + .exclude(builder.exclude) .context(builder.context); } @@ -748,6 +752,12 @@ public class Druids return this; } + public TimeBoundaryQueryBuilder exclude(String ex) + { + exclude = ex; + return this; + } + public TimeBoundaryQueryBuilder context(Map c) { context = c; diff --git a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQuery.java b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQuery.java index 6f3e70b9851..3ed1f17a604 100644 --- a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQuery.java +++ b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQuery.java @@ -50,10 +50,13 @@ public class TimeBoundaryQuery extends BaseQuery public static final String MIN_TIME = "minTime"; private static final byte CACHE_TYPE_ID = 0x0; + private final String exclude; + @JsonCreator public TimeBoundaryQuery( @JsonProperty("dataSource") DataSource dataSource, @JsonProperty("intervals") QuerySegmentSpec querySegmentSpec, + @JsonProperty("exclude") String exclude, @JsonProperty("context") Map context ) { @@ -63,6 +66,8 @@ public class TimeBoundaryQuery extends BaseQuery : querySegmentSpec, context ); + + this.exclude = exclude == null ? "" : exclude; } @Override @@ -77,12 +82,19 @@ public class TimeBoundaryQuery extends BaseQuery return Query.TIME_BOUNDARY; } + @JsonProperty + public String getExclude() + { + return exclude; + } + @Override public TimeBoundaryQuery withOverriddenContext(Map contextOverrides) { return new TimeBoundaryQuery( getDataSource(), getQuerySegmentSpec(), + exclude, computeOverridenContext(contextOverrides) ); } @@ -93,6 +105,7 @@ public class TimeBoundaryQuery extends BaseQuery return new TimeBoundaryQuery( getDataSource(), spec, + exclude, getContext() ); } @@ -103,14 +116,17 @@ public class TimeBoundaryQuery extends BaseQuery return new TimeBoundaryQuery( dataSource, getQuerySegmentSpec(), + exclude, getContext() ); } public byte[] getCacheKey() { - return ByteBuffer.allocate(1) + final byte[] excludeBytes = exclude.getBytes(); + return ByteBuffer.allocate(1 + excludeBytes.length) .put(CACHE_TYPE_ID) + .put(excludeBytes) .array(); } @@ -121,6 +137,7 @@ public class TimeBoundaryQuery extends BaseQuery "dataSource='" + getDataSource() + '\'' + ", querySegmentSpec=" + getQuerySegmentSpec() + ", duration=" + getDuration() + + ", exclude" + exclude + '}'; } @@ -129,14 +146,14 @@ public class TimeBoundaryQuery extends BaseQuery List> results = Lists.newArrayList(); Map result = Maps.newHashMap(); - if (min != null) { - result.put(TimeBoundaryQuery.MIN_TIME, min); + if (min != null && !exclude.equalsIgnoreCase(MIN_TIME)) { + result.put(MIN_TIME, min); } - if (max != null) { - result.put(TimeBoundaryQuery.MAX_TIME, max); + if (max != null && !exclude.equalsIgnoreCase(MAX_TIME)) { + result.put(MAX_TIME, max); } if (!result.isEmpty()) { - results.add(new Result(timestamp, new TimeBoundaryResultValue(result))); + results.add(new Result<>(timestamp, new TimeBoundaryResultValue(result))); } return results; @@ -153,24 +170,40 @@ public class TimeBoundaryQuery extends BaseQuery for (Result result : results) { TimeBoundaryResultValue val = result.getValue(); - DateTime currMinTime = val.getMinTime(); - if (currMinTime.isBefore(min)) { - min = currMinTime; + if (!exclude.equalsIgnoreCase(MIN_TIME)) { + DateTime currMinTime = val.getMinTime(); + if (currMinTime.isBefore(min)) { + min = currMinTime; + } } - DateTime currMaxTime = val.getMaxTime(); - if (currMaxTime.isAfter(max)) { - max = currMaxTime; + if (!exclude.equalsIgnoreCase(MAX_TIME)) { + DateTime currMaxTime = val.getMaxTime(); + if (currMaxTime.isAfter(max)) { + max = currMaxTime; + } } } + final ImmutableMap.Builder builder = new ImmutableMap.Builder<>(); + final DateTime ts; + + if (exclude.equalsIgnoreCase(MIN_TIME)) { + ts = max; + builder.put(MAX_TIME, max); + } else if (exclude.equalsIgnoreCase(MAX_TIME)) { + ts = min; + builder.put(MIN_TIME, min); + } else { + ts = min; + builder.put(MAX_TIME, max); + builder.put(MIN_TIME, min); + } + return Arrays.asList( - new Result( - min, + new Result<>( + ts, new TimeBoundaryResultValue( - ImmutableMap.of( - TimeBoundaryQuery.MIN_TIME, min, - TimeBoundaryQuery.MAX_TIME, max - ) + builder.build() ) ) ); diff --git a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java index cc4aaa46579..6a765db9ed3 100644 --- a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java @@ -67,21 +67,45 @@ public class TimeBoundaryQueryQueryToolChest return segments; } - final T first = segments.get(0); - final T second = segments.get(segments.size() - 1); + final T min = segments.get(0); + final T max = segments.get(segments.size() - 1); + + final Predicate filterPredicate; + // optimizations to avoid hitting too many segments + if (query.getExclude().equalsIgnoreCase(TimeBoundaryQuery.MAX_TIME)) { + filterPredicate = new Predicate() + { + @Override + public boolean apply(T input) + { + return input.getInterval().overlaps(min.getInterval()); + } + }; + } else if (query.getExclude().equalsIgnoreCase(TimeBoundaryQuery.MIN_TIME)) { + filterPredicate = new Predicate() + { + @Override + public boolean apply(T input) + { + return input.getInterval().overlaps(max.getInterval()); + } + }; + } else { + filterPredicate = new Predicate() + { + @Override + public boolean apply(T input) + { + return input.getInterval().overlaps(min.getInterval()) || input.getInterval() + .overlaps(max.getInterval()); + } + }; + } return Lists.newArrayList( Iterables.filter( segments, - new Predicate() - { - @Override - public boolean apply(T input) - { - return input.getInterval().overlaps(first.getInterval()) || input.getInterval() - .overlaps(second.getInterval()); - } - } + filterPredicate ) ); } @@ -146,9 +170,9 @@ public class TimeBoundaryQueryQueryToolChest public byte[] computeCacheKey(TimeBoundaryQuery query) { return ByteBuffer.allocate(2) - .put(TIMEBOUNDARY_QUERY) - .put(query.getCacheKey()) - .array(); + .put(TIMEBOUNDARY_QUERY) + .put(query.getCacheKey()) + .array(); } @Override diff --git a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryResultValue.java b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryResultValue.java index 1210e95af2c..ba5777879e6 100644 --- a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryResultValue.java +++ b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryResultValue.java @@ -99,6 +99,10 @@ public class TimeBoundaryResultValue private DateTime getDateTimeValue(Object val) { + if (val == null) { + return null; + } + if (val instanceof DateTime) { return (DateTime) val; } else if (val instanceof String) { diff --git a/processing/src/test/java/io/druid/query/timeboundary/TimeBoundaryQueryRunnerTest.java b/processing/src/test/java/io/druid/query/timeboundary/TimeBoundaryQueryRunnerTest.java index 7bc499dca80..470b80cf3b5 100644 --- a/processing/src/test/java/io/druid/query/timeboundary/TimeBoundaryQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/timeboundary/TimeBoundaryQueryRunnerTest.java @@ -78,4 +78,46 @@ public class TimeBoundaryQueryRunnerTest Assert.assertEquals(new DateTime("2011-01-12T00:00:00.000Z"), minTime); Assert.assertEquals(new DateTime("2011-04-15T00:00:00.000Z"), maxTime); } + + @Test + @SuppressWarnings("unchecked") + public void testTimeBoundaryExcludesMin() + { + TimeBoundaryQuery timeBoundaryQuery = Druids.newTimeBoundaryQueryBuilder() + .dataSource("testing") + .exclude(TimeBoundaryQuery.MIN_TIME) + .build(); + + Iterable> results = Sequences.toList( + runner.run(timeBoundaryQuery), + Lists.>newArrayList() + ); + TimeBoundaryResultValue val = results.iterator().next().getValue(); + DateTime minTime = val.getMinTime(); + DateTime maxTime = val.getMaxTime(); + + Assert.assertNull(minTime); + Assert.assertEquals(new DateTime("2011-04-15T00:00:00.000Z"), maxTime); + } + + @Test + @SuppressWarnings("unchecked") + public void testTimeBoundaryExcludesMax() + { + TimeBoundaryQuery timeBoundaryQuery = Druids.newTimeBoundaryQueryBuilder() + .dataSource("testing") + .exclude(TimeBoundaryQuery.MAX_TIME) + .build(); + + Iterable> results = Sequences.toList( + runner.run(timeBoundaryQuery), + Lists.>newArrayList() + ); + TimeBoundaryResultValue val = results.iterator().next().getValue(); + DateTime minTime = val.getMinTime(); + DateTime maxTime = val.getMaxTime(); + + Assert.assertEquals(new DateTime("2011-01-12T00:00:00.000Z"), minTime); + Assert.assertNull(maxTime); + } } From d2cf7d3f0a4010c6bdfea0f19ece0948ff30ffdd Mon Sep 17 00:00:00 2001 From: fjy Date: Mon, 16 Jun 2014 14:00:01 -0700 Subject: [PATCH 2/5] address cr --- .../src/main/java/io/druid/query/Druids.java | 12 +++--- .../query/timeboundary/TimeBoundaryQuery.java | 40 +++++++++---------- .../TimeBoundaryQueryQueryToolChest.java | 4 +- .../TimeBoundaryQueryRunnerTest.java | 11 ++--- 4 files changed, 32 insertions(+), 35 deletions(-) diff --git a/processing/src/main/java/io/druid/query/Druids.java b/processing/src/main/java/io/druid/query/Druids.java index 4f94b8caf40..932af432ad1 100644 --- a/processing/src/main/java/io/druid/query/Druids.java +++ b/processing/src/main/java/io/druid/query/Druids.java @@ -692,14 +692,14 @@ public class Druids { private DataSource dataSource; private QuerySegmentSpec querySegmentSpec; - private String exclude; + private String bound; private Map context; public TimeBoundaryQueryBuilder() { dataSource = null; querySegmentSpec = null; - exclude = null; + bound = null; context = null; } @@ -708,7 +708,7 @@ public class Druids return new TimeBoundaryQuery( dataSource, querySegmentSpec, - exclude, + bound, context ); } @@ -718,7 +718,7 @@ public class Druids return new TimeBoundaryQueryBuilder() .dataSource(builder.dataSource) .intervals(builder.querySegmentSpec) - .exclude(builder.exclude) + .bound(builder.bound) .context(builder.context); } @@ -752,9 +752,9 @@ public class Druids return this; } - public TimeBoundaryQueryBuilder exclude(String ex) + public TimeBoundaryQueryBuilder bound(String b) { - exclude = ex; + bound = b; return this; } diff --git a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQuery.java b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQuery.java index 3ed1f17a604..eecc60251a0 100644 --- a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQuery.java +++ b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQuery.java @@ -50,13 +50,13 @@ public class TimeBoundaryQuery extends BaseQuery public static final String MIN_TIME = "minTime"; private static final byte CACHE_TYPE_ID = 0x0; - private final String exclude; + private final String bound; @JsonCreator public TimeBoundaryQuery( @JsonProperty("dataSource") DataSource dataSource, @JsonProperty("intervals") QuerySegmentSpec querySegmentSpec, - @JsonProperty("exclude") String exclude, + @JsonProperty("bound") String bound, @JsonProperty("context") Map context ) { @@ -67,7 +67,7 @@ public class TimeBoundaryQuery extends BaseQuery context ); - this.exclude = exclude == null ? "" : exclude; + this.bound = bound == null ? "" : bound; } @Override @@ -83,9 +83,9 @@ public class TimeBoundaryQuery extends BaseQuery } @JsonProperty - public String getExclude() + public String getBound() { - return exclude; + return bound; } @Override @@ -94,7 +94,7 @@ public class TimeBoundaryQuery extends BaseQuery return new TimeBoundaryQuery( getDataSource(), getQuerySegmentSpec(), - exclude, + bound, computeOverridenContext(contextOverrides) ); } @@ -105,7 +105,7 @@ public class TimeBoundaryQuery extends BaseQuery return new TimeBoundaryQuery( getDataSource(), spec, - exclude, + bound, getContext() ); } @@ -116,14 +116,14 @@ public class TimeBoundaryQuery extends BaseQuery return new TimeBoundaryQuery( dataSource, getQuerySegmentSpec(), - exclude, + bound, getContext() ); } public byte[] getCacheKey() { - final byte[] excludeBytes = exclude.getBytes(); + final byte[] excludeBytes = bound.getBytes(); return ByteBuffer.allocate(1 + excludeBytes.length) .put(CACHE_TYPE_ID) .put(excludeBytes) @@ -137,7 +137,7 @@ public class TimeBoundaryQuery extends BaseQuery "dataSource='" + getDataSource() + '\'' + ", querySegmentSpec=" + getQuerySegmentSpec() + ", duration=" + getDuration() + - ", exclude" + exclude + + ", bound" + bound + '}'; } @@ -146,10 +146,10 @@ public class TimeBoundaryQuery extends BaseQuery List> results = Lists.newArrayList(); Map result = Maps.newHashMap(); - if (min != null && !exclude.equalsIgnoreCase(MIN_TIME)) { + if (min != null && !bound.equalsIgnoreCase(MAX_TIME)) { result.put(MIN_TIME, min); } - if (max != null && !exclude.equalsIgnoreCase(MAX_TIME)) { + if (max != null && !bound.equalsIgnoreCase(MIN_TIME)) { result.put(MAX_TIME, max); } if (!result.isEmpty()) { @@ -170,13 +170,13 @@ public class TimeBoundaryQuery extends BaseQuery for (Result result : results) { TimeBoundaryResultValue val = result.getValue(); - if (!exclude.equalsIgnoreCase(MIN_TIME)) { + if (!bound.equalsIgnoreCase(MAX_TIME)) { DateTime currMinTime = val.getMinTime(); if (currMinTime.isBefore(min)) { min = currMinTime; } } - if (!exclude.equalsIgnoreCase(MAX_TIME)) { + if (!bound.equalsIgnoreCase(MIN_TIME)) { DateTime currMaxTime = val.getMaxTime(); if (currMaxTime.isAfter(max)) { max = currMaxTime; @@ -187,16 +187,16 @@ public class TimeBoundaryQuery extends BaseQuery final ImmutableMap.Builder builder = new ImmutableMap.Builder<>(); final DateTime ts; - if (exclude.equalsIgnoreCase(MIN_TIME)) { - ts = max; - builder.put(MAX_TIME, max); - } else if (exclude.equalsIgnoreCase(MAX_TIME)) { + if (bound.equalsIgnoreCase(MIN_TIME)) { ts = min; - builder.put(MIN_TIME, min); + builder.put(MIN_TIME, max); + } else if (bound.equalsIgnoreCase(MAX_TIME)) { + ts = max; + builder.put(MAX_TIME, min); } else { ts = min; - builder.put(MAX_TIME, max); builder.put(MIN_TIME, min); + builder.put(MAX_TIME, max); } return Arrays.asList( diff --git a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java index 6a765db9ed3..ec9da74eef8 100644 --- a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java @@ -72,7 +72,7 @@ public class TimeBoundaryQueryQueryToolChest final Predicate filterPredicate; // optimizations to avoid hitting too many segments - if (query.getExclude().equalsIgnoreCase(TimeBoundaryQuery.MAX_TIME)) { + if (query.getBound().equalsIgnoreCase(TimeBoundaryQuery.MIN_TIME)) { filterPredicate = new Predicate() { @Override @@ -81,7 +81,7 @@ public class TimeBoundaryQueryQueryToolChest return input.getInterval().overlaps(min.getInterval()); } }; - } else if (query.getExclude().equalsIgnoreCase(TimeBoundaryQuery.MIN_TIME)) { + } else if (query.getBound().equalsIgnoreCase(TimeBoundaryQuery.MAX_TIME)) { filterPredicate = new Predicate() { @Override diff --git a/processing/src/test/java/io/druid/query/timeboundary/TimeBoundaryQueryRunnerTest.java b/processing/src/test/java/io/druid/query/timeboundary/TimeBoundaryQueryRunnerTest.java index 470b80cf3b5..32ff68ab350 100644 --- a/processing/src/test/java/io/druid/query/timeboundary/TimeBoundaryQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/timeboundary/TimeBoundaryQueryRunnerTest.java @@ -20,13 +20,10 @@ package io.druid.query.timeboundary; import com.google.common.collect.Lists; -import com.google.common.util.concurrent.ListenableFuture; import com.metamx.common.guava.Sequences; import io.druid.query.Druids; -import io.druid.query.Query; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerTestHelper; -import io.druid.query.QueryWatcher; import io.druid.query.Result; import org.joda.time.DateTime; import org.junit.Assert; @@ -81,11 +78,11 @@ public class TimeBoundaryQueryRunnerTest @Test @SuppressWarnings("unchecked") - public void testTimeBoundaryExcludesMin() + public void testTimeBoundaryMax() { TimeBoundaryQuery timeBoundaryQuery = Druids.newTimeBoundaryQueryBuilder() .dataSource("testing") - .exclude(TimeBoundaryQuery.MIN_TIME) + .bound(TimeBoundaryQuery.MAX_TIME) .build(); Iterable> results = Sequences.toList( @@ -102,11 +99,11 @@ public class TimeBoundaryQueryRunnerTest @Test @SuppressWarnings("unchecked") - public void testTimeBoundaryExcludesMax() + public void testTimeBoundaryMin() { TimeBoundaryQuery timeBoundaryQuery = Druids.newTimeBoundaryQueryBuilder() .dataSource("testing") - .exclude(TimeBoundaryQuery.MAX_TIME) + .bound(TimeBoundaryQuery.MIN_TIME) .build(); Iterable> results = Sequences.toList( From 08c88e8fb7c1ff6fd329cefc9caa83ca38e795fb Mon Sep 17 00:00:00 2001 From: fjy Date: Mon, 16 Jun 2014 14:07:10 -0700 Subject: [PATCH 3/5] address cr --- .../io/druid/query/timeboundary/TimeBoundaryQuery.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQuery.java b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQuery.java index eecc60251a0..636c373b98f 100644 --- a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQuery.java +++ b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQuery.java @@ -21,6 +21,7 @@ package io.druid.query.timeboundary; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Charsets; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -123,10 +124,10 @@ public class TimeBoundaryQuery extends BaseQuery public byte[] getCacheKey() { - final byte[] excludeBytes = bound.getBytes(); - return ByteBuffer.allocate(1 + excludeBytes.length) + final byte[] boundBytes = bound.getBytes(Charsets.UTF_8); + return ByteBuffer.allocate(1 + boundBytes.length) .put(CACHE_TYPE_ID) - .put(excludeBytes) + .put(boundBytes) .array(); } From 9ca4f564ee23ccea7611b15cb1d26b1d16bd3f9e Mon Sep 17 00:00:00 2001 From: fjy Date: Mon, 16 Jun 2014 14:20:16 -0700 Subject: [PATCH 4/5] address cr --- .../query/timeboundary/TimeBoundaryQuery.java | 20 +++----- .../TimeBoundaryQueryQueryToolChest.java | 50 +++++-------------- .../TimeBoundaryQueryRunnerFactory.java | 17 +++++-- 3 files changed, 34 insertions(+), 53 deletions(-) diff --git a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQuery.java b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQuery.java index 636c373b98f..6d90a6a50c1 100644 --- a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQuery.java +++ b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQuery.java @@ -147,10 +147,10 @@ public class TimeBoundaryQuery extends BaseQuery List> results = Lists.newArrayList(); Map result = Maps.newHashMap(); - if (min != null && !bound.equalsIgnoreCase(MAX_TIME)) { + if (min != null) { result.put(MIN_TIME, min); } - if (max != null && !bound.equalsIgnoreCase(MIN_TIME)) { + if (max != null) { result.put(MAX_TIME, max); } if (!result.isEmpty()) { @@ -171,17 +171,13 @@ public class TimeBoundaryQuery extends BaseQuery for (Result result : results) { TimeBoundaryResultValue val = result.getValue(); - if (!bound.equalsIgnoreCase(MAX_TIME)) { - DateTime currMinTime = val.getMinTime(); - if (currMinTime.isBefore(min)) { - min = currMinTime; - } + DateTime currMinTime = val.getMinTime(); + if (currMinTime != null && currMinTime.isBefore(min)) { + min = currMinTime; } - if (!bound.equalsIgnoreCase(MIN_TIME)) { - DateTime currMaxTime = val.getMaxTime(); - if (currMaxTime.isAfter(max)) { - max = currMaxTime; - } + DateTime currMaxTime = val.getMaxTime(); + if (currMaxTime != null && currMaxTime.isAfter(max)) { + max = currMaxTime; } } diff --git a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java index ec9da74eef8..fdde44b8581 100644 --- a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java @@ -70,42 +70,18 @@ public class TimeBoundaryQueryQueryToolChest final T min = segments.get(0); final T max = segments.get(segments.size() - 1); - final Predicate filterPredicate; - // optimizations to avoid hitting too many segments - if (query.getBound().equalsIgnoreCase(TimeBoundaryQuery.MIN_TIME)) { - filterPredicate = new Predicate() - { - @Override - public boolean apply(T input) - { - return input.getInterval().overlaps(min.getInterval()); - } - }; - } else if (query.getBound().equalsIgnoreCase(TimeBoundaryQuery.MAX_TIME)) { - filterPredicate = new Predicate() - { - @Override - public boolean apply(T input) - { - return input.getInterval().overlaps(max.getInterval()); - } - }; - } else { - filterPredicate = new Predicate() - { - @Override - public boolean apply(T input) - { - return input.getInterval().overlaps(min.getInterval()) || input.getInterval() - .overlaps(max.getInterval()); - } - }; - } - return Lists.newArrayList( Iterables.filter( segments, - filterPredicate + new Predicate() + { + @Override + public boolean apply(T input) + { + return (min != null && input.getInterval().overlaps(min.getInterval())) || + (max != null && input.getInterval().overlaps(max.getInterval())); + } + } ) ); } @@ -135,7 +111,7 @@ public class TimeBoundaryQueryQueryToolChest @Override public Sequence> mergeSequences(Sequence>> seqOfSequences) { - return new OrderedMergeSequence>(getOrdering(), seqOfSequences); + return new OrderedMergeSequence<>(getOrdering(), seqOfSequences); } @Override @@ -201,11 +177,11 @@ public class TimeBoundaryQueryQueryToolChest { @Override @SuppressWarnings("unchecked") - public Result apply(@Nullable Object input) + public Result apply(Object input) { List result = (List) input; - return new Result( + return new Result<>( new DateTime(result.get(0)), new TimeBoundaryResultValue(result.get(1)) ); @@ -216,7 +192,7 @@ public class TimeBoundaryQueryQueryToolChest @Override public Sequence> mergeSequences(Sequence>> seqOfSequences) { - return new MergeSequence>(getOrdering(), seqOfSequences); + return new MergeSequence<>(getOrdering(), seqOfSequences); } }; } diff --git a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryRunnerFactory.java b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryRunnerFactory.java index 1f78429ead3..bdb17694346 100644 --- a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryRunnerFactory.java +++ b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryRunnerFactory.java @@ -32,6 +32,7 @@ import io.druid.query.QueryWatcher; import io.druid.query.Result; import io.druid.segment.Segment; import io.druid.segment.StorageAdapter; +import org.joda.time.DateTime; import java.util.Iterator; import java.util.concurrent.ExecutorService; @@ -61,7 +62,7 @@ public class TimeBoundaryQueryRunnerFactory ExecutorService queryExecutor, Iterable>> queryRunners ) { - return new ChainedExecutionQueryRunner>( + return new ChainedExecutionQueryRunner<>( queryExecutor, toolChest.getOrdering(), queryWatcher, queryRunners ); } @@ -90,7 +91,7 @@ public class TimeBoundaryQueryRunnerFactory final TimeBoundaryQuery legacyQuery = (TimeBoundaryQuery) input; - return new BaseSequence, Iterator>>( + return new BaseSequence<>( new BaseSequence.IteratorMaker, Iterator>>() { @Override @@ -102,10 +103,18 @@ public class TimeBoundaryQueryRunnerFactory ); } + final DateTime minTime = legacyQuery.getBound().equalsIgnoreCase(TimeBoundaryQuery.MAX_TIME) + ? null + : adapter.getMinTime(); + final DateTime maxTime = legacyQuery.getBound().equalsIgnoreCase(TimeBoundaryQuery.MIN_TIME) + ? null + : adapter.getMaxTime(); + + return legacyQuery.buildResult( adapter.getInterval().getStart(), - adapter.getMinTime(), - adapter.getMaxTime() + minTime, + maxTime ).iterator(); } From 5aa46d08a8d04ac5ea6a8f325cfa5026820d3d37 Mon Sep 17 00:00:00 2001 From: fjy Date: Mon, 16 Jun 2014 14:24:36 -0700 Subject: [PATCH 5/5] address cr --- .../query/timeboundary/TimeBoundaryQuery.java | 59 +++++++++---------- 1 file changed, 28 insertions(+), 31 deletions(-) diff --git a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQuery.java b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQuery.java index 6d90a6a50c1..574d0704602 100644 --- a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQuery.java +++ b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQuery.java @@ -49,8 +49,27 @@ public class TimeBoundaryQuery extends BaseQuery ); public static final String MAX_TIME = "maxTime"; public static final String MIN_TIME = "minTime"; + private static final byte CACHE_TYPE_ID = 0x0; + public static Iterable> buildResult(DateTime timestamp, DateTime min, DateTime max) + { + List> results = Lists.newArrayList(); + Map result = Maps.newHashMap(); + + if (min != null) { + result.put(MIN_TIME, min); + } + if (max != null) { + result.put(MAX_TIME, max); + } + if (!result.isEmpty()) { + results.add(new Result<>(timestamp, new TimeBoundaryResultValue(result))); + } + + return results; + } + private final String bound; @JsonCreator @@ -142,24 +161,6 @@ public class TimeBoundaryQuery extends BaseQuery '}'; } - public Iterable> buildResult(DateTime timestamp, DateTime min, DateTime max) - { - List> results = Lists.newArrayList(); - Map result = Maps.newHashMap(); - - if (min != null) { - result.put(MIN_TIME, min); - } - if (max != null) { - result.put(MAX_TIME, max); - } - if (!result.isEmpty()) { - results.add(new Result<>(timestamp, new TimeBoundaryResultValue(result))); - } - - return results; - } - public Iterable> mergeResults(List> results) { if (results == null || results.isEmpty()) { @@ -181,28 +182,24 @@ public class TimeBoundaryQuery extends BaseQuery } } - final ImmutableMap.Builder builder = new ImmutableMap.Builder<>(); final DateTime ts; + final DateTime minTime; + final DateTime maxTime; if (bound.equalsIgnoreCase(MIN_TIME)) { ts = min; - builder.put(MIN_TIME, max); + minTime = min; + maxTime = null; } else if (bound.equalsIgnoreCase(MAX_TIME)) { ts = max; - builder.put(MAX_TIME, min); + minTime = null; + maxTime = max; } else { ts = min; - builder.put(MIN_TIME, min); - builder.put(MAX_TIME, max); + minTime = min; + maxTime = max; } - return Arrays.asList( - new Result<>( - ts, - new TimeBoundaryResultValue( - builder.build() - ) - ) - ); + return buildResult(ts, minTime, maxTime); } }