mirror of https://github.com/apache/druid.git
javascript aggregator now uses column selector
This commit is contained in:
parent
b66f69def6
commit
66a52ed282
|
@ -20,7 +20,7 @@
|
||||||
package com.metamx.druid.aggregation;
|
package com.metamx.druid.aggregation;
|
||||||
|
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
import com.metamx.druid.processing.FloatMetricSelector;
|
import com.metamx.druid.processing.ObjectColumnSelector;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
|
@ -28,7 +28,7 @@ public class JavaScriptAggregator implements Aggregator
|
||||||
{
|
{
|
||||||
static interface ScriptAggregator
|
static interface ScriptAggregator
|
||||||
{
|
{
|
||||||
public double aggregate(double current, FloatMetricSelector[] selectorList);
|
public double aggregate(double current, ObjectColumnSelector[] selectorList);
|
||||||
|
|
||||||
public double combine(double a, double b);
|
public double combine(double a, double b);
|
||||||
|
|
||||||
|
@ -38,15 +38,15 @@ public class JavaScriptAggregator implements Aggregator
|
||||||
}
|
}
|
||||||
|
|
||||||
private final String name;
|
private final String name;
|
||||||
private final FloatMetricSelector[] selectorList;
|
private final ObjectColumnSelector[] selectorList;
|
||||||
private final ScriptAggregator script;
|
private final ScriptAggregator script;
|
||||||
|
|
||||||
private volatile double current;
|
private volatile double current;
|
||||||
|
|
||||||
public JavaScriptAggregator(String name, List<FloatMetricSelector> selectorList, ScriptAggregator script)
|
public JavaScriptAggregator(String name, List<ObjectColumnSelector> selectorList, ScriptAggregator script)
|
||||||
{
|
{
|
||||||
this.name = name;
|
this.name = name;
|
||||||
this.selectorList = Lists.newArrayList(selectorList).toArray(new FloatMetricSelector[]{});
|
this.selectorList = Lists.newArrayList(selectorList).toArray(new ObjectColumnSelector[]{});
|
||||||
this.script = script;
|
this.script = script;
|
||||||
|
|
||||||
this.current = script.reset();
|
this.current = script.reset();
|
||||||
|
|
|
@ -21,18 +21,16 @@ package com.metamx.druid.aggregation;
|
||||||
|
|
||||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||||
|
import com.google.common.base.Charsets;
|
||||||
import com.google.common.base.Joiner;
|
import com.google.common.base.Joiner;
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
import com.google.common.primitives.Doubles;
|
import com.google.common.primitives.Doubles;
|
||||||
import com.metamx.common.IAE;
|
|
||||||
import com.metamx.druid.processing.FloatMetricSelector;
|
|
||||||
import com.metamx.druid.processing.ColumnSelectorFactory;
|
import com.metamx.druid.processing.ColumnSelectorFactory;
|
||||||
|
import com.metamx.druid.processing.ObjectColumnSelector;
|
||||||
import org.mozilla.javascript.Context;
|
import org.mozilla.javascript.Context;
|
||||||
import org.mozilla.javascript.ContextAction;
|
import org.mozilla.javascript.ContextAction;
|
||||||
import org.mozilla.javascript.ContextFactory;
|
import org.mozilla.javascript.ContextFactory;
|
||||||
import org.mozilla.javascript.Function;
|
import org.mozilla.javascript.Function;
|
||||||
import org.mozilla.javascript.Script;
|
|
||||||
import org.mozilla.javascript.ScriptableObject;
|
import org.mozilla.javascript.ScriptableObject;
|
||||||
|
|
||||||
import javax.annotation.Nullable;
|
import javax.annotation.Nullable;
|
||||||
|
@ -76,16 +74,18 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Aggregator factorize(final ColumnSelectorFactory metricFactory)
|
public Aggregator factorize(final ColumnSelectorFactory columnFactory)
|
||||||
{
|
{
|
||||||
return new JavaScriptAggregator(
|
return new JavaScriptAggregator(
|
||||||
name,
|
name,
|
||||||
Lists.transform(
|
Lists.transform(
|
||||||
fieldNames,
|
fieldNames,
|
||||||
new com.google.common.base.Function<String, FloatMetricSelector>()
|
new com.google.common.base.Function<String, ObjectColumnSelector>()
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
public FloatMetricSelector apply(@Nullable String s) { return metricFactory.makeFloatMetricSelector(s); }
|
public ObjectColumnSelector apply(@Nullable String s) {
|
||||||
|
return columnFactory.makeObjectColumnSelector(s);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
),
|
),
|
||||||
compiledScript
|
compiledScript
|
||||||
|
@ -93,17 +93,16 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public BufferAggregator factorizeBuffered(final ColumnSelectorFactory metricFactory)
|
public BufferAggregator factorizeBuffered(final ColumnSelectorFactory columnSelectorFactory)
|
||||||
{
|
{
|
||||||
return new JavaScriptBufferAggregator(
|
return new JavaScriptBufferAggregator(
|
||||||
Lists.transform(
|
Lists.transform(
|
||||||
fieldNames,
|
fieldNames,
|
||||||
new com.google.common.base.Function<String, FloatMetricSelector>()
|
new com.google.common.base.Function<String, ObjectColumnSelector>()
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
public FloatMetricSelector apply(@Nullable String s)
|
public ObjectColumnSelector apply(@Nullable String s) {
|
||||||
{
|
return columnSelectorFactory.makeObjectColumnSelector(s);
|
||||||
return metricFactory.makeFloatMetricSelector(s);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
),
|
),
|
||||||
|
@ -182,8 +181,8 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory
|
||||||
{
|
{
|
||||||
try {
|
try {
|
||||||
MessageDigest md = MessageDigest.getInstance("SHA-1");
|
MessageDigest md = MessageDigest.getInstance("SHA-1");
|
||||||
byte[] fieldNameBytes = Joiner.on(",").join(fieldNames).getBytes();
|
byte[] fieldNameBytes = Joiner.on(",").join(fieldNames).getBytes(Charsets.UTF_8);
|
||||||
byte[] sha1 = md.digest((fnAggregate+fnReset+fnCombine).getBytes());
|
byte[] sha1 = md.digest((fnAggregate+fnReset+fnCombine).getBytes(Charsets.UTF_8));
|
||||||
|
|
||||||
return ByteBuffer.allocate(1 + fieldNameBytes.length + sha1.length)
|
return ByteBuffer.allocate(1 + fieldNameBytes.length + sha1.length)
|
||||||
.put(CACHE_TYPE_ID)
|
.put(CACHE_TYPE_ID)
|
||||||
|
@ -242,7 +241,7 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory
|
||||||
return new JavaScriptAggregator.ScriptAggregator()
|
return new JavaScriptAggregator.ScriptAggregator()
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
public double aggregate(final double current, final FloatMetricSelector[] selectorList)
|
public double aggregate(final double current, final ObjectColumnSelector[] selectorList)
|
||||||
{
|
{
|
||||||
Context cx = Context.getCurrentContext();
|
Context cx = Context.getCurrentContext();
|
||||||
if(cx == null) cx = contextFactory.enterContext();
|
if(cx == null) cx = contextFactory.enterContext();
|
||||||
|
|
|
@ -20,22 +20,24 @@
|
||||||
package com.metamx.druid.aggregation;
|
package com.metamx.druid.aggregation;
|
||||||
|
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
|
import com.metamx.druid.processing.ComplexMetricSelector;
|
||||||
import com.metamx.druid.processing.FloatMetricSelector;
|
import com.metamx.druid.processing.FloatMetricSelector;
|
||||||
|
import com.metamx.druid.processing.ObjectColumnSelector;
|
||||||
|
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
public class JavaScriptBufferAggregator implements BufferAggregator
|
public class JavaScriptBufferAggregator implements BufferAggregator
|
||||||
{
|
{
|
||||||
private final FloatMetricSelector[] selectorList;
|
private final ObjectColumnSelector[] selectorList;
|
||||||
private final JavaScriptAggregator.ScriptAggregator script;
|
private final JavaScriptAggregator.ScriptAggregator script;
|
||||||
|
|
||||||
public JavaScriptBufferAggregator(
|
public JavaScriptBufferAggregator(
|
||||||
List<FloatMetricSelector> selectorList,
|
List<ObjectColumnSelector> selectorList,
|
||||||
JavaScriptAggregator.ScriptAggregator script
|
JavaScriptAggregator.ScriptAggregator script
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
this.selectorList = Lists.newArrayList(selectorList).toArray(new FloatMetricSelector[]{});
|
this.selectorList = Lists.newArrayList(selectorList).toArray(new ObjectColumnSelector[]{});
|
||||||
this.script = script;
|
this.script = script;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,5 +0,0 @@
|
||||||
package com.metamx.druid.processing;
|
|
||||||
|
|
||||||
public class MetricSelectorUtils
|
|
||||||
{
|
|
||||||
}
|
|
|
@ -21,7 +21,7 @@ package com.metamx.druid.aggregation;
|
||||||
|
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
import com.google.common.collect.Maps;
|
import com.google.common.collect.Maps;
|
||||||
import com.metamx.druid.processing.FloatMetricSelector;
|
import com.metamx.druid.processing.ObjectColumnSelector;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
@ -77,7 +77,7 @@ public class JavaScriptAggregatorTest
|
||||||
|
|
||||||
JavaScriptAggregator agg = new JavaScriptAggregator(
|
JavaScriptAggregator agg = new JavaScriptAggregator(
|
||||||
"billy",
|
"billy",
|
||||||
Arrays.<FloatMetricSelector>asList(selector1, selector2),
|
Arrays.<ObjectColumnSelector>asList(MetricSelectorUtils.wrap(selector1), MetricSelectorUtils.wrap(selector2)),
|
||||||
JavaScriptAggregatorFactory.compileScript(script.get("fnAggregate"),
|
JavaScriptAggregatorFactory.compileScript(script.get("fnAggregate"),
|
||||||
script.get("fnReset"),
|
script.get("fnReset"),
|
||||||
script.get("fnCombine"))
|
script.get("fnCombine"))
|
||||||
|
@ -113,7 +113,7 @@ public class JavaScriptAggregatorTest
|
||||||
|
|
||||||
Map<String, String> script = sumLogATimesBPlusTen;
|
Map<String, String> script = sumLogATimesBPlusTen;
|
||||||
JavaScriptBufferAggregator agg = new JavaScriptBufferAggregator(
|
JavaScriptBufferAggregator agg = new JavaScriptBufferAggregator(
|
||||||
Arrays.<FloatMetricSelector>asList(selector1, selector2),
|
Arrays.<ObjectColumnSelector>asList(MetricSelectorUtils.wrap(selector1), MetricSelectorUtils.wrap(selector2)),
|
||||||
JavaScriptAggregatorFactory.compileScript(script.get("fnAggregate"),
|
JavaScriptAggregatorFactory.compileScript(script.get("fnAggregate"),
|
||||||
script.get("fnReset"),
|
script.get("fnReset"),
|
||||||
script.get("fnCombine"))
|
script.get("fnCombine"))
|
||||||
|
@ -164,7 +164,7 @@ public class JavaScriptAggregatorTest
|
||||||
Map<String, String> script = scriptDoubleSum;
|
Map<String, String> script = scriptDoubleSum;
|
||||||
JavaScriptAggregator aggRhino = new JavaScriptAggregator(
|
JavaScriptAggregator aggRhino = new JavaScriptAggregator(
|
||||||
"billy",
|
"billy",
|
||||||
Lists.asList(selector, new FloatMetricSelector[]{}),
|
Lists.asList(MetricSelectorUtils.wrap(selector), new ObjectColumnSelector[]{}),
|
||||||
JavaScriptAggregatorFactory.compileScript(script.get("fnAggregate"),
|
JavaScriptAggregatorFactory.compileScript(script.get("fnAggregate"),
|
||||||
script.get("fnReset"),
|
script.get("fnReset"),
|
||||||
script.get("fnCombine"))
|
script.get("fnCombine"))
|
||||||
|
|
|
@ -0,0 +1,45 @@
|
||||||
|
/*
|
||||||
|
* Druid - a distributed column store.
|
||||||
|
* Copyright (C) 2013 Metamarkets Group Inc.
|
||||||
|
*
|
||||||
|
* This program is free software; you can redistribute it and/or
|
||||||
|
* modify it under the terms of the GNU General Public License
|
||||||
|
* as published by the Free Software Foundation; either version 2
|
||||||
|
* of the License, or (at your option) any later version.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU General Public License
|
||||||
|
* along with this program; if not, write to the Free Software
|
||||||
|
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package com.metamx.druid.aggregation;
|
||||||
|
|
||||||
|
import com.metamx.druid.processing.ComplexMetricSelector;
|
||||||
|
import com.metamx.druid.processing.FloatMetricSelector;
|
||||||
|
import com.metamx.druid.processing.ObjectColumnSelector;
|
||||||
|
|
||||||
|
public class MetricSelectorUtils
|
||||||
|
{
|
||||||
|
public static ObjectColumnSelector<Float> wrap(final FloatMetricSelector selector)
|
||||||
|
{
|
||||||
|
return new ObjectColumnSelector<Float>()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public Class<Float> classOfObject()
|
||||||
|
{
|
||||||
|
return Float.TYPE;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Float get()
|
||||||
|
{
|
||||||
|
return selector.get();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue