add close method to aggregators

This commit is contained in:
xvrl 2013-03-18 13:18:28 -07:00
parent f47319f118
commit c68bd66945
22 changed files with 163 additions and 24 deletions

View File

@ -19,8 +19,6 @@
package com.metamx.druid.aggregation;
import java.util.Comparator;
/**
* An Aggregator is an object that can aggregate metrics. Its aggregation-related methods (namely, aggregate() and get())
* do not take any arguments as the assumption is that the Aggregator was given something in its constructor that
@ -40,4 +38,5 @@ public interface Aggregator {
Object get();
float getFloat();
String getName();
void close();
}

View File

@ -88,4 +88,9 @@ public interface BufferAggregator
* @return the float representation of the aggregate
*/
float getFloat(ByteBuffer buf, int position);
/**
* Release any resources used by the aggregator
*/
void close();
}

View File

@ -75,4 +75,10 @@ public class CountAggregator implements Aggregator
{
return new CountAggregator(name);
}
@Override
public void close()
{
// no resources to cleanup
}
}

View File

@ -49,4 +49,10 @@ public class CountBufferAggregator implements BufferAggregator
{
return buf.getLong(position);
}
@Override
public void close()
{
// no resources to cleanup
}
}

View File

@ -89,4 +89,10 @@ public class DoubleSumAggregator implements Aggregator
{
return new DoubleSumAggregator(name, selector);
}
@Override
public void close()
{
// no resources to cleanup
}
}

View File

@ -59,4 +59,10 @@ public class DoubleSumBufferAggregator implements BufferAggregator
{
return (float) buf.getDouble(position);
}
@Override
public void close()
{
// no resources to cleanup
}
}

View File

@ -80,4 +80,10 @@ public class HistogramAggregator implements Aggregator
{
return name;
}
@Override
public void close()
{
// no resources to cleanup
}
}

View File

@ -90,4 +90,10 @@ public class HistogramBufferAggregator implements BufferAggregator
{
throw new UnsupportedOperationException("HistogramBufferAggregator does not support getFloat()");
}
@Override
public void close()
{
// no resources to cleanup
}
}

View File

@ -21,6 +21,7 @@ package com.metamx.druid.aggregation;
import com.google.common.collect.Lists;
import com.metamx.druid.processing.FloatMetricSelector;
import org.mozilla.javascript.Context;
import java.util.List;
@ -33,6 +34,8 @@ public class JavaScriptAggregator implements Aggregator
public double combine(double a, double b);
public double reset();
public void close();
}
private final String name;
@ -79,4 +82,10 @@ public class JavaScriptAggregator implements Aggregator
{
return name;
}
@Override
public void close()
{
script.close();
}
}

View File

@ -30,6 +30,8 @@ import com.metamx.druid.processing.MetricSelectorFactory;
import org.mozilla.javascript.Context;
import org.mozilla.javascript.ContextAction;
import org.mozilla.javascript.ContextFactory;
import org.mozilla.javascript.Function;
import org.mozilla.javascript.Script;
import org.mozilla.javascript.ScriptableObject;
@ -49,7 +51,7 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory
private final List<String> fieldNames;
private final String script;
private final JavaScriptAggregator.ScriptAggregator combiner;
private final JavaScriptAggregator.ScriptAggregator compiledScript;
@JsonCreator
public JavaScriptAggregatorFactory(
@ -61,7 +63,7 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory
this.name = name;
this.script = expression;
this.fieldNames = fieldNames;
this.combiner = compileScript(script);
this.compiledScript = compileScript(script);
}
@Override
@ -77,7 +79,7 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory
public FloatMetricSelector apply(@Nullable String s) { return metricFactory.makeFloatMetricSelector(s); }
}
),
compileScript(script)
compiledScript
);
}
@ -96,7 +98,7 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory
}
}
),
compileScript(script)
compiledScript
);
}
@ -109,7 +111,7 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory
@Override
public Object combine(Object lhs, Object rhs)
{
return combiner.combine(((Number) lhs).doubleValue(), ((Number) rhs).doubleValue());
return compiledScript.combine(((Number) lhs).doubleValue(), ((Number) rhs).doubleValue());
}
@Override
@ -187,7 +189,7 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory
@Override
public Object getAggregatorStartValue()
{
return combiner.reset();
return compiledScript.reset();
}
@Override
@ -212,22 +214,29 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory
public static JavaScriptAggregator.ScriptAggregator compileScript(final String script)
{
final Context cx = Context.enter();
cx.setOptimizationLevel(9);
final ScriptableObject scope = cx.initStandardObjects();
final ContextFactory contextFactory = ContextFactory.getGlobal();
Context context = contextFactory.enterContext();
context.setOptimizationLevel(9);
Script compiledScript = cx.compileString(script, "script", 1, null);
compiledScript.exec(cx, scope);
final ScriptableObject scope = context.initStandardObjects();
Script compiledScript = context.compileString(script, "script", 1, null);
compiledScript.exec(context, scope);
final Function fnAggregate = getScriptFunction("aggregate", scope);
final Function fnReset = getScriptFunction("reset", scope);
final Function fnCombine = getScriptFunction("combine", scope);
Context.exit();
return new JavaScriptAggregator.ScriptAggregator()
{
@Override
public double aggregate(double current, FloatMetricSelector[] selectorList)
public double aggregate(final double current, final FloatMetricSelector[] selectorList)
{
Context cx = Context.getCurrentContext();
if(cx == null) cx = contextFactory.enterContext();
final int size = selectorList.length;
final Object[] args = new Object[size + 1];
@ -237,29 +246,45 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory
args[i + 1] = selectorList[i++].get();
}
Object res = fnAggregate.call(cx, scope, scope, args);
final Object res = fnAggregate.call(cx, scope, scope, args);
return Context.toNumber(res);
}
@Override
public double combine(double a, double b)
public double combine(final double a, final double b)
{
Object res = fnCombine.call(cx, scope, scope, new Object[]{a, b});
final Object res = contextFactory.call(
new ContextAction()
{
@Override
public Object run(final Context cx)
{
return fnCombine.call(cx, scope, scope, new Object[]{a, b});
}
}
);
return Context.toNumber(res);
}
@Override
public double reset()
{
Object res = fnReset.call(cx, scope, scope, new Object[]{});
final Object res = contextFactory.call(
new ContextAction()
{
@Override
public Object run(final Context cx)
{
return fnReset.call(cx, scope, scope, new Object[]{});
}
}
);
return Context.toNumber(res);
}
@Override
protected void finalize() throws Throwable
{
cx.exit();
super.finalize();
public void close() {
if(Context.getCurrentContext() != null) Context.exit();
}
};
}

View File

@ -62,4 +62,9 @@ public class JavaScriptBufferAggregator implements BufferAggregator
{
return (float)buf.getDouble(position);
}
@Override
public void close() {
script.close();
}
}

View File

@ -89,4 +89,10 @@ public class LongSumAggregator implements Aggregator
{
return new LongSumAggregator(name, selector);
}
@Override
public void close()
{
// no resources to cleanup
}
}

View File

@ -59,4 +59,10 @@ public class LongSumBufferAggregator implements BufferAggregator
{
return (float) buf.getLong(position);
}
@Override
public void close()
{
// no resources to cleanup
}
}

View File

@ -82,4 +82,10 @@ public class MaxAggregator implements Aggregator
{
return new MaxAggregator(name, selector);
}
@Override
public void close()
{
// no resources to cleanup
}
}

View File

@ -57,4 +57,10 @@ public class MaxBufferAggregator implements BufferAggregator
{
return (float) buf.getDouble(position);
}
@Override
public void close()
{
// no resources to cleanup
}
}

View File

@ -82,4 +82,10 @@ public class MinAggregator implements Aggregator
{
return new MinAggregator(name, selector);
}
@Override
public void close()
{
// no resources to cleanup
}
}

View File

@ -57,4 +57,10 @@ public class MinBufferAggregator implements BufferAggregator
{
return (float) buf.getDouble(position);
}
@Override
public void close()
{
// no resources to cleanup
}
}

View File

@ -59,4 +59,10 @@ public class NoopAggregator implements Aggregator
{
return name;
}
@Override
public void close()
{
// no resources to cleanup
}
}

View File

@ -46,4 +46,10 @@ public class NoopBufferAggregator implements BufferAggregator
{
return 0;
}
@Override
public void close()
{
// no resources to cleanup
}
}

View File

@ -0,0 +1,5 @@
package com.metamx.druid.processing;
public class MetricSelectorUtils
{
}

View File

@ -300,7 +300,9 @@ public class GroupByQueryEngine
@Override
public boolean hasNext()
{
return delegate.hasNext() || !cursor.isDone();
boolean hasNext = delegate.hasNext() || !cursor.isDone();
if(!hasNext) for(BufferAggregator agg : aggregators) agg.close();
return hasNext;
}
@Override

View File

@ -80,7 +80,12 @@ public class TimeseriesQueryEngine
bob.addMetric(postAgg);
}
return bob.build();
Result<TimeseriesResultValue> retVal = bob.build();
// cleanup
for(Aggregator agg : aggregators) agg.close();
return retVal;
}
}
).iterator();