mirror of https://github.com/apache/druid.git
ObjectColumnSelector/JS aggregator support for multi-valued columns.
This commit is contained in:
parent
da285d8665
commit
c2137e8416
|
@ -32,12 +32,15 @@ import org.mozilla.javascript.Context;
|
||||||
import org.mozilla.javascript.ContextAction;
|
import org.mozilla.javascript.ContextAction;
|
||||||
import org.mozilla.javascript.ContextFactory;
|
import org.mozilla.javascript.ContextFactory;
|
||||||
import org.mozilla.javascript.Function;
|
import org.mozilla.javascript.Function;
|
||||||
|
import org.mozilla.javascript.NativeArray;
|
||||||
|
import org.mozilla.javascript.Scriptable;
|
||||||
import org.mozilla.javascript.ScriptableObject;
|
import org.mozilla.javascript.ScriptableObject;
|
||||||
|
|
||||||
import javax.annotation.Nullable;
|
import javax.annotation.Nullable;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.security.MessageDigest;
|
import java.security.MessageDigest;
|
||||||
import java.security.NoSuchAlgorithmException;
|
import java.security.NoSuchAlgorithmException;
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.Comparator;
|
import java.util.Comparator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
|
@ -263,6 +266,7 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory
|
||||||
Context cx = Context.getCurrentContext();
|
Context cx = Context.getCurrentContext();
|
||||||
if (cx == null) {
|
if (cx == null) {
|
||||||
cx = contextFactory.enterContext();
|
cx = contextFactory.enterContext();
|
||||||
|
cx.getWrapFactory().setJavaPrimitiveWrap(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
final int size = selectorList.length;
|
final int size = selectorList.length;
|
||||||
|
@ -272,7 +276,7 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory
|
||||||
for (int i = 0 ; i < size ; i++) {
|
for (int i = 0 ; i < size ; i++) {
|
||||||
final ObjectColumnSelector selector = selectorList[i];
|
final ObjectColumnSelector selector = selectorList[i];
|
||||||
if (selector != null) {
|
if (selector != null) {
|
||||||
args[i + 1] = selector.get();
|
args[i + 1] = Context.javaToJS(selector.get(), scope);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -386,12 +386,6 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
|
||||||
if (holder != null) {
|
if (holder != null) {
|
||||||
final ColumnCapabilities capabilities = holder.getCapabilities();
|
final ColumnCapabilities capabilities = holder.getCapabilities();
|
||||||
|
|
||||||
if (capabilities.hasMultipleValues()) {
|
|
||||||
throw new UnsupportedOperationException(
|
|
||||||
"makeObjectColumnSelector does not support multivalued columns"
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (capabilities.isDictionaryEncoded()) {
|
if (capabilities.isDictionaryEncoded()) {
|
||||||
cachedColumnVals = holder.getDictionaryEncoding();
|
cachedColumnVals = holder.getDictionaryEncoding();
|
||||||
} else if (capabilities.getType() == ValueType.COMPLEX) {
|
} else if (capabilities.getType() == ValueType.COMPLEX) {
|
||||||
|
@ -414,6 +408,12 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
|
||||||
final GenericColumn columnVals = (GenericColumn) cachedColumnVals;
|
final GenericColumn columnVals = (GenericColumn) cachedColumnVals;
|
||||||
final ValueType type = columnVals.getType();
|
final ValueType type = columnVals.getType();
|
||||||
|
|
||||||
|
if (columnVals.hasMultipleValues()) {
|
||||||
|
throw new UnsupportedOperationException(
|
||||||
|
"makeObjectColumnSelector does not support multivalued GenericColumns"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
if (type == ValueType.FLOAT) {
|
if (type == ValueType.FLOAT) {
|
||||||
return new ObjectColumnSelector<Float>()
|
return new ObjectColumnSelector<Float>()
|
||||||
{
|
{
|
||||||
|
@ -466,20 +466,42 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
|
||||||
|
|
||||||
if (cachedColumnVals instanceof DictionaryEncodedColumn) {
|
if (cachedColumnVals instanceof DictionaryEncodedColumn) {
|
||||||
final DictionaryEncodedColumn columnVals = (DictionaryEncodedColumn) cachedColumnVals;
|
final DictionaryEncodedColumn columnVals = (DictionaryEncodedColumn) cachedColumnVals;
|
||||||
return new ObjectColumnSelector<String>()
|
if (columnVals.hasMultipleValues()) {
|
||||||
{
|
return new ObjectColumnSelector<String[]>()
|
||||||
@Override
|
|
||||||
public Class classOfObject()
|
|
||||||
{
|
{
|
||||||
return String.class;
|
@Override
|
||||||
}
|
public Class classOfObject()
|
||||||
|
{
|
||||||
|
return String[].class;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String get()
|
public String[] get()
|
||||||
|
{
|
||||||
|
final IndexedInts multiValueRow = columnVals.getMultiValueRow(cursorOffset.getOffset());
|
||||||
|
final String[] strings = new String[multiValueRow.size()];
|
||||||
|
for (int i = 0 ; i < multiValueRow.size() ; i++) {
|
||||||
|
strings[i] = columnVals.lookupName(multiValueRow.get(i));
|
||||||
|
}
|
||||||
|
return strings;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
} else {
|
||||||
|
return new ObjectColumnSelector<String>()
|
||||||
{
|
{
|
||||||
return columnVals.lookupName(columnVals.getSingleValueRow(cursorOffset.getOffset()));
|
@Override
|
||||||
}
|
public Class classOfObject()
|
||||||
};
|
{
|
||||||
|
return String.class;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String get()
|
||||||
|
{
|
||||||
|
return columnVals.lookupName(columnVals.getSingleValueRow(cursorOffset.getOffset()));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
final ComplexColumn columnVals = (ComplexColumn) cachedColumnVals;
|
final ComplexColumn columnVals = (ComplexColumn) cachedColumnVals;
|
||||||
|
@ -786,11 +808,6 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
|
||||||
Column holder = index.getColumn(columnName);
|
Column holder = index.getColumn(columnName);
|
||||||
|
|
||||||
if (holder != null) {
|
if (holder != null) {
|
||||||
if (holder.getCapabilities().hasMultipleValues()) {
|
|
||||||
throw new UnsupportedOperationException(
|
|
||||||
"makeObjectColumnSelector does not support multivalued columns"
|
|
||||||
);
|
|
||||||
}
|
|
||||||
final ValueType type = holder.getCapabilities().getType();
|
final ValueType type = holder.getCapabilities().getType();
|
||||||
|
|
||||||
if (holder.getCapabilities().isDictionaryEncoded()) {
|
if (holder.getCapabilities().isDictionaryEncoded()) {
|
||||||
|
@ -815,6 +832,12 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
|
||||||
final GenericColumn columnVals = (GenericColumn) cachedColumnVals;
|
final GenericColumn columnVals = (GenericColumn) cachedColumnVals;
|
||||||
final ValueType type = columnVals.getType();
|
final ValueType type = columnVals.getType();
|
||||||
|
|
||||||
|
if (columnVals.hasMultipleValues()) {
|
||||||
|
throw new UnsupportedOperationException(
|
||||||
|
"makeObjectColumnSelector does not support multivalued GenericColumns"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
if (type == ValueType.FLOAT) {
|
if (type == ValueType.FLOAT) {
|
||||||
return new ObjectColumnSelector<Float>()
|
return new ObjectColumnSelector<Float>()
|
||||||
{
|
{
|
||||||
|
@ -867,20 +890,42 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
|
||||||
|
|
||||||
if (cachedColumnVals instanceof DictionaryEncodedColumn) {
|
if (cachedColumnVals instanceof DictionaryEncodedColumn) {
|
||||||
final DictionaryEncodedColumn columnVals = (DictionaryEncodedColumn) cachedColumnVals;
|
final DictionaryEncodedColumn columnVals = (DictionaryEncodedColumn) cachedColumnVals;
|
||||||
return new ObjectColumnSelector<String>()
|
if (columnVals.hasMultipleValues()) {
|
||||||
{
|
return new ObjectColumnSelector<String[]>()
|
||||||
@Override
|
|
||||||
public Class classOfObject()
|
|
||||||
{
|
{
|
||||||
return String.class;
|
@Override
|
||||||
}
|
public Class classOfObject()
|
||||||
|
{
|
||||||
|
return String[].class;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String get()
|
public String[] get()
|
||||||
|
{
|
||||||
|
final IndexedInts multiValueRow = columnVals.getMultiValueRow(currRow);
|
||||||
|
final String[] strings = new String[multiValueRow.size()];
|
||||||
|
for (int i = 0 ; i < multiValueRow.size() ; i++) {
|
||||||
|
strings[i] = columnVals.lookupName(multiValueRow.get(i));
|
||||||
|
}
|
||||||
|
return strings;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
} else {
|
||||||
|
return new ObjectColumnSelector<String>()
|
||||||
{
|
{
|
||||||
return columnVals.lookupName(columnVals.getSingleValueRow(currRow));
|
@Override
|
||||||
}
|
public Class classOfObject()
|
||||||
};
|
{
|
||||||
|
return String.class;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String get()
|
||||||
|
{
|
||||||
|
return columnVals.lookupName(columnVals.getSingleValueRow(currRow));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
final ComplexColumn columnVals = (ComplexColumn) cachedColumnVals;
|
final ComplexColumn columnVals = (ComplexColumn) cachedColumnVals;
|
||||||
|
|
|
@ -403,27 +403,31 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
|
||||||
|
|
||||||
if (dimensionIndexInt != null) {
|
if (dimensionIndexInt != null) {
|
||||||
final int dimensionIndex = dimensionIndexInt;
|
final int dimensionIndex = dimensionIndexInt;
|
||||||
return new ObjectColumnSelector<String>()
|
return new ObjectColumnSelector<Object>()
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
public Class classOfObject()
|
public Class classOfObject()
|
||||||
{
|
{
|
||||||
return String.class;
|
if (currEntry.getKey().getDims()[dimensionIndex].length > 1) {
|
||||||
|
return String[].class;
|
||||||
|
} else {
|
||||||
|
return String.class;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String get()
|
public Object get()
|
||||||
{
|
{
|
||||||
final String[] dimVals = currEntry.getKey().getDims()[dimensionIndex];
|
final String[] dimVals = currEntry.getKey().getDims()[dimensionIndex];
|
||||||
if (dimVals.length == 1) {
|
if (dimVals.length == 1) {
|
||||||
return dimVals[0];
|
return dimVals[0];
|
||||||
}
|
}
|
||||||
if (dimVals.length == 0) {
|
else if (dimVals.length == 0) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
throw new UnsupportedOperationException(
|
else {
|
||||||
"makeObjectColumnSelector does not support multivalued columns"
|
return dimVals;
|
||||||
);
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,6 +25,7 @@ import io.druid.granularity.QueryGranularity;
|
||||||
import io.druid.query.aggregation.AggregatorFactory;
|
import io.druid.query.aggregation.AggregatorFactory;
|
||||||
import io.druid.query.aggregation.CountAggregatorFactory;
|
import io.druid.query.aggregation.CountAggregatorFactory;
|
||||||
import io.druid.query.aggregation.DoubleSumAggregatorFactory;
|
import io.druid.query.aggregation.DoubleSumAggregatorFactory;
|
||||||
|
import io.druid.query.aggregation.JavaScriptAggregatorFactory;
|
||||||
import io.druid.query.aggregation.LongSumAggregatorFactory;
|
import io.druid.query.aggregation.LongSumAggregatorFactory;
|
||||||
import io.druid.query.aggregation.hyperloglog.HyperUniqueFinalizingPostAggregator;
|
import io.druid.query.aggregation.hyperloglog.HyperUniqueFinalizingPostAggregator;
|
||||||
import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
|
import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
|
||||||
|
@ -80,6 +81,13 @@ public class QueryRunnerTestHelper
|
||||||
public static final CountAggregatorFactory rowsCount = new CountAggregatorFactory("rows");
|
public static final CountAggregatorFactory rowsCount = new CountAggregatorFactory("rows");
|
||||||
public static final LongSumAggregatorFactory indexLongSum = new LongSumAggregatorFactory("index", "index");
|
public static final LongSumAggregatorFactory indexLongSum = new LongSumAggregatorFactory("index", "index");
|
||||||
public static final DoubleSumAggregatorFactory indexDoubleSum = new DoubleSumAggregatorFactory("index", "index");
|
public static final DoubleSumAggregatorFactory indexDoubleSum = new DoubleSumAggregatorFactory("index", "index");
|
||||||
|
public static final JavaScriptAggregatorFactory indexSumJsPlacementishN = new JavaScriptAggregatorFactory(
|
||||||
|
"nindex",
|
||||||
|
Arrays.asList("placementish", "index"),
|
||||||
|
"function aggregate(current, a, b) { if ((typeof a !== 'string' && a.indexOf('a') > -1) || a === 'a') { return current + b; } else { return current; } }",
|
||||||
|
"function reset() { return 0; }",
|
||||||
|
"function combine(a, b) { return a + b; }"
|
||||||
|
);
|
||||||
public static final HyperUniquesAggregatorFactory qualityUniques = new HyperUniquesAggregatorFactory(
|
public static final HyperUniquesAggregatorFactory qualityUniques = new HyperUniquesAggregatorFactory(
|
||||||
"uniques",
|
"uniques",
|
||||||
"quality_uniques"
|
"quality_uniques"
|
||||||
|
|
|
@ -68,6 +68,12 @@ public class JavaScriptAggregatorTest
|
||||||
selector.increment();
|
selector.increment();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static void aggregate(TestObjectColumnSelector selector, Aggregator agg)
|
||||||
|
{
|
||||||
|
agg.aggregate();
|
||||||
|
selector.increment();
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testAggregate()
|
public void testAggregate()
|
||||||
{
|
{
|
||||||
|
@ -175,6 +181,42 @@ public class JavaScriptAggregatorTest
|
||||||
Assert.assertEquals(val, agg.get());
|
Assert.assertEquals(val, agg.get());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAggregateStrings()
|
||||||
|
{
|
||||||
|
final TestObjectColumnSelector ocs = new TestObjectColumnSelector("what", new String[]{"hey", "there"});
|
||||||
|
final JavaScriptAggregator agg = new JavaScriptAggregator(
|
||||||
|
"billy",
|
||||||
|
Collections.<ObjectColumnSelector>singletonList(ocs),
|
||||||
|
JavaScriptAggregatorFactory.compileScript(
|
||||||
|
"function aggregate(current, a) { if (typeof a === 'string') { return current + 1; } else { return current + a.length; } }",
|
||||||
|
scriptDoubleSum.get("fnReset"),
|
||||||
|
scriptDoubleSum.get("fnCombine")
|
||||||
|
)
|
||||||
|
);
|
||||||
|
|
||||||
|
agg.reset();
|
||||||
|
|
||||||
|
Assert.assertEquals("billy", agg.getName());
|
||||||
|
|
||||||
|
double val = 0.;
|
||||||
|
Assert.assertEquals(val, agg.get());
|
||||||
|
Assert.assertEquals(val, agg.get());
|
||||||
|
Assert.assertEquals(val, agg.get());
|
||||||
|
aggregate(ocs, agg);
|
||||||
|
|
||||||
|
val += 1;
|
||||||
|
Assert.assertEquals(val, agg.get());
|
||||||
|
Assert.assertEquals(val, agg.get());
|
||||||
|
Assert.assertEquals(val, agg.get());
|
||||||
|
aggregate(ocs, agg);
|
||||||
|
|
||||||
|
val += 2;
|
||||||
|
Assert.assertEquals(val, agg.get());
|
||||||
|
Assert.assertEquals(val, agg.get());
|
||||||
|
Assert.assertEquals(val, agg.get());
|
||||||
|
}
|
||||||
|
|
||||||
public static void main(String... args) throws Exception {
|
public static void main(String... args) throws Exception {
|
||||||
final LoopingFloatColumnSelector selector = new LoopingFloatColumnSelector(new float[]{42.12f, 9f});
|
final LoopingFloatColumnSelector selector = new LoopingFloatColumnSelector(new float[]{42.12f, 9f});
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,53 @@
|
||||||
|
/*
|
||||||
|
* Druid - a distributed column store.
|
||||||
|
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
|
||||||
|
*
|
||||||
|
* This program is free software; you can redistribute it and/or
|
||||||
|
* modify it under the terms of the GNU General Public License
|
||||||
|
* as published by the Free Software Foundation; either version 2
|
||||||
|
* of the License, or (at your option) any later version.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU General Public License
|
||||||
|
* along with this program; if not, write to the Free Software
|
||||||
|
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.druid.query.aggregation;
|
||||||
|
|
||||||
|
import io.druid.segment.ObjectColumnSelector;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*/
|
||||||
|
public class TestObjectColumnSelector implements ObjectColumnSelector
|
||||||
|
{
|
||||||
|
private final Object[] objects;
|
||||||
|
|
||||||
|
private int index = 0;
|
||||||
|
|
||||||
|
public TestObjectColumnSelector(Object... objects)
|
||||||
|
{
|
||||||
|
this.objects = objects;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Class classOfObject()
|
||||||
|
{
|
||||||
|
return objects[index].getClass();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Object get()
|
||||||
|
{
|
||||||
|
return objects[index];
|
||||||
|
}
|
||||||
|
|
||||||
|
public void increment()
|
||||||
|
{
|
||||||
|
++index;
|
||||||
|
}
|
||||||
|
}
|
|
@ -19,6 +19,7 @@
|
||||||
|
|
||||||
package io.druid.query.timeseries;
|
package io.druid.query.timeseries;
|
||||||
|
|
||||||
|
import com.google.common.collect.ImmutableList;
|
||||||
import com.google.common.collect.ImmutableMap;
|
import com.google.common.collect.ImmutableMap;
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
import com.metamx.common.guava.Sequences;
|
import com.metamx.common.guava.Sequences;
|
||||||
|
@ -1262,6 +1263,43 @@ public class TimeseriesQueryRunnerTest
|
||||||
TestHelper.assertExpectedResults(expectedResults, results);
|
TestHelper.assertExpectedResults(expectedResults, results);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTimeseriesWithMultiValueFilteringJavascriptAggregator()
|
||||||
|
{
|
||||||
|
TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
|
||||||
|
.dataSource(QueryRunnerTestHelper.dataSource)
|
||||||
|
.granularity(QueryRunnerTestHelper.allGran)
|
||||||
|
.intervals(QueryRunnerTestHelper.firstToThird)
|
||||||
|
.aggregators(
|
||||||
|
ImmutableList.of(
|
||||||
|
QueryRunnerTestHelper.indexDoubleSum,
|
||||||
|
QueryRunnerTestHelper.indexSumJsPlacementishN
|
||||||
|
)
|
||||||
|
)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
Iterable<Result<TimeseriesResultValue>> expectedResults = ImmutableList.of(
|
||||||
|
new Result<>(
|
||||||
|
new DateTime(
|
||||||
|
QueryRunnerTestHelper.firstToThird.getIntervals()
|
||||||
|
.get(0)
|
||||||
|
.getStart()
|
||||||
|
),
|
||||||
|
new TimeseriesResultValue(
|
||||||
|
ImmutableMap.<String, Object>of(
|
||||||
|
"index", 12459.361190795898d,
|
||||||
|
"nindex", 283.31103515625d
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
);
|
||||||
|
Iterable<Result<TimeseriesResultValue>> actualResults = Sequences.toList(
|
||||||
|
runner.run(query),
|
||||||
|
Lists.<Result<TimeseriesResultValue>>newArrayList()
|
||||||
|
);
|
||||||
|
TestHelper.assertExpectedResults(expectedResults, actualResults);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testTimeseriesWithMultiValueDimFilter1()
|
public void testTimeseriesWithMultiValueDimFilter1()
|
||||||
{
|
{
|
||||||
|
|
Loading…
Reference in New Issue