diff --git a/common/src/main/java/com/metamx/druid/aggregation/JavaScriptAggregatorFactory.java b/common/src/main/java/com/metamx/druid/aggregation/JavaScriptAggregatorFactory.java index 11664314cd5..f2dd1604af4 100644 --- a/common/src/main/java/com/metamx/druid/aggregation/JavaScriptAggregatorFactory.java +++ b/common/src/main/java/com/metamx/druid/aggregation/JavaScriptAggregatorFactory.java @@ -21,6 +21,7 @@ package com.metamx.druid.aggregation; import com.google.common.base.Joiner; import com.google.common.collect.Lists; +import com.google.common.primitives.Doubles; import com.metamx.common.IAE; import com.metamx.druid.processing.FloatMetricSelector; import com.metamx.druid.processing.MetricSelectorFactory; @@ -79,9 +80,22 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory } @Override - public BufferAggregator factorizeBuffered(MetricSelectorFactory metricFactory) + public BufferAggregator factorizeBuffered(final MetricSelectorFactory metricFactory) { - throw new UnsupportedOperationException("Non-buffered not working right now either..."); + return new JavaScriptBufferAggregator( + Lists.transform( + fieldNames, + new com.google.common.base.Function() + { + @Override + public FloatMetricSelector apply(@Nullable String s) + { + return metricFactory.makeFloatMetricSelector(s); + } + } + ), + compileScript(script) + ); } @Override @@ -165,7 +179,7 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory @Override public int getMaxIntermediateSize() { - throw new UnsupportedOperationException("Not sure."); + return Doubles.BYTES; } @Override diff --git a/common/src/main/java/com/metamx/druid/aggregation/JavaScriptBufferAggregator.java b/common/src/main/java/com/metamx/druid/aggregation/JavaScriptBufferAggregator.java new file mode 100644 index 00000000000..02fe27abda4 --- /dev/null +++ b/common/src/main/java/com/metamx/druid/aggregation/JavaScriptBufferAggregator.java @@ -0,0 +1,65 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.aggregation; + +import com.google.common.collect.Lists; +import com.metamx.druid.processing.FloatMetricSelector; + +import java.nio.ByteBuffer; +import java.util.List; + +public class JavaScriptBufferAggregator implements BufferAggregator +{ + private final FloatMetricSelector[] selectorList; + private final JavaScriptAggregator.ScriptAggregator script; + + public JavaScriptBufferAggregator( + List selectorList, + JavaScriptAggregator.ScriptAggregator script + ) + { + this.selectorList = Lists.newArrayList(selectorList).toArray(new FloatMetricSelector[]{}); + this.script = script; + } + + @Override + public void init(ByteBuffer buf, int position) + { + buf.putDouble(position, script.reset()); + } + + @Override + public void aggregate(ByteBuffer buf, int position) + { + buf.putDouble(position, script.aggregate(buf.getDouble(position), selectorList)); + } + + @Override + public Object get(ByteBuffer buf, int position) + { + return buf.getDouble(position); + } + + @Override + public float getFloat(ByteBuffer buf, int position) + { + return (float)buf.getDouble(position); + } +} diff --git a/common/src/test/java/com/metamx/druid/aggregation/JavaScriptAggregatorTest.java b/common/src/test/java/com/metamx/druid/aggregation/JavaScriptAggregatorTest.java index e8feb40e225..96c2606c2c2 100644 --- a/common/src/test/java/com/metamx/druid/aggregation/JavaScriptAggregatorTest.java +++ b/common/src/test/java/com/metamx/druid/aggregation/JavaScriptAggregatorTest.java @@ -21,10 +21,12 @@ package com.metamx.druid.aggregation; import com.google.common.collect.Lists; +import com.google.common.primitives.Doubles; import com.metamx.druid.processing.FloatMetricSelector; import org.junit.Assert; import org.junit.Test; +import java.nio.ByteBuffer; import java.util.Arrays; public class JavaScriptAggregatorTest @@ -46,6 +48,16 @@ public class JavaScriptAggregatorTest selector2.increment(); } + private void aggregateBuffer(TestFloatMetricSelector selector1, + TestFloatMetricSelector selector2, + BufferAggregator agg, + ByteBuffer buf, int position) + { + agg.aggregate(buf, position); + selector1.increment(); + selector2.increment(); + } + private static void aggregate(TestFloatMetricSelector selector, Aggregator agg) { agg.aggregate(); @@ -53,7 +65,7 @@ public class JavaScriptAggregatorTest } @Test - public void testJavaScriptAggregator() + public void testAggregate() { final TestFloatMetricSelector selector1 = new TestFloatMetricSelector(new float[]{42.12f, 9f}); final TestFloatMetricSelector selector2 = new TestFloatMetricSelector(new float[]{2f, 3f}); @@ -86,6 +98,39 @@ public class JavaScriptAggregatorTest Assert.assertEquals(val, agg.get()); } + @Test + public void testBufferAggregate() + { + final TestFloatMetricSelector selector1 = new TestFloatMetricSelector(new float[]{42.12f, 9f}); + final TestFloatMetricSelector selector2 = new TestFloatMetricSelector(new float[]{2f, 3f}); + + JavaScriptBufferAggregator agg = new JavaScriptBufferAggregator( + Arrays.asList(selector1, selector2), + JavaScriptAggregatorFactory.compileScript(sumLogATimesBPlusTen) + ); + + ByteBuffer buf = ByteBuffer.allocateDirect(32); + final int position = 4; + agg.init(buf, position); + + double val = 10.; + Assert.assertEquals(val, agg.get(buf, position)); + Assert.assertEquals(val, agg.get(buf, position)); + Assert.assertEquals(val, agg.get(buf, position)); + aggregateBuffer(selector1, selector2, agg, buf, position); + + val += Math.log(42.12f) * 2f; + Assert.assertEquals(val, agg.get(buf, position)); + Assert.assertEquals(val, agg.get(buf, position)); + Assert.assertEquals(val, agg.get(buf, position)); + + aggregateBuffer(selector1, selector2, agg, buf, position); + val += Math.log(9f) * 3f; + Assert.assertEquals(val, agg.get(buf, position)); + Assert.assertEquals(val, agg.get(buf, position)); + Assert.assertEquals(val, agg.get(buf, position)); + } + public static void main(String... args) throws Exception { final LoopingFloatMetricSelector selector = new LoopingFloatMetricSelector(new float[]{42.12f, 9f}); @@ -170,7 +215,9 @@ public class JavaScriptAggregatorTest public void increment() { ++index; - if(index < 0) index = 0; + if (index < 0) { + index = 0; + } } } }