mirror of https://github.com/apache/druid.git
add buffered version of JavaScriptAggregator
This commit is contained in:
parent
afb347f7ec
commit
a34c5a100c
|
@ -21,6 +21,7 @@ package com.metamx.druid.aggregation;
|
|||
|
||||
import com.google.common.base.Joiner;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.primitives.Doubles;
|
||||
import com.metamx.common.IAE;
|
||||
import com.metamx.druid.processing.FloatMetricSelector;
|
||||
import com.metamx.druid.processing.MetricSelectorFactory;
|
||||
|
@ -79,9 +80,22 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory
|
|||
}
|
||||
|
||||
@Override
|
||||
public BufferAggregator factorizeBuffered(MetricSelectorFactory metricFactory)
|
||||
public BufferAggregator factorizeBuffered(final MetricSelectorFactory metricFactory)
|
||||
{
|
||||
throw new UnsupportedOperationException("Non-buffered not working right now either...");
|
||||
return new JavaScriptBufferAggregator(
|
||||
Lists.transform(
|
||||
fieldNames,
|
||||
new com.google.common.base.Function<String, FloatMetricSelector>()
|
||||
{
|
||||
@Override
|
||||
public FloatMetricSelector apply(@Nullable String s)
|
||||
{
|
||||
return metricFactory.makeFloatMetricSelector(s);
|
||||
}
|
||||
}
|
||||
),
|
||||
compileScript(script)
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -165,7 +179,7 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory
|
|||
@Override
|
||||
public int getMaxIntermediateSize()
|
||||
{
|
||||
throw new UnsupportedOperationException("Not sure.");
|
||||
return Doubles.BYTES;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -0,0 +1,65 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2012 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package com.metamx.druid.aggregation;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.metamx.druid.processing.FloatMetricSelector;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.List;
|
||||
|
||||
public class JavaScriptBufferAggregator implements BufferAggregator
|
||||
{
|
||||
private final FloatMetricSelector[] selectorList;
|
||||
private final JavaScriptAggregator.ScriptAggregator script;
|
||||
|
||||
public JavaScriptBufferAggregator(
|
||||
List<FloatMetricSelector> selectorList,
|
||||
JavaScriptAggregator.ScriptAggregator script
|
||||
)
|
||||
{
|
||||
this.selectorList = Lists.newArrayList(selectorList).toArray(new FloatMetricSelector[]{});
|
||||
this.script = script;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void init(ByteBuffer buf, int position)
|
||||
{
|
||||
buf.putDouble(position, script.reset());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void aggregate(ByteBuffer buf, int position)
|
||||
{
|
||||
buf.putDouble(position, script.aggregate(buf.getDouble(position), selectorList));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object get(ByteBuffer buf, int position)
|
||||
{
|
||||
return buf.getDouble(position);
|
||||
}
|
||||
|
||||
@Override
|
||||
public float getFloat(ByteBuffer buf, int position)
|
||||
{
|
||||
return (float)buf.getDouble(position);
|
||||
}
|
||||
}
|
|
@ -21,10 +21,12 @@ package com.metamx.druid.aggregation;
|
|||
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.primitives.Doubles;
|
||||
import com.metamx.druid.processing.FloatMetricSelector;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Arrays;
|
||||
|
||||
public class JavaScriptAggregatorTest
|
||||
|
@ -46,6 +48,16 @@ public class JavaScriptAggregatorTest
|
|||
selector2.increment();
|
||||
}
|
||||
|
||||
private void aggregateBuffer(TestFloatMetricSelector selector1,
|
||||
TestFloatMetricSelector selector2,
|
||||
BufferAggregator agg,
|
||||
ByteBuffer buf, int position)
|
||||
{
|
||||
agg.aggregate(buf, position);
|
||||
selector1.increment();
|
||||
selector2.increment();
|
||||
}
|
||||
|
||||
private static void aggregate(TestFloatMetricSelector selector, Aggregator agg)
|
||||
{
|
||||
agg.aggregate();
|
||||
|
@ -53,7 +65,7 @@ public class JavaScriptAggregatorTest
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testJavaScriptAggregator()
|
||||
public void testAggregate()
|
||||
{
|
||||
final TestFloatMetricSelector selector1 = new TestFloatMetricSelector(new float[]{42.12f, 9f});
|
||||
final TestFloatMetricSelector selector2 = new TestFloatMetricSelector(new float[]{2f, 3f});
|
||||
|
@ -86,6 +98,39 @@ public class JavaScriptAggregatorTest
|
|||
Assert.assertEquals(val, agg.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBufferAggregate()
|
||||
{
|
||||
final TestFloatMetricSelector selector1 = new TestFloatMetricSelector(new float[]{42.12f, 9f});
|
||||
final TestFloatMetricSelector selector2 = new TestFloatMetricSelector(new float[]{2f, 3f});
|
||||
|
||||
JavaScriptBufferAggregator agg = new JavaScriptBufferAggregator(
|
||||
Arrays.<FloatMetricSelector>asList(selector1, selector2),
|
||||
JavaScriptAggregatorFactory.compileScript(sumLogATimesBPlusTen)
|
||||
);
|
||||
|
||||
ByteBuffer buf = ByteBuffer.allocateDirect(32);
|
||||
final int position = 4;
|
||||
agg.init(buf, position);
|
||||
|
||||
double val = 10.;
|
||||
Assert.assertEquals(val, agg.get(buf, position));
|
||||
Assert.assertEquals(val, agg.get(buf, position));
|
||||
Assert.assertEquals(val, agg.get(buf, position));
|
||||
aggregateBuffer(selector1, selector2, agg, buf, position);
|
||||
|
||||
val += Math.log(42.12f) * 2f;
|
||||
Assert.assertEquals(val, agg.get(buf, position));
|
||||
Assert.assertEquals(val, agg.get(buf, position));
|
||||
Assert.assertEquals(val, agg.get(buf, position));
|
||||
|
||||
aggregateBuffer(selector1, selector2, agg, buf, position);
|
||||
val += Math.log(9f) * 3f;
|
||||
Assert.assertEquals(val, agg.get(buf, position));
|
||||
Assert.assertEquals(val, agg.get(buf, position));
|
||||
Assert.assertEquals(val, agg.get(buf, position));
|
||||
}
|
||||
|
||||
public static void main(String... args) throws Exception {
|
||||
final LoopingFloatMetricSelector selector = new LoopingFloatMetricSelector(new float[]{42.12f, 9f});
|
||||
|
||||
|
@ -170,7 +215,9 @@ public class JavaScriptAggregatorTest
|
|||
public void increment()
|
||||
{
|
||||
++index;
|
||||
if(index < 0) index = 0;
|
||||
if (index < 0) {
|
||||
index = 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue