From c2137e8416a346e055b9b466cc37c28949ff108e Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Thu, 15 May 2014 19:34:22 -0700 Subject: [PATCH] ObjectColumnSelector/JS aggregator support for multi-valued columns. --- .../JavaScriptAggregatorFactory.java | 6 +- .../segment/QueryableIndexStorageAdapter.java | 111 ++++++++++++------ .../IncrementalIndexStorageAdapter.java | 18 +-- .../io/druid/query/QueryRunnerTestHelper.java | 8 ++ .../aggregation/JavaScriptAggregatorTest.java | 42 +++++++ .../aggregation/TestObjectColumnSelector.java | 53 +++++++++ .../timeseries/TimeseriesQueryRunnerTest.java | 38 ++++++ 7 files changed, 235 insertions(+), 41 deletions(-) create mode 100644 processing/src/test/java/io/druid/query/aggregation/TestObjectColumnSelector.java diff --git a/processing/src/main/java/io/druid/query/aggregation/JavaScriptAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/JavaScriptAggregatorFactory.java index 85bd9597ae8..f3724695016 100644 --- a/processing/src/main/java/io/druid/query/aggregation/JavaScriptAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/JavaScriptAggregatorFactory.java @@ -32,12 +32,15 @@ import org.mozilla.javascript.Context; import org.mozilla.javascript.ContextAction; import org.mozilla.javascript.ContextFactory; import org.mozilla.javascript.Function; +import org.mozilla.javascript.NativeArray; +import org.mozilla.javascript.Scriptable; import org.mozilla.javascript.ScriptableObject; import javax.annotation.Nullable; import java.nio.ByteBuffer; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; +import java.util.Arrays; import java.util.Comparator; import java.util.List; @@ -263,6 +266,7 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory Context cx = Context.getCurrentContext(); if (cx == null) { cx = contextFactory.enterContext(); + cx.getWrapFactory().setJavaPrimitiveWrap(false); } final int size = selectorList.length; @@ -272,7 +276,7 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory for (int i = 0 ; i < size ; i++) { final ObjectColumnSelector selector = selectorList[i]; if (selector != null) { - args[i + 1] = selector.get(); + args[i + 1] = Context.javaToJS(selector.get(), scope); } } diff --git a/processing/src/main/java/io/druid/segment/QueryableIndexStorageAdapter.java b/processing/src/main/java/io/druid/segment/QueryableIndexStorageAdapter.java index 86637cf0d4a..b3178d83df7 100644 --- a/processing/src/main/java/io/druid/segment/QueryableIndexStorageAdapter.java +++ b/processing/src/main/java/io/druid/segment/QueryableIndexStorageAdapter.java @@ -386,12 +386,6 @@ public class QueryableIndexStorageAdapter implements StorageAdapter if (holder != null) { final ColumnCapabilities capabilities = holder.getCapabilities(); - if (capabilities.hasMultipleValues()) { - throw new UnsupportedOperationException( - "makeObjectColumnSelector does not support multivalued columns" - ); - } - if (capabilities.isDictionaryEncoded()) { cachedColumnVals = holder.getDictionaryEncoding(); } else if (capabilities.getType() == ValueType.COMPLEX) { @@ -414,6 +408,12 @@ public class QueryableIndexStorageAdapter implements StorageAdapter final GenericColumn columnVals = (GenericColumn) cachedColumnVals; final ValueType type = columnVals.getType(); + if (columnVals.hasMultipleValues()) { + throw new UnsupportedOperationException( + "makeObjectColumnSelector does not support multivalued GenericColumns" + ); + } + if (type == ValueType.FLOAT) { return new ObjectColumnSelector() { @@ -466,20 +466,42 @@ public class QueryableIndexStorageAdapter implements StorageAdapter if (cachedColumnVals instanceof DictionaryEncodedColumn) { final DictionaryEncodedColumn columnVals = (DictionaryEncodedColumn) cachedColumnVals; - return new ObjectColumnSelector() - { - @Override - public Class classOfObject() + if (columnVals.hasMultipleValues()) { + return new ObjectColumnSelector() { - return String.class; - } + @Override + public Class classOfObject() + { + return String[].class; + } - @Override - public String get() + @Override + public String[] get() + { + final IndexedInts multiValueRow = columnVals.getMultiValueRow(cursorOffset.getOffset()); + final String[] strings = new String[multiValueRow.size()]; + for (int i = 0 ; i < multiValueRow.size() ; i++) { + strings[i] = columnVals.lookupName(multiValueRow.get(i)); + } + return strings; + } + }; + } else { + return new ObjectColumnSelector() { - return columnVals.lookupName(columnVals.getSingleValueRow(cursorOffset.getOffset())); - } - }; + @Override + public Class classOfObject() + { + return String.class; + } + + @Override + public String get() + { + return columnVals.lookupName(columnVals.getSingleValueRow(cursorOffset.getOffset())); + } + }; + } } final ComplexColumn columnVals = (ComplexColumn) cachedColumnVals; @@ -786,11 +808,6 @@ public class QueryableIndexStorageAdapter implements StorageAdapter Column holder = index.getColumn(columnName); if (holder != null) { - if (holder.getCapabilities().hasMultipleValues()) { - throw new UnsupportedOperationException( - "makeObjectColumnSelector does not support multivalued columns" - ); - } final ValueType type = holder.getCapabilities().getType(); if (holder.getCapabilities().isDictionaryEncoded()) { @@ -815,6 +832,12 @@ public class QueryableIndexStorageAdapter implements StorageAdapter final GenericColumn columnVals = (GenericColumn) cachedColumnVals; final ValueType type = columnVals.getType(); + if (columnVals.hasMultipleValues()) { + throw new UnsupportedOperationException( + "makeObjectColumnSelector does not support multivalued GenericColumns" + ); + } + if (type == ValueType.FLOAT) { return new ObjectColumnSelector() { @@ -867,20 +890,42 @@ public class QueryableIndexStorageAdapter implements StorageAdapter if (cachedColumnVals instanceof DictionaryEncodedColumn) { final DictionaryEncodedColumn columnVals = (DictionaryEncodedColumn) cachedColumnVals; - return new ObjectColumnSelector() - { - @Override - public Class classOfObject() + if (columnVals.hasMultipleValues()) { + return new ObjectColumnSelector() { - return String.class; - } + @Override + public Class classOfObject() + { + return String[].class; + } - @Override - public String get() + @Override + public String[] get() + { + final IndexedInts multiValueRow = columnVals.getMultiValueRow(currRow); + final String[] strings = new String[multiValueRow.size()]; + for (int i = 0 ; i < multiValueRow.size() ; i++) { + strings[i] = columnVals.lookupName(multiValueRow.get(i)); + } + return strings; + } + }; + } else { + return new ObjectColumnSelector() { - return columnVals.lookupName(columnVals.getSingleValueRow(currRow)); - } - }; + @Override + public Class classOfObject() + { + return String.class; + } + + @Override + public String get() + { + return columnVals.lookupName(columnVals.getSingleValueRow(currRow)); + } + }; + } } final ComplexColumn columnVals = (ComplexColumn) cachedColumnVals; diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java index e765ab7867d..d4c3dad7a7a 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java @@ -403,27 +403,31 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter if (dimensionIndexInt != null) { final int dimensionIndex = dimensionIndexInt; - return new ObjectColumnSelector() + return new ObjectColumnSelector() { @Override public Class classOfObject() { - return String.class; + if (currEntry.getKey().getDims()[dimensionIndex].length > 1) { + return String[].class; + } else { + return String.class; + } } @Override - public String get() + public Object get() { final String[] dimVals = currEntry.getKey().getDims()[dimensionIndex]; if (dimVals.length == 1) { return dimVals[0]; } - if (dimVals.length == 0) { + else if (dimVals.length == 0) { return null; } - throw new UnsupportedOperationException( - "makeObjectColumnSelector does not support multivalued columns" - ); + else { + return dimVals; + } } }; } diff --git a/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java b/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java index d72c97594f4..9421cba0aba 100644 --- a/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java +++ b/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java @@ -25,6 +25,7 @@ import io.druid.granularity.QueryGranularity; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; import io.druid.query.aggregation.DoubleSumAggregatorFactory; +import io.druid.query.aggregation.JavaScriptAggregatorFactory; import io.druid.query.aggregation.LongSumAggregatorFactory; import io.druid.query.aggregation.hyperloglog.HyperUniqueFinalizingPostAggregator; import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory; @@ -80,6 +81,13 @@ public class QueryRunnerTestHelper public static final CountAggregatorFactory rowsCount = new CountAggregatorFactory("rows"); public static final LongSumAggregatorFactory indexLongSum = new LongSumAggregatorFactory("index", "index"); public static final DoubleSumAggregatorFactory indexDoubleSum = new DoubleSumAggregatorFactory("index", "index"); + public static final JavaScriptAggregatorFactory indexSumJsPlacementishN = new JavaScriptAggregatorFactory( + "nindex", + Arrays.asList("placementish", "index"), + "function aggregate(current, a, b) { if ((typeof a !== 'string' && a.indexOf('a') > -1) || a === 'a') { return current + b; } else { return current; } }", + "function reset() { return 0; }", + "function combine(a, b) { return a + b; }" + ); public static final HyperUniquesAggregatorFactory qualityUniques = new HyperUniquesAggregatorFactory( "uniques", "quality_uniques" diff --git a/processing/src/test/java/io/druid/query/aggregation/JavaScriptAggregatorTest.java b/processing/src/test/java/io/druid/query/aggregation/JavaScriptAggregatorTest.java index 7f087559339..36250b18e41 100644 --- a/processing/src/test/java/io/druid/query/aggregation/JavaScriptAggregatorTest.java +++ b/processing/src/test/java/io/druid/query/aggregation/JavaScriptAggregatorTest.java @@ -68,6 +68,12 @@ public class JavaScriptAggregatorTest selector.increment(); } + private static void aggregate(TestObjectColumnSelector selector, Aggregator agg) + { + agg.aggregate(); + selector.increment(); + } + @Test public void testAggregate() { @@ -175,6 +181,42 @@ public class JavaScriptAggregatorTest Assert.assertEquals(val, agg.get()); } + @Test + public void testAggregateStrings() + { + final TestObjectColumnSelector ocs = new TestObjectColumnSelector("what", new String[]{"hey", "there"}); + final JavaScriptAggregator agg = new JavaScriptAggregator( + "billy", + Collections.singletonList(ocs), + JavaScriptAggregatorFactory.compileScript( + "function aggregate(current, a) { if (typeof a === 'string') { return current + 1; } else { return current + a.length; } }", + scriptDoubleSum.get("fnReset"), + scriptDoubleSum.get("fnCombine") + ) + ); + + agg.reset(); + + Assert.assertEquals("billy", agg.getName()); + + double val = 0.; + Assert.assertEquals(val, agg.get()); + Assert.assertEquals(val, agg.get()); + Assert.assertEquals(val, agg.get()); + aggregate(ocs, agg); + + val += 1; + Assert.assertEquals(val, agg.get()); + Assert.assertEquals(val, agg.get()); + Assert.assertEquals(val, agg.get()); + aggregate(ocs, agg); + + val += 2; + Assert.assertEquals(val, agg.get()); + Assert.assertEquals(val, agg.get()); + Assert.assertEquals(val, agg.get()); + } + public static void main(String... args) throws Exception { final LoopingFloatColumnSelector selector = new LoopingFloatColumnSelector(new float[]{42.12f, 9f}); diff --git a/processing/src/test/java/io/druid/query/aggregation/TestObjectColumnSelector.java b/processing/src/test/java/io/druid/query/aggregation/TestObjectColumnSelector.java new file mode 100644 index 00000000000..3beb15f5584 --- /dev/null +++ b/processing/src/test/java/io/druid/query/aggregation/TestObjectColumnSelector.java @@ -0,0 +1,53 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.query.aggregation; + +import io.druid.segment.ObjectColumnSelector; + +/** + */ +public class TestObjectColumnSelector implements ObjectColumnSelector +{ + private final Object[] objects; + + private int index = 0; + + public TestObjectColumnSelector(Object... objects) + { + this.objects = objects; + } + + @Override + public Class classOfObject() + { + return objects[index].getClass(); + } + + @Override + public Object get() + { + return objects[index]; + } + + public void increment() + { + ++index; + } +} diff --git a/processing/src/test/java/io/druid/query/timeseries/TimeseriesQueryRunnerTest.java b/processing/src/test/java/io/druid/query/timeseries/TimeseriesQueryRunnerTest.java index a8d626ae044..bf15a8a75f5 100644 --- a/processing/src/test/java/io/druid/query/timeseries/TimeseriesQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/timeseries/TimeseriesQueryRunnerTest.java @@ -19,6 +19,7 @@ package io.druid.query.timeseries; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.metamx.common.guava.Sequences; @@ -1262,6 +1263,43 @@ public class TimeseriesQueryRunnerTest TestHelper.assertExpectedResults(expectedResults, results); } + @Test + public void testTimeseriesWithMultiValueFilteringJavascriptAggregator() + { + TimeseriesQuery query = Druids.newTimeseriesQueryBuilder() + .dataSource(QueryRunnerTestHelper.dataSource) + .granularity(QueryRunnerTestHelper.allGran) + .intervals(QueryRunnerTestHelper.firstToThird) + .aggregators( + ImmutableList.of( + QueryRunnerTestHelper.indexDoubleSum, + QueryRunnerTestHelper.indexSumJsPlacementishN + ) + ) + .build(); + + Iterable> expectedResults = ImmutableList.of( + new Result<>( + new DateTime( + QueryRunnerTestHelper.firstToThird.getIntervals() + .get(0) + .getStart() + ), + new TimeseriesResultValue( + ImmutableMap.of( + "index", 12459.361190795898d, + "nindex", 283.31103515625d + ) + ) + ) + ); + Iterable> actualResults = Sequences.toList( + runner.run(query), + Lists.>newArrayList() + ); + TestHelper.assertExpectedResults(expectedResults, actualResults); + } + @Test public void testTimeseriesWithMultiValueDimFilter1() {