Merge pull request #546 from metamx/object-column-selector-multivalued

ObjectColumnSelector/JS aggregator support for multi-valued columns.
This commit is contained in:
xvrl 2014-05-16 11:03:30 -07:00
commit 0c6e238e5b
8 changed files with 428 additions and 68 deletions

View File

@ -32,12 +32,16 @@ import org.mozilla.javascript.Context;
import org.mozilla.javascript.ContextAction;
import org.mozilla.javascript.ContextFactory;
import org.mozilla.javascript.Function;
import org.mozilla.javascript.NativeArray;
import org.mozilla.javascript.Scriptable;
import org.mozilla.javascript.ScriptableObject;
import javax.annotation.Nullable;
import java.lang.reflect.Array;
import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
@ -263,6 +267,9 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory
Context cx = Context.getCurrentContext();
if (cx == null) {
cx = contextFactory.enterContext();
// Disable primitive wrapping- we want Java strings and primitives to behave like JS entities.
cx.getWrapFactory().setJavaPrimitiveWrap(false);
}
final int size = selectorList.length;
@ -272,7 +279,18 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory
for (int i = 0 ; i < size ; i++) {
final ObjectColumnSelector selector = selectorList[i];
if (selector != null) {
args[i + 1] = selector.get();
final Object arg = selector.get();
if (arg != null && arg.getClass().isArray()) {
// Context.javaToJS on an array sort of works, although it returns false for Array.isArray(...) and
// may have other issues too. Let's just copy the array and wrap that.
final Object[] arrayAsObjectArray = new Object[Array.getLength(arg)];
for (int j = 0; j < Array.getLength(arg); j++) {
arrayAsObjectArray[j] = Array.get(arg, j);
}
args[i + 1] = cx.newArray(scope, arrayAsObjectArray);
} else {
args[i + 1] = Context.javaToJS(arg, scope);
}
}
}

View File

@ -386,12 +386,6 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
if (holder != null) {
final ColumnCapabilities capabilities = holder.getCapabilities();
if (capabilities.hasMultipleValues()) {
throw new UnsupportedOperationException(
"makeObjectColumnSelector does not support multivalued columns"
);
}
if (capabilities.isDictionaryEncoded()) {
cachedColumnVals = holder.getDictionaryEncoding();
} else if (capabilities.getType() == ValueType.COMPLEX) {
@ -414,6 +408,12 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
final GenericColumn columnVals = (GenericColumn) cachedColumnVals;
final ValueType type = columnVals.getType();
if (columnVals.hasMultipleValues()) {
throw new UnsupportedOperationException(
"makeObjectColumnSelector does not support multivalued GenericColumns"
);
}
if (type == ValueType.FLOAT) {
return new ObjectColumnSelector<Float>()
{
@ -466,20 +466,48 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
if (cachedColumnVals instanceof DictionaryEncodedColumn) {
final DictionaryEncodedColumn columnVals = (DictionaryEncodedColumn) cachedColumnVals;
return new ObjectColumnSelector<String>()
{
@Override
public Class classOfObject()
if (columnVals.hasMultipleValues()) {
return new ObjectColumnSelector<Object>()
{
return String.class;
}
@Override
public Class classOfObject()
{
return Object.class;
}
@Override
public String get()
@Override
public Object get()
{
final IndexedInts multiValueRow = columnVals.getMultiValueRow(cursorOffset.getOffset());
if (multiValueRow.size() == 0) {
return null;
} else if (multiValueRow.size() == 1) {
return columnVals.lookupName(multiValueRow.get(1));
} else {
final String[] strings = new String[multiValueRow.size()];
for (int i = 0 ; i < multiValueRow.size() ; i++) {
strings[i] = columnVals.lookupName(multiValueRow.get(i));
}
return strings;
}
}
};
} else {
return new ObjectColumnSelector<String>()
{
return columnVals.lookupName(columnVals.getSingleValueRow(cursorOffset.getOffset()));
}
};
@Override
public Class classOfObject()
{
return String.class;
}
@Override
public String get()
{
return columnVals.lookupName(columnVals.getSingleValueRow(cursorOffset.getOffset()));
}
};
}
}
final ComplexColumn columnVals = (ComplexColumn) cachedColumnVals;
@ -786,11 +814,6 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
Column holder = index.getColumn(columnName);
if (holder != null) {
if (holder.getCapabilities().hasMultipleValues()) {
throw new UnsupportedOperationException(
"makeObjectColumnSelector does not support multivalued columns"
);
}
final ValueType type = holder.getCapabilities().getType();
if (holder.getCapabilities().isDictionaryEncoded()) {
@ -815,6 +838,12 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
final GenericColumn columnVals = (GenericColumn) cachedColumnVals;
final ValueType type = columnVals.getType();
if (columnVals.hasMultipleValues()) {
throw new UnsupportedOperationException(
"makeObjectColumnSelector does not support multivalued GenericColumns"
);
}
if (type == ValueType.FLOAT) {
return new ObjectColumnSelector<Float>()
{
@ -867,20 +896,48 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
if (cachedColumnVals instanceof DictionaryEncodedColumn) {
final DictionaryEncodedColumn columnVals = (DictionaryEncodedColumn) cachedColumnVals;
return new ObjectColumnSelector<String>()
{
@Override
public Class classOfObject()
if (columnVals.hasMultipleValues()) {
return new ObjectColumnSelector<Object>()
{
return String.class;
}
@Override
public Class classOfObject()
{
return Object.class;
}
@Override
public String get()
@Override
public Object get()
{
final IndexedInts multiValueRow = columnVals.getMultiValueRow(currRow);
if (multiValueRow.size() == 0) {
return null;
} else if (multiValueRow.size() == 1) {
return columnVals.lookupName(multiValueRow.get(1));
} else {
final String[] strings = new String[multiValueRow.size()];
for (int i = 0 ; i < multiValueRow.size() ; i++) {
strings[i] = columnVals.lookupName(multiValueRow.get(i));
}
return strings;
}
}
};
} else {
return new ObjectColumnSelector<String>()
{
return columnVals.lookupName(columnVals.getSingleValueRow(currRow));
}
};
@Override
public Class classOfObject()
{
return String.class;
}
@Override
public String get()
{
return columnVals.lookupName(columnVals.getSingleValueRow(currRow));
}
};
}
}
final ComplexColumn columnVals = (ComplexColumn) cachedColumnVals;

View File

@ -403,27 +403,27 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
if (dimensionIndexInt != null) {
final int dimensionIndex = dimensionIndexInt;
return new ObjectColumnSelector<String>()
return new ObjectColumnSelector<Object>()
{
@Override
public Class classOfObject()
{
return String.class;
return Object.class;
}
@Override
public String get()
public Object get()
{
final String[] dimVals = currEntry.getKey().getDims()[dimensionIndex];
if (dimVals.length == 1) {
return dimVals[0];
}
if (dimVals.length == 0) {
else if (dimVals.length == 0) {
return null;
}
throw new UnsupportedOperationException(
"makeObjectColumnSelector does not support multivalued columns"
);
else {
return dimVals;
}
}
};
}

View File

@ -25,6 +25,7 @@ import io.druid.granularity.QueryGranularity;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.aggregation.DoubleSumAggregatorFactory;
import io.druid.query.aggregation.JavaScriptAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.aggregation.hyperloglog.HyperUniqueFinalizingPostAggregator;
import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
@ -80,6 +81,20 @@ public class QueryRunnerTestHelper
public static final CountAggregatorFactory rowsCount = new CountAggregatorFactory("rows");
public static final LongSumAggregatorFactory indexLongSum = new LongSumAggregatorFactory("index", "index");
public static final DoubleSumAggregatorFactory indexDoubleSum = new DoubleSumAggregatorFactory("index", "index");
public static final JavaScriptAggregatorFactory jsIndexSumIfPlacementishA = new JavaScriptAggregatorFactory(
"nindex",
Arrays.asList("placementish", "index"),
"function aggregate(current, a, b) { if ((Array.isArray(a) && a.indexOf('a') > -1) || a === 'a') { return current + b; } else { return current; } }",
"function reset() { return 0; }",
"function combine(a, b) { return a + b; }"
);
public static final JavaScriptAggregatorFactory jsPlacementishCount = new JavaScriptAggregatorFactory(
"pishcount",
Arrays.asList("placementish", "index"),
"function aggregate(current, a) { if (Array.isArray(a)) { return current + a.length; } else if (typeof a === 'string') { return current + 1; } else { return current; } }",
"function reset() { return 0; }",
"function combine(a, b) { return a + b; }"
);
public static final HyperUniquesAggregatorFactory qualityUniques = new HyperUniquesAggregatorFactory(
"uniques",
"quality_uniques"

View File

@ -0,0 +1,116 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query.aggregation;
import com.google.caliper.Runner;
import com.google.caliper.SimpleBenchmark;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import io.druid.segment.ObjectColumnSelector;
import java.util.Map;
public class JavaScriptAggregatorBenchmark extends SimpleBenchmark
{
protected static final Map<String, String> scriptDoubleSum = Maps.newHashMap();
static {
scriptDoubleSum.put("fnAggregate", "function aggregate(current, a) { return current + a }");
scriptDoubleSum.put("fnReset", "function reset() { return 0 }");
scriptDoubleSum.put("fnCombine", "function combine(a,b) { return a + b }");
}
private static void aggregate(TestFloatColumnSelector selector, Aggregator agg)
{
agg.aggregate();
selector.increment();
}
private JavaScriptAggregator jsAggregator;
private DoubleSumAggregator doubleAgg;
final LoopingFloatColumnSelector selector = new LoopingFloatColumnSelector(new float[]{42.12f, 9f});
@Override
protected void setUp() throws Exception
{
Map<String, String> script = scriptDoubleSum;
jsAggregator = new JavaScriptAggregator(
"billy",
Lists.asList(MetricSelectorUtils.wrap(selector), new ObjectColumnSelector[]{}),
JavaScriptAggregatorFactory.compileScript(
script.get("fnAggregate"),
script.get("fnReset"),
script.get("fnCombine")
)
);
doubleAgg = new DoubleSumAggregator("billy", selector);
}
public double timeJavaScriptDoubleSum(int reps)
{
double val = 0;
for(int i = 0; i < reps; ++i) {
aggregate(selector, jsAggregator);
}
return val;
}
public double timeNativeDoubleSum(int reps)
{
double val = 0;
for(int i = 0; i < reps; ++i) {
aggregate(selector, doubleAgg);
}
return val;
}
public static void main(String[] args) throws Exception
{
Runner.main(JavaScriptAggregatorBenchmark.class, args);
}
protected static class LoopingFloatColumnSelector extends TestFloatColumnSelector
{
private final float[] floats;
private long index = 0;
public LoopingFloatColumnSelector(float[] floats)
{
super(floats);
this.floats = floats;
}
@Override
public float get()
{
return floats[(int) (index % floats.length)];
}
public void increment()
{
++index;
if (index < 0) {
index = 0;
}
}
}
}

View File

@ -68,6 +68,12 @@ public class JavaScriptAggregatorTest
selector.increment();
}
private static void aggregate(TestObjectColumnSelector selector, Aggregator agg)
{
agg.aggregate();
selector.increment();
}
@Test
public void testAggregate()
{
@ -175,8 +181,49 @@ public class JavaScriptAggregatorTest
Assert.assertEquals(val, agg.get());
}
@Test
public void testAggregateStrings()
{
final TestObjectColumnSelector ocs = new TestObjectColumnSelector("what", null, new String[]{"hey", "there"});
final JavaScriptAggregator agg = new JavaScriptAggregator(
"billy",
Collections.<ObjectColumnSelector>singletonList(ocs),
JavaScriptAggregatorFactory.compileScript(
"function aggregate(current, a) { if (Array.isArray(a)) { return current + a.length; } else if (typeof a === 'string') { return current + 1; } else { return current; } }",
scriptDoubleSum.get("fnReset"),
scriptDoubleSum.get("fnCombine")
)
);
agg.reset();
Assert.assertEquals("billy", agg.getName());
double val = 0.;
Assert.assertEquals(val, agg.get());
Assert.assertEquals(val, agg.get());
Assert.assertEquals(val, agg.get());
aggregate(ocs, agg);
val += 1;
Assert.assertEquals(val, agg.get());
Assert.assertEquals(val, agg.get());
Assert.assertEquals(val, agg.get());
aggregate(ocs, agg);
Assert.assertEquals(val, agg.get());
Assert.assertEquals(val, agg.get());
Assert.assertEquals(val, agg.get());
aggregate(ocs, agg);
val += 2;
Assert.assertEquals(val, agg.get());
Assert.assertEquals(val, agg.get());
Assert.assertEquals(val, agg.get());
}
public static void main(String... args) throws Exception {
final LoopingFloatColumnSelector selector = new LoopingFloatColumnSelector(new float[]{42.12f, 9f});
final JavaScriptAggregatorBenchmark.LoopingFloatColumnSelector selector = new JavaScriptAggregatorBenchmark.LoopingFloatColumnSelector(new float[]{42.12f, 9f});
/* memory usage test
List<JavaScriptAggregator> aggs = Lists.newLinkedList();
@ -239,30 +286,4 @@ public class JavaScriptAggregatorTest
System.out.println(String.format("JavaScript is %2.1fx slower", (double)t1 / t2));
}
static class LoopingFloatColumnSelector extends TestFloatColumnSelector
{
private final float[] floats;
private long index = 0;
public LoopingFloatColumnSelector(float[] floats)
{
super(floats);
this.floats = floats;
}
@Override
public float get()
{
return floats[(int)(index % floats.length)];
}
public void increment()
{
++index;
if (index < 0) {
index = 0;
}
}
}
}

View File

@ -0,0 +1,53 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query.aggregation;
import io.druid.segment.ObjectColumnSelector;
/**
*/
public class TestObjectColumnSelector implements ObjectColumnSelector
{
private final Object[] objects;
private int index = 0;
public TestObjectColumnSelector(Object... objects)
{
this.objects = objects;
}
@Override
public Class classOfObject()
{
return objects[index].getClass();
}
@Override
public Object get()
{
return objects[index];
}
public void increment()
{
++index;
}
}

View File

@ -19,6 +19,7 @@
package io.druid.query.timeseries;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.metamx.common.guava.Sequences;
@ -1262,6 +1263,85 @@ public class TimeseriesQueryRunnerTest
TestHelper.assertExpectedResults(expectedResults, results);
}
@Test
public void testTimeseriesWithMultiValueFilteringJavascriptAggregator()
{
TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
.dataSource(QueryRunnerTestHelper.dataSource)
.granularity(QueryRunnerTestHelper.allGran)
.intervals(QueryRunnerTestHelper.firstToThird)
.aggregators(
ImmutableList.of(
QueryRunnerTestHelper.indexDoubleSum,
QueryRunnerTestHelper.jsIndexSumIfPlacementishA,
QueryRunnerTestHelper.jsPlacementishCount
)
)
.build();
Iterable<Result<TimeseriesResultValue>> expectedResults = ImmutableList.of(
new Result<>(
new DateTime(
QueryRunnerTestHelper.firstToThird.getIntervals()
.get(0)
.getStart()
),
new TimeseriesResultValue(
ImmutableMap.<String, Object>of(
"index", 12459.361190795898d,
"nindex", 283.31103515625d,
"pishcount", 52d
)
)
)
);
Iterable<Result<TimeseriesResultValue>> actualResults = Sequences.toList(
runner.run(query),
Lists.<Result<TimeseriesResultValue>>newArrayList()
);
TestHelper.assertExpectedResults(expectedResults, actualResults);
}
@Test
public void testTimeseriesWithMultiValueFilteringJavascriptAggregatorAndAlsoRegularFilters()
{
TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
.dataSource(QueryRunnerTestHelper.dataSource)
.granularity(QueryRunnerTestHelper.allGran)
.filters(QueryRunnerTestHelper.placementishDimension, "a")
.intervals(QueryRunnerTestHelper.firstToThird)
.aggregators(
ImmutableList.of(
QueryRunnerTestHelper.indexDoubleSum,
QueryRunnerTestHelper.jsIndexSumIfPlacementishA,
QueryRunnerTestHelper.jsPlacementishCount
)
)
.build();
Iterable<Result<TimeseriesResultValue>> expectedResults = ImmutableList.of(
new Result<>(
new DateTime(
QueryRunnerTestHelper.firstToThird.getIntervals()
.get(0)
.getStart()
),
new TimeseriesResultValue(
ImmutableMap.<String, Object>of(
"index", 283.31103515625d,
"nindex", 283.31103515625d,
"pishcount", 4d
)
)
)
);
Iterable<Result<TimeseriesResultValue>> actualResults = Sequences.toList(
runner.run(query),
Lists.<Result<TimeseriesResultValue>>newArrayList()
);
TestHelper.assertExpectedResults(expectedResults, actualResults);
}
@Test
public void testTimeseriesWithMultiValueDimFilter1()
{