From 6827e48bd348e6628c126eec05566556b79488a4 Mon Sep 17 00:00:00 2001 From: Fangjin Yang Date: Tue, 11 Dec 2012 15:09:16 -0800 Subject: [PATCH 1/6] bug fixes for invalid query params --- .../druid/query/timeboundary/TimeBoundaryQuery.java | 9 ++++++++- .../aggregation/post/FieldAccessPostAggregator.java | 7 ++++++- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/client/src/main/java/com/metamx/druid/query/timeboundary/TimeBoundaryQuery.java b/client/src/main/java/com/metamx/druid/query/timeboundary/TimeBoundaryQuery.java index b9b64fe6948..541d5342c73 100644 --- a/client/src/main/java/com/metamx/druid/query/timeboundary/TimeBoundaryQuery.java +++ b/client/src/main/java/com/metamx/druid/query/timeboundary/TimeBoundaryQuery.java @@ -42,7 +42,10 @@ import java.util.Map; */ public class TimeBoundaryQuery extends BaseQuery> { - public static final Interval MY_Y2K_INTERVAL = new Interval(new DateTime(Long.MIN_VALUE), new DateTime(Long.MAX_VALUE)); + public static final Interval MY_Y2K_INTERVAL = new Interval( + new DateTime(Long.MIN_VALUE), + new DateTime(Long.MAX_VALUE) + ); public static final String MAX_TIME = "maxTime"; public static final String MIN_TIME = "minTime"; private static final byte CACHE_TYPE_ID = 0x0; @@ -131,6 +134,10 @@ public class TimeBoundaryQuery extends BaseQuery public Iterable> mergeResults(List> results) { + if (results == null || results.isEmpty()) { + return Lists.newArrayList(); + } + DateTime min = new DateTime(Long.MAX_VALUE); DateTime max = new DateTime(Long.MIN_VALUE); for (Result result : results) { diff --git a/common/src/main/java/com/metamx/druid/aggregation/post/FieldAccessPostAggregator.java b/common/src/main/java/com/metamx/druid/aggregation/post/FieldAccessPostAggregator.java index 1c3f328fa87..42c031db01c 100644 --- a/common/src/main/java/com/metamx/druid/aggregation/post/FieldAccessPostAggregator.java +++ b/common/src/main/java/com/metamx/druid/aggregation/post/FieldAccessPostAggregator.java @@ -19,6 +19,7 @@ package com.metamx.druid.aggregation.post; +import com.metamx.common.ISE; import org.codehaus.jackson.annotate.JsonCreator; import org.codehaus.jackson.annotate.JsonProperty; @@ -51,7 +52,11 @@ public class FieldAccessPostAggregator implements PostAggregator @Override public Object compute(Map combinedAggregators) { - return combinedAggregators.get(fieldName); + Object retVal = combinedAggregators.get(fieldName); + if (retVal == null) { + throw new ISE("Mismatch! Agg[%s] was not specified!", fieldName); + } + return retVal; } @Override From 143de953fcf8b8fb38d734cb6dc2d6cc198bcf21 Mon Sep 17 00:00:00 2001 From: Fangjin Yang Date: Tue, 11 Dec 2012 17:10:27 -0800 Subject: [PATCH 2/6] bug fix for balancing --- .../java/com/metamx/druid/master/DruidMasterBalancer.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/com/metamx/druid/master/DruidMasterBalancer.java b/server/src/main/java/com/metamx/druid/master/DruidMasterBalancer.java index 75d3534fef6..41b0ae5379c 100644 --- a/server/src/main/java/com/metamx/druid/master/DruidMasterBalancer.java +++ b/server/src/main/java/com/metamx/druid/master/DruidMasterBalancer.java @@ -105,8 +105,8 @@ public class DruidMasterBalancer implements DruidMasterHelper continue; } - TreeSet serversByPercentUsed = Sets.newTreeSet(percentUsedComparator); - serversByPercentUsed.addAll(entry.getValue()); + MinMaxPriorityQueue serversByPercentUsed = MinMaxPriorityQueue.orderedBy(percentUsedComparator) + .create(entry.getValue()); if (serversByPercentUsed.size() <= 1) { log.info( @@ -116,8 +116,8 @@ public class DruidMasterBalancer implements DruidMasterHelper continue; } - ServerHolder highestPercentUsedServer = serversByPercentUsed.first(); - ServerHolder lowestPercentUsedServer = serversByPercentUsed.last(); + ServerHolder highestPercentUsedServer = serversByPercentUsed.peekFirst(); + ServerHolder lowestPercentUsedServer = serversByPercentUsed.peekLast(); analyzer.init(highestPercentUsedServer, lowestPercentUsedServer); From 127b632421a5d8c3d069593576916154fefd6f12 Mon Sep 17 00:00:00 2001 From: Fangjin Yang Date: Tue, 11 Dec 2012 17:20:23 -0800 Subject: [PATCH 3/6] revert last commit --- .../java/com/metamx/druid/master/DruidMasterBalancer.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/com/metamx/druid/master/DruidMasterBalancer.java b/server/src/main/java/com/metamx/druid/master/DruidMasterBalancer.java index 41b0ae5379c..75d3534fef6 100644 --- a/server/src/main/java/com/metamx/druid/master/DruidMasterBalancer.java +++ b/server/src/main/java/com/metamx/druid/master/DruidMasterBalancer.java @@ -105,8 +105,8 @@ public class DruidMasterBalancer implements DruidMasterHelper continue; } - MinMaxPriorityQueue serversByPercentUsed = MinMaxPriorityQueue.orderedBy(percentUsedComparator) - .create(entry.getValue()); + TreeSet serversByPercentUsed = Sets.newTreeSet(percentUsedComparator); + serversByPercentUsed.addAll(entry.getValue()); if (serversByPercentUsed.size() <= 1) { log.info( @@ -116,8 +116,8 @@ public class DruidMasterBalancer implements DruidMasterHelper continue; } - ServerHolder highestPercentUsedServer = serversByPercentUsed.peekFirst(); - ServerHolder lowestPercentUsedServer = serversByPercentUsed.peekLast(); + ServerHolder highestPercentUsedServer = serversByPercentUsed.first(); + ServerHolder lowestPercentUsedServer = serversByPercentUsed.last(); analyzer.init(highestPercentUsedServer, lowestPercentUsedServer); From 08eb3390c88660ae14def853896ca648cb114b2a Mon Sep 17 00:00:00 2001 From: Fangjin Yang Date: Wed, 12 Dec 2012 09:30:48 -0800 Subject: [PATCH 4/6] fix for caching bug with empty byte arrays --- .../java/com/metamx/druid/client/CachingClusteredClient.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/client/src/main/java/com/metamx/druid/client/CachingClusteredClient.java b/client/src/main/java/com/metamx/druid/client/CachingClusteredClient.java index c49e095d2ad..47376dd2e68 100644 --- a/client/src/main/java/com/metamx/druid/client/CachingClusteredClient.java +++ b/client/src/main/java/com/metamx/druid/client/CachingClusteredClient.java @@ -24,6 +24,7 @@ import com.google.common.base.Supplier; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; +import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Ordering; @@ -237,6 +238,10 @@ public class CachingClusteredClient implements QueryRunner public Iterator make() { try { + if (cachedResult.length == 0) { + return Iterators.emptyIterator(); + } + return objectMapper.readValues( objectMapper.getJsonFactory().createJsonParser(cachedResult), Object.class ); From 436828b67cc53ad9382314c4e74f3a7dea572975 Mon Sep 17 00:00:00 2001 From: Fangjin Yang Date: Wed, 12 Dec 2012 10:36:46 -0800 Subject: [PATCH 5/6] post aggs verify that they have all the fields they need --- .../java/com/metamx/druid/query/Queries.java | 50 +++++ .../query/timeseries/TimeseriesQuery.java | 4 +- .../com/metamx/druid/query/QueriesTest.java | 191 ++++++++++++++++++ .../post/ArithmeticPostAggregator.java | 11 + .../post/ConstantPostAggregator.java | 7 + .../post/FieldAccessPostAggregator.java | 13 +- .../aggregation/post/PostAggregator.java | 3 + 7 files changed, 272 insertions(+), 7 deletions(-) create mode 100644 client/src/main/java/com/metamx/druid/query/Queries.java create mode 100644 client/src/test/java/com/metamx/druid/query/QueriesTest.java diff --git a/client/src/main/java/com/metamx/druid/query/Queries.java b/client/src/main/java/com/metamx/druid/query/Queries.java new file mode 100644 index 00000000000..2fb17b2fe76 --- /dev/null +++ b/client/src/main/java/com/metamx/druid/query/Queries.java @@ -0,0 +1,50 @@ +package com.metamx.druid.query; + +import com.google.common.base.Function; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import com.metamx.druid.aggregation.AggregatorFactory; +import com.metamx.druid.aggregation.post.PostAggregator; + +import javax.annotation.Nullable; +import java.util.List; +import java.util.Set; + +/** + */ +public class Queries +{ + public static void verifyAggregations( + List aggFactories, + List postAggs + ) + { + Preconditions.checkNotNull(aggFactories, "aggregations cannot be null"); + Preconditions.checkArgument(aggFactories.size() > 0, "Must have at least one AggregatorFactory"); + + if (postAggs != null && !postAggs.isEmpty()) { + Set combinedAggNames = Sets.newHashSet( + Lists.transform( + aggFactories, + new Function() + { + @Override + public String apply(@Nullable AggregatorFactory input) + { + return input.getName(); + } + } + ) + ); + + for (PostAggregator postAgg : postAggs) { + Preconditions.checkArgument( + postAgg.verifyFields(combinedAggNames), + String.format("Missing field[%s]", postAgg.getName()) + ); + combinedAggNames.add(postAgg.getName()); + } + } + } +} diff --git a/client/src/main/java/com/metamx/druid/query/timeseries/TimeseriesQuery.java b/client/src/main/java/com/metamx/druid/query/timeseries/TimeseriesQuery.java index 3c664402df3..490e2531edd 100644 --- a/client/src/main/java/com/metamx/druid/query/timeseries/TimeseriesQuery.java +++ b/client/src/main/java/com/metamx/druid/query/timeseries/TimeseriesQuery.java @@ -26,6 +26,7 @@ import com.metamx.druid.Query; import com.metamx.druid.QueryGranularity; import com.metamx.druid.aggregation.AggregatorFactory; import com.metamx.druid.aggregation.post.PostAggregator; +import com.metamx.druid.query.Queries; import com.metamx.druid.query.filter.DimFilter; import com.metamx.druid.query.segment.QuerySegmentSpec; import com.metamx.druid.result.Result; @@ -67,8 +68,7 @@ public class TimeseriesQuery extends BaseQuery> this.aggregatorSpecs = aggregatorSpecs; this.postAggregatorSpecs = postAggregatorSpecs == null ? ImmutableList.of() : postAggregatorSpecs; - Preconditions.checkNotNull(aggregatorSpecs, "aggregations cannot be null"); - Preconditions.checkArgument(aggregatorSpecs.size() > 0, "Must have at least one AggregatorFactory"); + Queries.verifyAggregations(this.aggregatorSpecs, this.postAggregatorSpecs); } @Override diff --git a/client/src/test/java/com/metamx/druid/query/QueriesTest.java b/client/src/test/java/com/metamx/druid/query/QueriesTest.java new file mode 100644 index 00000000000..46af9682826 --- /dev/null +++ b/client/src/test/java/com/metamx/druid/query/QueriesTest.java @@ -0,0 +1,191 @@ +package com.metamx.druid.query; + +import com.metamx.druid.aggregation.AggregatorFactory; +import com.metamx.druid.aggregation.CountAggregatorFactory; +import com.metamx.druid.aggregation.DoubleSumAggregatorFactory; +import com.metamx.druid.aggregation.post.ArithmeticPostAggregator; +import com.metamx.druid.aggregation.post.ConstantPostAggregator; +import com.metamx.druid.aggregation.post.FieldAccessPostAggregator; +import com.metamx.druid.aggregation.post.PostAggregator; +import junit.framework.Assert; +import org.junit.Test; + +import java.util.Arrays; +import java.util.List; + +/** + */ +public class QueriesTest +{ + @Test + public void testVerifyAggregations() throws Exception + { + List aggFactories = Arrays.asList( + new CountAggregatorFactory("count"), + new DoubleSumAggregatorFactory("idx", "index"), + new DoubleSumAggregatorFactory("rev", "revenue") + ); + + List postAggs = Arrays.asList( + new ArithmeticPostAggregator( + "addStuff", + "+", + Arrays.asList( + new FieldAccessPostAggregator("idx", "idx"), + new FieldAccessPostAggregator("count", "count") + ) + ) + ); + + boolean exceptionOccured = false; + + try { + Queries.verifyAggregations(aggFactories, postAggs); + } + catch (Exception e) { + exceptionOccured = true; + } + + Assert.assertFalse(exceptionOccured); + } + + @Test + public void testVerifyAggregationsMissingVal() throws Exception + { + List aggFactories = Arrays.asList( + new CountAggregatorFactory("count"), + new DoubleSumAggregatorFactory("idx", "index"), + new DoubleSumAggregatorFactory("rev", "revenue") + ); + + List postAggs = Arrays.asList( + new ArithmeticPostAggregator( + "addStuff", + "+", + Arrays.asList( + new FieldAccessPostAggregator("idx", "idx2"), + new FieldAccessPostAggregator("count", "count") + ) + ) + ); + + boolean exceptionOccured = false; + + try { + Queries.verifyAggregations(aggFactories, postAggs); + } + catch (Exception e) { + exceptionOccured = true; + } + + Assert.assertTrue(exceptionOccured); + } + + @Test + public void testVerifyAggregationsMultiLevel() throws Exception + { + List aggFactories = Arrays.asList( + new CountAggregatorFactory("count"), + new DoubleSumAggregatorFactory("idx", "index"), + new DoubleSumAggregatorFactory("rev", "revenue") + ); + + List postAggs = Arrays.asList( + new ArithmeticPostAggregator( + "divideStuff", + "/", + Arrays.asList( + new ArithmeticPostAggregator( + "addStuff", + "+", + Arrays.asList( + new FieldAccessPostAggregator("idx", "idx"), + new ConstantPostAggregator("const", 1) + ) + ), + new ArithmeticPostAggregator( + "subtractStuff", + "-", + Arrays.asList( + new FieldAccessPostAggregator("rev", "rev"), + new ConstantPostAggregator("const", 1) + ) + ) + ) + ), + new ArithmeticPostAggregator( + "addStuff", + "+", + Arrays.asList( + new FieldAccessPostAggregator("divideStuff", "divideStuff"), + new FieldAccessPostAggregator("count", "count") + ) + ) + ); + + boolean exceptionOccured = false; + + try { + Queries.verifyAggregations(aggFactories, postAggs); + } + catch (Exception e) { + exceptionOccured = true; + } + + Assert.assertFalse(exceptionOccured); + } + + @Test + public void testVerifyAggregationsMultiLevelMissingVal() throws Exception + { + List aggFactories = Arrays.asList( + new CountAggregatorFactory("count"), + new DoubleSumAggregatorFactory("idx", "index"), + new DoubleSumAggregatorFactory("rev", "revenue") + ); + + List postAggs = Arrays.asList( + new ArithmeticPostAggregator( + "divideStuff", + "/", + Arrays.asList( + new ArithmeticPostAggregator( + "addStuff", + "+", + Arrays.asList( + new FieldAccessPostAggregator("idx", "idx"), + new ConstantPostAggregator("const", 1) + ) + ), + new ArithmeticPostAggregator( + "subtractStuff", + "-", + Arrays.asList( + new FieldAccessPostAggregator("rev", "rev2"), + new ConstantPostAggregator("const", 1) + ) + ) + ) + ), + new ArithmeticPostAggregator( + "addStuff", + "+", + Arrays.asList( + new FieldAccessPostAggregator("divideStuff", "divideStuff"), + new FieldAccessPostAggregator("count", "count") + ) + ) + ); + + boolean exceptionOccured = false; + + try { + Queries.verifyAggregations(aggFactories, postAggs); + } + catch (Exception e) { + exceptionOccured = true; + } + + Assert.assertTrue(exceptionOccured); + } +} diff --git a/common/src/main/java/com/metamx/druid/aggregation/post/ArithmeticPostAggregator.java b/common/src/main/java/com/metamx/druid/aggregation/post/ArithmeticPostAggregator.java index bffc0f2d7bb..035d0fa6652 100644 --- a/common/src/main/java/com/metamx/druid/aggregation/post/ArithmeticPostAggregator.java +++ b/common/src/main/java/com/metamx/druid/aggregation/post/ArithmeticPostAggregator.java @@ -68,6 +68,17 @@ public class ArithmeticPostAggregator implements PostAggregator } } + @Override + public boolean verifyFields(Set fieldNames) + { + for (PostAggregator field : fields) { + if (!field.verifyFields(fieldNames)) { + return false; + } + } + return true; + } + @Override public Comparator getComparator() { diff --git a/common/src/main/java/com/metamx/druid/aggregation/post/ConstantPostAggregator.java b/common/src/main/java/com/metamx/druid/aggregation/post/ConstantPostAggregator.java index eef818db49f..645de6c9b80 100644 --- a/common/src/main/java/com/metamx/druid/aggregation/post/ConstantPostAggregator.java +++ b/common/src/main/java/com/metamx/druid/aggregation/post/ConstantPostAggregator.java @@ -24,6 +24,7 @@ import org.codehaus.jackson.annotate.JsonProperty; import java.util.Comparator; import java.util.Map; +import java.util.Set; /** */ @@ -42,6 +43,12 @@ public class ConstantPostAggregator implements PostAggregator this.constantValue = constantValue; } + @Override + public boolean verifyFields(Set fields) + { + return true; + } + @Override public Comparator getComparator() { diff --git a/common/src/main/java/com/metamx/druid/aggregation/post/FieldAccessPostAggregator.java b/common/src/main/java/com/metamx/druid/aggregation/post/FieldAccessPostAggregator.java index 42c031db01c..0a1c866d044 100644 --- a/common/src/main/java/com/metamx/druid/aggregation/post/FieldAccessPostAggregator.java +++ b/common/src/main/java/com/metamx/druid/aggregation/post/FieldAccessPostAggregator.java @@ -25,6 +25,7 @@ import org.codehaus.jackson.annotate.JsonProperty; import java.util.Comparator; import java.util.Map; +import java.util.Set; /** */ @@ -43,6 +44,12 @@ public class FieldAccessPostAggregator implements PostAggregator this.fieldName = fieldName; } + @Override + public boolean verifyFields(Set fieldNames) + { + return fieldNames.contains(fieldName); + } + @Override public Comparator getComparator() { @@ -52,11 +59,7 @@ public class FieldAccessPostAggregator implements PostAggregator @Override public Object compute(Map combinedAggregators) { - Object retVal = combinedAggregators.get(fieldName); - if (retVal == null) { - throw new ISE("Mismatch! Agg[%s] was not specified!", fieldName); - } - return retVal; + return combinedAggregators.get(fieldName); } @Override diff --git a/common/src/main/java/com/metamx/druid/aggregation/post/PostAggregator.java b/common/src/main/java/com/metamx/druid/aggregation/post/PostAggregator.java index 5e679d64868..487ac30efb3 100644 --- a/common/src/main/java/com/metamx/druid/aggregation/post/PostAggregator.java +++ b/common/src/main/java/com/metamx/druid/aggregation/post/PostAggregator.java @@ -24,6 +24,7 @@ import org.codehaus.jackson.annotate.JsonTypeInfo; import java.util.Comparator; import java.util.Map; +import java.util.Set; /** * Functionally similar to an Aggregator. See the Aggregator interface for more comments. @@ -36,6 +37,8 @@ import java.util.Map; }) public interface PostAggregator { + public boolean verifyFields(Set fieldNames); + public Comparator getComparator(); public Object compute(Map combinedAggregators); From 6db99c45a673dea5ade57b78ac37595b242d96fb Mon Sep 17 00:00:00 2001 From: Fangjin Yang Date: Wed, 12 Dec 2012 14:51:19 -0800 Subject: [PATCH 6/6] more specific exceptions in test --- .../src/test/java/com/metamx/druid/query/QueriesTest.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/client/src/test/java/com/metamx/druid/query/QueriesTest.java b/client/src/test/java/com/metamx/druid/query/QueriesTest.java index 46af9682826..7463efdc58b 100644 --- a/client/src/test/java/com/metamx/druid/query/QueriesTest.java +++ b/client/src/test/java/com/metamx/druid/query/QueriesTest.java @@ -42,7 +42,7 @@ public class QueriesTest try { Queries.verifyAggregations(aggFactories, postAggs); } - catch (Exception e) { + catch (IllegalArgumentException e) { exceptionOccured = true; } @@ -74,7 +74,7 @@ public class QueriesTest try { Queries.verifyAggregations(aggFactories, postAggs); } - catch (Exception e) { + catch (IllegalArgumentException e) { exceptionOccured = true; } @@ -128,7 +128,7 @@ public class QueriesTest try { Queries.verifyAggregations(aggFactories, postAggs); } - catch (Exception e) { + catch (IllegalArgumentException e) { exceptionOccured = true; } @@ -182,7 +182,7 @@ public class QueriesTest try { Queries.verifyAggregations(aggFactories, postAggs); } - catch (Exception e) { + catch (IllegalArgumentException e) { exceptionOccured = true; }