Merge pull request #13 from metamx/jsbufferfix

add buffered version of JavaScriptAggregator
This commit is contained in:
cheddar 2012-10-29 16:04:13 -07:00
commit 8a49c156ed
3 changed files with 131 additions and 5 deletions

View File

@ -21,6 +21,7 @@ package com.metamx.druid.aggregation;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import com.google.common.primitives.Doubles;
import com.metamx.common.IAE;
import com.metamx.druid.processing.FloatMetricSelector;
import com.metamx.druid.processing.MetricSelectorFactory;
@ -79,9 +80,22 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory
}
@Override
public BufferAggregator factorizeBuffered(MetricSelectorFactory metricFactory)
public BufferAggregator factorizeBuffered(final MetricSelectorFactory metricFactory)
{
throw new UnsupportedOperationException("Non-buffered not working right now either...");
return new JavaScriptBufferAggregator(
Lists.transform(
fieldNames,
new com.google.common.base.Function<String, FloatMetricSelector>()
{
@Override
public FloatMetricSelector apply(@Nullable String s)
{
return metricFactory.makeFloatMetricSelector(s);
}
}
),
compileScript(script)
);
}
@Override
@ -165,7 +179,7 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory
@Override
public int getMaxIntermediateSize()
{
throw new UnsupportedOperationException("Not sure.");
return Doubles.BYTES;
}
@Override

View File

@ -0,0 +1,65 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.aggregation;
import com.google.common.collect.Lists;
import com.metamx.druid.processing.FloatMetricSelector;
import java.nio.ByteBuffer;
import java.util.List;
public class JavaScriptBufferAggregator implements BufferAggregator
{
private final FloatMetricSelector[] selectorList;
private final JavaScriptAggregator.ScriptAggregator script;
public JavaScriptBufferAggregator(
List<FloatMetricSelector> selectorList,
JavaScriptAggregator.ScriptAggregator script
)
{
this.selectorList = Lists.newArrayList(selectorList).toArray(new FloatMetricSelector[]{});
this.script = script;
}
@Override
public void init(ByteBuffer buf, int position)
{
buf.putDouble(position, script.reset());
}
@Override
public void aggregate(ByteBuffer buf, int position)
{
buf.putDouble(position, script.aggregate(buf.getDouble(position), selectorList));
}
@Override
public Object get(ByteBuffer buf, int position)
{
return buf.getDouble(position);
}
@Override
public float getFloat(ByteBuffer buf, int position)
{
return (float)buf.getDouble(position);
}
}

View File

@ -21,10 +21,12 @@ package com.metamx.druid.aggregation;
import com.google.common.collect.Lists;
import com.google.common.primitives.Doubles;
import com.metamx.druid.processing.FloatMetricSelector;
import org.junit.Assert;
import org.junit.Test;
import java.nio.ByteBuffer;
import java.util.Arrays;
public class JavaScriptAggregatorTest
@ -46,6 +48,16 @@ public class JavaScriptAggregatorTest
selector2.increment();
}
private void aggregateBuffer(TestFloatMetricSelector selector1,
TestFloatMetricSelector selector2,
BufferAggregator agg,
ByteBuffer buf, int position)
{
agg.aggregate(buf, position);
selector1.increment();
selector2.increment();
}
private static void aggregate(TestFloatMetricSelector selector, Aggregator agg)
{
agg.aggregate();
@ -53,7 +65,7 @@ public class JavaScriptAggregatorTest
}
@Test
public void testJavaScriptAggregator()
public void testAggregate()
{
final TestFloatMetricSelector selector1 = new TestFloatMetricSelector(new float[]{42.12f, 9f});
final TestFloatMetricSelector selector2 = new TestFloatMetricSelector(new float[]{2f, 3f});
@ -86,6 +98,39 @@ public class JavaScriptAggregatorTest
Assert.assertEquals(val, agg.get());
}
@Test
public void testBufferAggregate()
{
final TestFloatMetricSelector selector1 = new TestFloatMetricSelector(new float[]{42.12f, 9f});
final TestFloatMetricSelector selector2 = new TestFloatMetricSelector(new float[]{2f, 3f});
JavaScriptBufferAggregator agg = new JavaScriptBufferAggregator(
Arrays.<FloatMetricSelector>asList(selector1, selector2),
JavaScriptAggregatorFactory.compileScript(sumLogATimesBPlusTen)
);
ByteBuffer buf = ByteBuffer.allocateDirect(32);
final int position = 4;
agg.init(buf, position);
double val = 10.;
Assert.assertEquals(val, agg.get(buf, position));
Assert.assertEquals(val, agg.get(buf, position));
Assert.assertEquals(val, agg.get(buf, position));
aggregateBuffer(selector1, selector2, agg, buf, position);
val += Math.log(42.12f) * 2f;
Assert.assertEquals(val, agg.get(buf, position));
Assert.assertEquals(val, agg.get(buf, position));
Assert.assertEquals(val, agg.get(buf, position));
aggregateBuffer(selector1, selector2, agg, buf, position);
val += Math.log(9f) * 3f;
Assert.assertEquals(val, agg.get(buf, position));
Assert.assertEquals(val, agg.get(buf, position));
Assert.assertEquals(val, agg.get(buf, position));
}
public static void main(String... args) throws Exception {
final LoopingFloatMetricSelector selector = new LoopingFloatMetricSelector(new float[]{42.12f, 9f});
@ -170,7 +215,9 @@ public class JavaScriptAggregatorTest
public void increment()
{
++index;
if(index < 0) index = 0;
if (index < 0) {
index = 0;
}
}
}
}