From c68bd66945877f385904a14d8abd333ecee56fe2 Mon Sep 17 00:00:00 2001 From: xvrl Date: Mon, 18 Mar 2013 13:18:28 -0700 Subject: [PATCH] add close method to aggregators --- .../metamx/druid/aggregation/Aggregator.java | 3 +- .../druid/aggregation/BufferAggregator.java | 5 ++ .../druid/aggregation/CountAggregator.java | 6 ++ .../aggregation/CountBufferAggregator.java | 6 ++ .../aggregation/DoubleSumAggregator.java | 6 ++ .../DoubleSumBufferAggregator.java | 6 ++ .../aggregation/HistogramAggregator.java | 6 ++ .../HistogramBufferAggregator.java | 6 ++ .../aggregation/JavaScriptAggregator.java | 9 +++ .../JavaScriptAggregatorFactory.java | 65 +++++++++++++------ .../JavaScriptBufferAggregator.java | 5 ++ .../druid/aggregation/LongSumAggregator.java | 6 ++ .../aggregation/LongSumBufferAggregator.java | 6 ++ .../druid/aggregation/MaxAggregator.java | 6 ++ .../aggregation/MaxBufferAggregator.java | 6 ++ .../druid/aggregation/MinAggregator.java | 6 ++ .../aggregation/MinBufferAggregator.java | 6 ++ .../druid/aggregation/NoopAggregator.java | 6 ++ .../aggregation/NoopBufferAggregator.java | 6 ++ .../druid/processing/MetricSelectorUtils.java | 5 ++ .../druid/query/group/GroupByQueryEngine.java | 4 +- .../timeseries/TimeseriesQueryEngine.java | 7 +- 22 files changed, 163 insertions(+), 24 deletions(-) create mode 100644 common/src/main/java/com/metamx/druid/processing/MetricSelectorUtils.java diff --git a/common/src/main/java/com/metamx/druid/aggregation/Aggregator.java b/common/src/main/java/com/metamx/druid/aggregation/Aggregator.java index 4acbd162f5a..fd42549971e 100644 --- a/common/src/main/java/com/metamx/druid/aggregation/Aggregator.java +++ b/common/src/main/java/com/metamx/druid/aggregation/Aggregator.java @@ -19,8 +19,6 @@ package com.metamx.druid.aggregation; -import java.util.Comparator; - /** * An Aggregator is an object that can aggregate metrics. Its aggregation-related methods (namely, aggregate() and get()) * do not take any arguments as the assumption is that the Aggregator was given something in its constructor that @@ -40,4 +38,5 @@ public interface Aggregator { Object get(); float getFloat(); String getName(); + void close(); } diff --git a/common/src/main/java/com/metamx/druid/aggregation/BufferAggregator.java b/common/src/main/java/com/metamx/druid/aggregation/BufferAggregator.java index bff68cd14a2..6a010f7a3db 100644 --- a/common/src/main/java/com/metamx/druid/aggregation/BufferAggregator.java +++ b/common/src/main/java/com/metamx/druid/aggregation/BufferAggregator.java @@ -88,4 +88,9 @@ public interface BufferAggregator * @return the float representation of the aggregate */ float getFloat(ByteBuffer buf, int position); + + /** + * Release any resources used by the aggregator + */ + void close(); } diff --git a/common/src/main/java/com/metamx/druid/aggregation/CountAggregator.java b/common/src/main/java/com/metamx/druid/aggregation/CountAggregator.java index ec656a25673..e806aa78e1c 100644 --- a/common/src/main/java/com/metamx/druid/aggregation/CountAggregator.java +++ b/common/src/main/java/com/metamx/druid/aggregation/CountAggregator.java @@ -75,4 +75,10 @@ public class CountAggregator implements Aggregator { return new CountAggregator(name); } + + @Override + public void close() + { + // no resources to cleanup + } } diff --git a/common/src/main/java/com/metamx/druid/aggregation/CountBufferAggregator.java b/common/src/main/java/com/metamx/druid/aggregation/CountBufferAggregator.java index c2a6b5a485d..0e6f06c786f 100644 --- a/common/src/main/java/com/metamx/druid/aggregation/CountBufferAggregator.java +++ b/common/src/main/java/com/metamx/druid/aggregation/CountBufferAggregator.java @@ -49,4 +49,10 @@ public class CountBufferAggregator implements BufferAggregator { return buf.getLong(position); } + + @Override + public void close() + { + // no resources to cleanup + } } diff --git a/common/src/main/java/com/metamx/druid/aggregation/DoubleSumAggregator.java b/common/src/main/java/com/metamx/druid/aggregation/DoubleSumAggregator.java index 21cb4691227..545c8c64feb 100644 --- a/common/src/main/java/com/metamx/druid/aggregation/DoubleSumAggregator.java +++ b/common/src/main/java/com/metamx/druid/aggregation/DoubleSumAggregator.java @@ -89,4 +89,10 @@ public class DoubleSumAggregator implements Aggregator { return new DoubleSumAggregator(name, selector); } + + @Override + public void close() + { + // no resources to cleanup + } } diff --git a/common/src/main/java/com/metamx/druid/aggregation/DoubleSumBufferAggregator.java b/common/src/main/java/com/metamx/druid/aggregation/DoubleSumBufferAggregator.java index 3e85d489db2..62f4e75f553 100644 --- a/common/src/main/java/com/metamx/druid/aggregation/DoubleSumBufferAggregator.java +++ b/common/src/main/java/com/metamx/druid/aggregation/DoubleSumBufferAggregator.java @@ -59,4 +59,10 @@ public class DoubleSumBufferAggregator implements BufferAggregator { return (float) buf.getDouble(position); } + + @Override + public void close() + { + // no resources to cleanup + } } diff --git a/common/src/main/java/com/metamx/druid/aggregation/HistogramAggregator.java b/common/src/main/java/com/metamx/druid/aggregation/HistogramAggregator.java index c2d9d674732..2355f458d29 100644 --- a/common/src/main/java/com/metamx/druid/aggregation/HistogramAggregator.java +++ b/common/src/main/java/com/metamx/druid/aggregation/HistogramAggregator.java @@ -80,4 +80,10 @@ public class HistogramAggregator implements Aggregator { return name; } + + @Override + public void close() + { + // no resources to cleanup + } } diff --git a/common/src/main/java/com/metamx/druid/aggregation/HistogramBufferAggregator.java b/common/src/main/java/com/metamx/druid/aggregation/HistogramBufferAggregator.java index fe4f366631c..07c7c37dee0 100644 --- a/common/src/main/java/com/metamx/druid/aggregation/HistogramBufferAggregator.java +++ b/common/src/main/java/com/metamx/druid/aggregation/HistogramBufferAggregator.java @@ -90,4 +90,10 @@ public class HistogramBufferAggregator implements BufferAggregator { throw new UnsupportedOperationException("HistogramBufferAggregator does not support getFloat()"); } + + @Override + public void close() + { + // no resources to cleanup + } } diff --git a/common/src/main/java/com/metamx/druid/aggregation/JavaScriptAggregator.java b/common/src/main/java/com/metamx/druid/aggregation/JavaScriptAggregator.java index 80daa045d82..d24277660bf 100644 --- a/common/src/main/java/com/metamx/druid/aggregation/JavaScriptAggregator.java +++ b/common/src/main/java/com/metamx/druid/aggregation/JavaScriptAggregator.java @@ -21,6 +21,7 @@ package com.metamx.druid.aggregation; import com.google.common.collect.Lists; import com.metamx.druid.processing.FloatMetricSelector; +import org.mozilla.javascript.Context; import java.util.List; @@ -33,6 +34,8 @@ public class JavaScriptAggregator implements Aggregator public double combine(double a, double b); public double reset(); + + public void close(); } private final String name; @@ -79,4 +82,10 @@ public class JavaScriptAggregator implements Aggregator { return name; } + + @Override + public void close() + { + script.close(); + } } diff --git a/common/src/main/java/com/metamx/druid/aggregation/JavaScriptAggregatorFactory.java b/common/src/main/java/com/metamx/druid/aggregation/JavaScriptAggregatorFactory.java index 5c257f8cc74..d5132feae3d 100644 --- a/common/src/main/java/com/metamx/druid/aggregation/JavaScriptAggregatorFactory.java +++ b/common/src/main/java/com/metamx/druid/aggregation/JavaScriptAggregatorFactory.java @@ -30,6 +30,8 @@ import com.metamx.druid.processing.MetricSelectorFactory; import org.mozilla.javascript.Context; +import org.mozilla.javascript.ContextAction; +import org.mozilla.javascript.ContextFactory; import org.mozilla.javascript.Function; import org.mozilla.javascript.Script; import org.mozilla.javascript.ScriptableObject; @@ -49,7 +51,7 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory private final List fieldNames; private final String script; - private final JavaScriptAggregator.ScriptAggregator combiner; + private final JavaScriptAggregator.ScriptAggregator compiledScript; @JsonCreator public JavaScriptAggregatorFactory( @@ -61,7 +63,7 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory this.name = name; this.script = expression; this.fieldNames = fieldNames; - this.combiner = compileScript(script); + this.compiledScript = compileScript(script); } @Override @@ -77,7 +79,7 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory public FloatMetricSelector apply(@Nullable String s) { return metricFactory.makeFloatMetricSelector(s); } } ), - compileScript(script) + compiledScript ); } @@ -96,7 +98,7 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory } } ), - compileScript(script) + compiledScript ); } @@ -109,7 +111,7 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory @Override public Object combine(Object lhs, Object rhs) { - return combiner.combine(((Number) lhs).doubleValue(), ((Number) rhs).doubleValue()); + return compiledScript.combine(((Number) lhs).doubleValue(), ((Number) rhs).doubleValue()); } @Override @@ -187,7 +189,7 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory @Override public Object getAggregatorStartValue() { - return combiner.reset(); + return compiledScript.reset(); } @Override @@ -212,22 +214,29 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory public static JavaScriptAggregator.ScriptAggregator compileScript(final String script) { - final Context cx = Context.enter(); - cx.setOptimizationLevel(9); - final ScriptableObject scope = cx.initStandardObjects(); + final ContextFactory contextFactory = ContextFactory.getGlobal(); + Context context = contextFactory.enterContext(); + context.setOptimizationLevel(9); - Script compiledScript = cx.compileString(script, "script", 1, null); - compiledScript.exec(cx, scope); + final ScriptableObject scope = context.initStandardObjects(); + + Script compiledScript = context.compileString(script, "script", 1, null); + compiledScript.exec(context, scope); final Function fnAggregate = getScriptFunction("aggregate", scope); final Function fnReset = getScriptFunction("reset", scope); final Function fnCombine = getScriptFunction("combine", scope); + Context.exit(); + return new JavaScriptAggregator.ScriptAggregator() { @Override - public double aggregate(double current, FloatMetricSelector[] selectorList) + public double aggregate(final double current, final FloatMetricSelector[] selectorList) { + Context cx = Context.getCurrentContext(); + if(cx == null) cx = contextFactory.enterContext(); + final int size = selectorList.length; final Object[] args = new Object[size + 1]; @@ -237,29 +246,45 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory args[i + 1] = selectorList[i++].get(); } - Object res = fnAggregate.call(cx, scope, scope, args); + final Object res = fnAggregate.call(cx, scope, scope, args); return Context.toNumber(res); } @Override - public double combine(double a, double b) + public double combine(final double a, final double b) { - Object res = fnCombine.call(cx, scope, scope, new Object[]{a, b}); + final Object res = contextFactory.call( + new ContextAction() + { + @Override + public Object run(final Context cx) + { + return fnCombine.call(cx, scope, scope, new Object[]{a, b}); + } + } + ); return Context.toNumber(res); } @Override public double reset() { - Object res = fnReset.call(cx, scope, scope, new Object[]{}); + final Object res = contextFactory.call( + new ContextAction() + { + @Override + public Object run(final Context cx) + { + return fnReset.call(cx, scope, scope, new Object[]{}); + } + } + ); return Context.toNumber(res); } @Override - protected void finalize() throws Throwable - { - cx.exit(); - super.finalize(); + public void close() { + if(Context.getCurrentContext() != null) Context.exit(); } }; } diff --git a/common/src/main/java/com/metamx/druid/aggregation/JavaScriptBufferAggregator.java b/common/src/main/java/com/metamx/druid/aggregation/JavaScriptBufferAggregator.java index 02fe27abda4..88f97b6a5dd 100644 --- a/common/src/main/java/com/metamx/druid/aggregation/JavaScriptBufferAggregator.java +++ b/common/src/main/java/com/metamx/druid/aggregation/JavaScriptBufferAggregator.java @@ -62,4 +62,9 @@ public class JavaScriptBufferAggregator implements BufferAggregator { return (float)buf.getDouble(position); } + + @Override + public void close() { + script.close(); + } } diff --git a/common/src/main/java/com/metamx/druid/aggregation/LongSumAggregator.java b/common/src/main/java/com/metamx/druid/aggregation/LongSumAggregator.java index 32a81ef719f..9c7aadfa4f1 100644 --- a/common/src/main/java/com/metamx/druid/aggregation/LongSumAggregator.java +++ b/common/src/main/java/com/metamx/druid/aggregation/LongSumAggregator.java @@ -89,4 +89,10 @@ public class LongSumAggregator implements Aggregator { return new LongSumAggregator(name, selector); } + + @Override + public void close() + { + // no resources to cleanup + } } diff --git a/common/src/main/java/com/metamx/druid/aggregation/LongSumBufferAggregator.java b/common/src/main/java/com/metamx/druid/aggregation/LongSumBufferAggregator.java index 533f6782d5d..747d383fcb7 100644 --- a/common/src/main/java/com/metamx/druid/aggregation/LongSumBufferAggregator.java +++ b/common/src/main/java/com/metamx/druid/aggregation/LongSumBufferAggregator.java @@ -59,4 +59,10 @@ public class LongSumBufferAggregator implements BufferAggregator { return (float) buf.getLong(position); } + + @Override + public void close() + { + // no resources to cleanup + } } diff --git a/common/src/main/java/com/metamx/druid/aggregation/MaxAggregator.java b/common/src/main/java/com/metamx/druid/aggregation/MaxAggregator.java index 0bd0ed6f29a..8761c001da4 100644 --- a/common/src/main/java/com/metamx/druid/aggregation/MaxAggregator.java +++ b/common/src/main/java/com/metamx/druid/aggregation/MaxAggregator.java @@ -82,4 +82,10 @@ public class MaxAggregator implements Aggregator { return new MaxAggregator(name, selector); } + + @Override + public void close() + { + // no resources to cleanup + } } diff --git a/common/src/main/java/com/metamx/druid/aggregation/MaxBufferAggregator.java b/common/src/main/java/com/metamx/druid/aggregation/MaxBufferAggregator.java index 8a497a1ddfb..4bfcd2f2bbf 100644 --- a/common/src/main/java/com/metamx/druid/aggregation/MaxBufferAggregator.java +++ b/common/src/main/java/com/metamx/druid/aggregation/MaxBufferAggregator.java @@ -57,4 +57,10 @@ public class MaxBufferAggregator implements BufferAggregator { return (float) buf.getDouble(position); } + + @Override + public void close() + { + // no resources to cleanup + } } diff --git a/common/src/main/java/com/metamx/druid/aggregation/MinAggregator.java b/common/src/main/java/com/metamx/druid/aggregation/MinAggregator.java index 1ff56fa837d..a42ba758ed0 100644 --- a/common/src/main/java/com/metamx/druid/aggregation/MinAggregator.java +++ b/common/src/main/java/com/metamx/druid/aggregation/MinAggregator.java @@ -82,4 +82,10 @@ public class MinAggregator implements Aggregator { return new MinAggregator(name, selector); } + + @Override + public void close() + { + // no resources to cleanup + } } diff --git a/common/src/main/java/com/metamx/druid/aggregation/MinBufferAggregator.java b/common/src/main/java/com/metamx/druid/aggregation/MinBufferAggregator.java index 68e34675e36..7bd29477b97 100644 --- a/common/src/main/java/com/metamx/druid/aggregation/MinBufferAggregator.java +++ b/common/src/main/java/com/metamx/druid/aggregation/MinBufferAggregator.java @@ -57,4 +57,10 @@ public class MinBufferAggregator implements BufferAggregator { return (float) buf.getDouble(position); } + + @Override + public void close() + { + // no resources to cleanup + } } diff --git a/common/src/main/java/com/metamx/druid/aggregation/NoopAggregator.java b/common/src/main/java/com/metamx/druid/aggregation/NoopAggregator.java index 11e175d7b78..5f7ddb2e78b 100644 --- a/common/src/main/java/com/metamx/druid/aggregation/NoopAggregator.java +++ b/common/src/main/java/com/metamx/druid/aggregation/NoopAggregator.java @@ -59,4 +59,10 @@ public class NoopAggregator implements Aggregator { return name; } + + @Override + public void close() + { + // no resources to cleanup + } } diff --git a/common/src/main/java/com/metamx/druid/aggregation/NoopBufferAggregator.java b/common/src/main/java/com/metamx/druid/aggregation/NoopBufferAggregator.java index c46eb6a8d16..a486fbcad2e 100644 --- a/common/src/main/java/com/metamx/druid/aggregation/NoopBufferAggregator.java +++ b/common/src/main/java/com/metamx/druid/aggregation/NoopBufferAggregator.java @@ -46,4 +46,10 @@ public class NoopBufferAggregator implements BufferAggregator { return 0; } + + @Override + public void close() + { + // no resources to cleanup + } } diff --git a/common/src/main/java/com/metamx/druid/processing/MetricSelectorUtils.java b/common/src/main/java/com/metamx/druid/processing/MetricSelectorUtils.java new file mode 100644 index 00000000000..0db847b70d9 --- /dev/null +++ b/common/src/main/java/com/metamx/druid/processing/MetricSelectorUtils.java @@ -0,0 +1,5 @@ +package com.metamx.druid.processing; + +public class MetricSelectorUtils +{ +} diff --git a/server/src/main/java/com/metamx/druid/query/group/GroupByQueryEngine.java b/server/src/main/java/com/metamx/druid/query/group/GroupByQueryEngine.java index cb9a1468b80..7fb1c1648a2 100644 --- a/server/src/main/java/com/metamx/druid/query/group/GroupByQueryEngine.java +++ b/server/src/main/java/com/metamx/druid/query/group/GroupByQueryEngine.java @@ -300,7 +300,9 @@ public class GroupByQueryEngine @Override public boolean hasNext() { - return delegate.hasNext() || !cursor.isDone(); + boolean hasNext = delegate.hasNext() || !cursor.isDone(); + if(!hasNext) for(BufferAggregator agg : aggregators) agg.close(); + return hasNext; } @Override diff --git a/server/src/main/java/com/metamx/druid/query/timeseries/TimeseriesQueryEngine.java b/server/src/main/java/com/metamx/druid/query/timeseries/TimeseriesQueryEngine.java index 018dc329fc2..79ab780986f 100644 --- a/server/src/main/java/com/metamx/druid/query/timeseries/TimeseriesQueryEngine.java +++ b/server/src/main/java/com/metamx/druid/query/timeseries/TimeseriesQueryEngine.java @@ -80,7 +80,12 @@ public class TimeseriesQueryEngine bob.addMetric(postAgg); } - return bob.build(); + Result retVal = bob.build(); + + // cleanup + for(Aggregator agg : aggregators) agg.close(); + + return retVal; } } ).iterator();