From c68bd66945877f385904a14d8abd333ecee56fe2 Mon Sep 17 00:00:00 2001 From: xvrl Date: Mon, 18 Mar 2013 13:18:28 -0700 Subject: [PATCH 1/9] add close method to aggregators --- .../metamx/druid/aggregation/Aggregator.java | 3 +- .../druid/aggregation/BufferAggregator.java | 5 ++ .../druid/aggregation/CountAggregator.java | 6 ++ .../aggregation/CountBufferAggregator.java | 6 ++ .../aggregation/DoubleSumAggregator.java | 6 ++ .../DoubleSumBufferAggregator.java | 6 ++ .../aggregation/HistogramAggregator.java | 6 ++ .../HistogramBufferAggregator.java | 6 ++ .../aggregation/JavaScriptAggregator.java | 9 +++ .../JavaScriptAggregatorFactory.java | 65 +++++++++++++------ .../JavaScriptBufferAggregator.java | 5 ++ .../druid/aggregation/LongSumAggregator.java | 6 ++ .../aggregation/LongSumBufferAggregator.java | 6 ++ .../druid/aggregation/MaxAggregator.java | 6 ++ .../aggregation/MaxBufferAggregator.java | 6 ++ .../druid/aggregation/MinAggregator.java | 6 ++ .../aggregation/MinBufferAggregator.java | 6 ++ .../druid/aggregation/NoopAggregator.java | 6 ++ .../aggregation/NoopBufferAggregator.java | 6 ++ .../druid/processing/MetricSelectorUtils.java | 5 ++ .../druid/query/group/GroupByQueryEngine.java | 4 +- .../timeseries/TimeseriesQueryEngine.java | 7 +- 22 files changed, 163 insertions(+), 24 deletions(-) create mode 100644 common/src/main/java/com/metamx/druid/processing/MetricSelectorUtils.java diff --git a/common/src/main/java/com/metamx/druid/aggregation/Aggregator.java b/common/src/main/java/com/metamx/druid/aggregation/Aggregator.java index 4acbd162f5a..fd42549971e 100644 --- a/common/src/main/java/com/metamx/druid/aggregation/Aggregator.java +++ b/common/src/main/java/com/metamx/druid/aggregation/Aggregator.java @@ -19,8 +19,6 @@ package com.metamx.druid.aggregation; -import java.util.Comparator; - /** * An Aggregator is an object that can aggregate metrics. Its aggregation-related methods (namely, aggregate() and get()) * do not take any arguments as the assumption is that the Aggregator was given something in its constructor that @@ -40,4 +38,5 @@ public interface Aggregator { Object get(); float getFloat(); String getName(); + void close(); } diff --git a/common/src/main/java/com/metamx/druid/aggregation/BufferAggregator.java b/common/src/main/java/com/metamx/druid/aggregation/BufferAggregator.java index bff68cd14a2..6a010f7a3db 100644 --- a/common/src/main/java/com/metamx/druid/aggregation/BufferAggregator.java +++ b/common/src/main/java/com/metamx/druid/aggregation/BufferAggregator.java @@ -88,4 +88,9 @@ public interface BufferAggregator * @return the float representation of the aggregate */ float getFloat(ByteBuffer buf, int position); + + /** + * Release any resources used by the aggregator + */ + void close(); } diff --git a/common/src/main/java/com/metamx/druid/aggregation/CountAggregator.java b/common/src/main/java/com/metamx/druid/aggregation/CountAggregator.java index ec656a25673..e806aa78e1c 100644 --- a/common/src/main/java/com/metamx/druid/aggregation/CountAggregator.java +++ b/common/src/main/java/com/metamx/druid/aggregation/CountAggregator.java @@ -75,4 +75,10 @@ public class CountAggregator implements Aggregator { return new CountAggregator(name); } + + @Override + public void close() + { + // no resources to cleanup + } } diff --git a/common/src/main/java/com/metamx/druid/aggregation/CountBufferAggregator.java b/common/src/main/java/com/metamx/druid/aggregation/CountBufferAggregator.java index c2a6b5a485d..0e6f06c786f 100644 --- a/common/src/main/java/com/metamx/druid/aggregation/CountBufferAggregator.java +++ b/common/src/main/java/com/metamx/druid/aggregation/CountBufferAggregator.java @@ -49,4 +49,10 @@ public class CountBufferAggregator implements BufferAggregator { return buf.getLong(position); } + + @Override + public void close() + { + // no resources to cleanup + } } diff --git a/common/src/main/java/com/metamx/druid/aggregation/DoubleSumAggregator.java b/common/src/main/java/com/metamx/druid/aggregation/DoubleSumAggregator.java index 21cb4691227..545c8c64feb 100644 --- a/common/src/main/java/com/metamx/druid/aggregation/DoubleSumAggregator.java +++ b/common/src/main/java/com/metamx/druid/aggregation/DoubleSumAggregator.java @@ -89,4 +89,10 @@ public class DoubleSumAggregator implements Aggregator { return new DoubleSumAggregator(name, selector); } + + @Override + public void close() + { + // no resources to cleanup + } } diff --git a/common/src/main/java/com/metamx/druid/aggregation/DoubleSumBufferAggregator.java b/common/src/main/java/com/metamx/druid/aggregation/DoubleSumBufferAggregator.java index 3e85d489db2..62f4e75f553 100644 --- a/common/src/main/java/com/metamx/druid/aggregation/DoubleSumBufferAggregator.java +++ b/common/src/main/java/com/metamx/druid/aggregation/DoubleSumBufferAggregator.java @@ -59,4 +59,10 @@ public class DoubleSumBufferAggregator implements BufferAggregator { return (float) buf.getDouble(position); } + + @Override + public void close() + { + // no resources to cleanup + } } diff --git a/common/src/main/java/com/metamx/druid/aggregation/HistogramAggregator.java b/common/src/main/java/com/metamx/druid/aggregation/HistogramAggregator.java index c2d9d674732..2355f458d29 100644 --- a/common/src/main/java/com/metamx/druid/aggregation/HistogramAggregator.java +++ b/common/src/main/java/com/metamx/druid/aggregation/HistogramAggregator.java @@ -80,4 +80,10 @@ public class HistogramAggregator implements Aggregator { return name; } + + @Override + public void close() + { + // no resources to cleanup + } } diff --git a/common/src/main/java/com/metamx/druid/aggregation/HistogramBufferAggregator.java b/common/src/main/java/com/metamx/druid/aggregation/HistogramBufferAggregator.java index fe4f366631c..07c7c37dee0 100644 --- a/common/src/main/java/com/metamx/druid/aggregation/HistogramBufferAggregator.java +++ b/common/src/main/java/com/metamx/druid/aggregation/HistogramBufferAggregator.java @@ -90,4 +90,10 @@ public class HistogramBufferAggregator implements BufferAggregator { throw new UnsupportedOperationException("HistogramBufferAggregator does not support getFloat()"); } + + @Override + public void close() + { + // no resources to cleanup + } } diff --git a/common/src/main/java/com/metamx/druid/aggregation/JavaScriptAggregator.java b/common/src/main/java/com/metamx/druid/aggregation/JavaScriptAggregator.java index 80daa045d82..d24277660bf 100644 --- a/common/src/main/java/com/metamx/druid/aggregation/JavaScriptAggregator.java +++ b/common/src/main/java/com/metamx/druid/aggregation/JavaScriptAggregator.java @@ -21,6 +21,7 @@ package com.metamx.druid.aggregation; import com.google.common.collect.Lists; import com.metamx.druid.processing.FloatMetricSelector; +import org.mozilla.javascript.Context; import java.util.List; @@ -33,6 +34,8 @@ public class JavaScriptAggregator implements Aggregator public double combine(double a, double b); public double reset(); + + public void close(); } private final String name; @@ -79,4 +82,10 @@ public class JavaScriptAggregator implements Aggregator { return name; } + + @Override + public void close() + { + script.close(); + } } diff --git a/common/src/main/java/com/metamx/druid/aggregation/JavaScriptAggregatorFactory.java b/common/src/main/java/com/metamx/druid/aggregation/JavaScriptAggregatorFactory.java index 5c257f8cc74..d5132feae3d 100644 --- a/common/src/main/java/com/metamx/druid/aggregation/JavaScriptAggregatorFactory.java +++ b/common/src/main/java/com/metamx/druid/aggregation/JavaScriptAggregatorFactory.java @@ -30,6 +30,8 @@ import com.metamx.druid.processing.MetricSelectorFactory; import org.mozilla.javascript.Context; +import org.mozilla.javascript.ContextAction; +import org.mozilla.javascript.ContextFactory; import org.mozilla.javascript.Function; import org.mozilla.javascript.Script; import org.mozilla.javascript.ScriptableObject; @@ -49,7 +51,7 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory private final List fieldNames; private final String script; - private final JavaScriptAggregator.ScriptAggregator combiner; + private final JavaScriptAggregator.ScriptAggregator compiledScript; @JsonCreator public JavaScriptAggregatorFactory( @@ -61,7 +63,7 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory this.name = name; this.script = expression; this.fieldNames = fieldNames; - this.combiner = compileScript(script); + this.compiledScript = compileScript(script); } @Override @@ -77,7 +79,7 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory public FloatMetricSelector apply(@Nullable String s) { return metricFactory.makeFloatMetricSelector(s); } } ), - compileScript(script) + compiledScript ); } @@ -96,7 +98,7 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory } } ), - compileScript(script) + compiledScript ); } @@ -109,7 +111,7 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory @Override public Object combine(Object lhs, Object rhs) { - return combiner.combine(((Number) lhs).doubleValue(), ((Number) rhs).doubleValue()); + return compiledScript.combine(((Number) lhs).doubleValue(), ((Number) rhs).doubleValue()); } @Override @@ -187,7 +189,7 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory @Override public Object getAggregatorStartValue() { - return combiner.reset(); + return compiledScript.reset(); } @Override @@ -212,22 +214,29 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory public static JavaScriptAggregator.ScriptAggregator compileScript(final String script) { - final Context cx = Context.enter(); - cx.setOptimizationLevel(9); - final ScriptableObject scope = cx.initStandardObjects(); + final ContextFactory contextFactory = ContextFactory.getGlobal(); + Context context = contextFactory.enterContext(); + context.setOptimizationLevel(9); - Script compiledScript = cx.compileString(script, "script", 1, null); - compiledScript.exec(cx, scope); + final ScriptableObject scope = context.initStandardObjects(); + + Script compiledScript = context.compileString(script, "script", 1, null); + compiledScript.exec(context, scope); final Function fnAggregate = getScriptFunction("aggregate", scope); final Function fnReset = getScriptFunction("reset", scope); final Function fnCombine = getScriptFunction("combine", scope); + Context.exit(); + return new JavaScriptAggregator.ScriptAggregator() { @Override - public double aggregate(double current, FloatMetricSelector[] selectorList) + public double aggregate(final double current, final FloatMetricSelector[] selectorList) { + Context cx = Context.getCurrentContext(); + if(cx == null) cx = contextFactory.enterContext(); + final int size = selectorList.length; final Object[] args = new Object[size + 1]; @@ -237,29 +246,45 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory args[i + 1] = selectorList[i++].get(); } - Object res = fnAggregate.call(cx, scope, scope, args); + final Object res = fnAggregate.call(cx, scope, scope, args); return Context.toNumber(res); } @Override - public double combine(double a, double b) + public double combine(final double a, final double b) { - Object res = fnCombine.call(cx, scope, scope, new Object[]{a, b}); + final Object res = contextFactory.call( + new ContextAction() + { + @Override + public Object run(final Context cx) + { + return fnCombine.call(cx, scope, scope, new Object[]{a, b}); + } + } + ); return Context.toNumber(res); } @Override public double reset() { - Object res = fnReset.call(cx, scope, scope, new Object[]{}); + final Object res = contextFactory.call( + new ContextAction() + { + @Override + public Object run(final Context cx) + { + return fnReset.call(cx, scope, scope, new Object[]{}); + } + } + ); return Context.toNumber(res); } @Override - protected void finalize() throws Throwable - { - cx.exit(); - super.finalize(); + public void close() { + if(Context.getCurrentContext() != null) Context.exit(); } }; } diff --git a/common/src/main/java/com/metamx/druid/aggregation/JavaScriptBufferAggregator.java b/common/src/main/java/com/metamx/druid/aggregation/JavaScriptBufferAggregator.java index 02fe27abda4..88f97b6a5dd 100644 --- a/common/src/main/java/com/metamx/druid/aggregation/JavaScriptBufferAggregator.java +++ b/common/src/main/java/com/metamx/druid/aggregation/JavaScriptBufferAggregator.java @@ -62,4 +62,9 @@ public class JavaScriptBufferAggregator implements BufferAggregator { return (float)buf.getDouble(position); } + + @Override + public void close() { + script.close(); + } } diff --git a/common/src/main/java/com/metamx/druid/aggregation/LongSumAggregator.java b/common/src/main/java/com/metamx/druid/aggregation/LongSumAggregator.java index 32a81ef719f..9c7aadfa4f1 100644 --- a/common/src/main/java/com/metamx/druid/aggregation/LongSumAggregator.java +++ b/common/src/main/java/com/metamx/druid/aggregation/LongSumAggregator.java @@ -89,4 +89,10 @@ public class LongSumAggregator implements Aggregator { return new LongSumAggregator(name, selector); } + + @Override + public void close() + { + // no resources to cleanup + } } diff --git a/common/src/main/java/com/metamx/druid/aggregation/LongSumBufferAggregator.java b/common/src/main/java/com/metamx/druid/aggregation/LongSumBufferAggregator.java index 533f6782d5d..747d383fcb7 100644 --- a/common/src/main/java/com/metamx/druid/aggregation/LongSumBufferAggregator.java +++ b/common/src/main/java/com/metamx/druid/aggregation/LongSumBufferAggregator.java @@ -59,4 +59,10 @@ public class LongSumBufferAggregator implements BufferAggregator { return (float) buf.getLong(position); } + + @Override + public void close() + { + // no resources to cleanup + } } diff --git a/common/src/main/java/com/metamx/druid/aggregation/MaxAggregator.java b/common/src/main/java/com/metamx/druid/aggregation/MaxAggregator.java index 0bd0ed6f29a..8761c001da4 100644 --- a/common/src/main/java/com/metamx/druid/aggregation/MaxAggregator.java +++ b/common/src/main/java/com/metamx/druid/aggregation/MaxAggregator.java @@ -82,4 +82,10 @@ public class MaxAggregator implements Aggregator { return new MaxAggregator(name, selector); } + + @Override + public void close() + { + // no resources to cleanup + } } diff --git a/common/src/main/java/com/metamx/druid/aggregation/MaxBufferAggregator.java b/common/src/main/java/com/metamx/druid/aggregation/MaxBufferAggregator.java index 8a497a1ddfb..4bfcd2f2bbf 100644 --- a/common/src/main/java/com/metamx/druid/aggregation/MaxBufferAggregator.java +++ b/common/src/main/java/com/metamx/druid/aggregation/MaxBufferAggregator.java @@ -57,4 +57,10 @@ public class MaxBufferAggregator implements BufferAggregator { return (float) buf.getDouble(position); } + + @Override + public void close() + { + // no resources to cleanup + } } diff --git a/common/src/main/java/com/metamx/druid/aggregation/MinAggregator.java b/common/src/main/java/com/metamx/druid/aggregation/MinAggregator.java index 1ff56fa837d..a42ba758ed0 100644 --- a/common/src/main/java/com/metamx/druid/aggregation/MinAggregator.java +++ b/common/src/main/java/com/metamx/druid/aggregation/MinAggregator.java @@ -82,4 +82,10 @@ public class MinAggregator implements Aggregator { return new MinAggregator(name, selector); } + + @Override + public void close() + { + // no resources to cleanup + } } diff --git a/common/src/main/java/com/metamx/druid/aggregation/MinBufferAggregator.java b/common/src/main/java/com/metamx/druid/aggregation/MinBufferAggregator.java index 68e34675e36..7bd29477b97 100644 --- a/common/src/main/java/com/metamx/druid/aggregation/MinBufferAggregator.java +++ b/common/src/main/java/com/metamx/druid/aggregation/MinBufferAggregator.java @@ -57,4 +57,10 @@ public class MinBufferAggregator implements BufferAggregator { return (float) buf.getDouble(position); } + + @Override + public void close() + { + // no resources to cleanup + } } diff --git a/common/src/main/java/com/metamx/druid/aggregation/NoopAggregator.java b/common/src/main/java/com/metamx/druid/aggregation/NoopAggregator.java index 11e175d7b78..5f7ddb2e78b 100644 --- a/common/src/main/java/com/metamx/druid/aggregation/NoopAggregator.java +++ b/common/src/main/java/com/metamx/druid/aggregation/NoopAggregator.java @@ -59,4 +59,10 @@ public class NoopAggregator implements Aggregator { return name; } + + @Override + public void close() + { + // no resources to cleanup + } } diff --git a/common/src/main/java/com/metamx/druid/aggregation/NoopBufferAggregator.java b/common/src/main/java/com/metamx/druid/aggregation/NoopBufferAggregator.java index c46eb6a8d16..a486fbcad2e 100644 --- a/common/src/main/java/com/metamx/druid/aggregation/NoopBufferAggregator.java +++ b/common/src/main/java/com/metamx/druid/aggregation/NoopBufferAggregator.java @@ -46,4 +46,10 @@ public class NoopBufferAggregator implements BufferAggregator { return 0; } + + @Override + public void close() + { + // no resources to cleanup + } } diff --git a/common/src/main/java/com/metamx/druid/processing/MetricSelectorUtils.java b/common/src/main/java/com/metamx/druid/processing/MetricSelectorUtils.java new file mode 100644 index 00000000000..0db847b70d9 --- /dev/null +++ b/common/src/main/java/com/metamx/druid/processing/MetricSelectorUtils.java @@ -0,0 +1,5 @@ +package com.metamx.druid.processing; + +public class MetricSelectorUtils +{ +} diff --git a/server/src/main/java/com/metamx/druid/query/group/GroupByQueryEngine.java b/server/src/main/java/com/metamx/druid/query/group/GroupByQueryEngine.java index cb9a1468b80..7fb1c1648a2 100644 --- a/server/src/main/java/com/metamx/druid/query/group/GroupByQueryEngine.java +++ b/server/src/main/java/com/metamx/druid/query/group/GroupByQueryEngine.java @@ -300,7 +300,9 @@ public class GroupByQueryEngine @Override public boolean hasNext() { - return delegate.hasNext() || !cursor.isDone(); + boolean hasNext = delegate.hasNext() || !cursor.isDone(); + if(!hasNext) for(BufferAggregator agg : aggregators) agg.close(); + return hasNext; } @Override diff --git a/server/src/main/java/com/metamx/druid/query/timeseries/TimeseriesQueryEngine.java b/server/src/main/java/com/metamx/druid/query/timeseries/TimeseriesQueryEngine.java index 018dc329fc2..79ab780986f 100644 --- a/server/src/main/java/com/metamx/druid/query/timeseries/TimeseriesQueryEngine.java +++ b/server/src/main/java/com/metamx/druid/query/timeseries/TimeseriesQueryEngine.java @@ -80,7 +80,12 @@ public class TimeseriesQueryEngine bob.addMetric(postAgg); } - return bob.build(); + Result retVal = bob.build(); + + // cleanup + for(Aggregator agg : aggregators) agg.close(); + + return retVal; } } ).iterator(); From 8eec41f934187bf1db8d053ca1cf0f9506c1a22c Mon Sep 17 00:00:00 2001 From: xvrl Date: Wed, 20 Mar 2013 11:41:40 -0700 Subject: [PATCH 2/9] squigglies ftw --- .../druid/aggregation/JavaScriptAggregatorFactory.java | 4 +++- .../com/metamx/druid/query/group/GroupByQueryEngine.java | 5 ++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/common/src/main/java/com/metamx/druid/aggregation/JavaScriptAggregatorFactory.java b/common/src/main/java/com/metamx/druid/aggregation/JavaScriptAggregatorFactory.java index d5132feae3d..58f1e8d1e3d 100644 --- a/common/src/main/java/com/metamx/druid/aggregation/JavaScriptAggregatorFactory.java +++ b/common/src/main/java/com/metamx/druid/aggregation/JavaScriptAggregatorFactory.java @@ -284,7 +284,9 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory @Override public void close() { - if(Context.getCurrentContext() != null) Context.exit(); + if(Context.getCurrentContext() != null) { + Context.exit(); + } } }; } diff --git a/server/src/main/java/com/metamx/druid/query/group/GroupByQueryEngine.java b/server/src/main/java/com/metamx/druid/query/group/GroupByQueryEngine.java index 7fb1c1648a2..3c0ab475397 100644 --- a/server/src/main/java/com/metamx/druid/query/group/GroupByQueryEngine.java +++ b/server/src/main/java/com/metamx/druid/query/group/GroupByQueryEngine.java @@ -301,7 +301,10 @@ public class GroupByQueryEngine public boolean hasNext() { boolean hasNext = delegate.hasNext() || !cursor.isDone(); - if(!hasNext) for(BufferAggregator agg : aggregators) agg.close(); + if(!hasNext) { + // cleanup + for(BufferAggregator agg : aggregators) agg.close(); + } return hasNext; } From 0b04114c12aaec0397ba1b5b3324f1da49267540 Mon Sep 17 00:00:00 2001 From: xvrl Date: Wed, 20 Mar 2013 12:17:20 -0700 Subject: [PATCH 3/9] more squigglies --- .../java/com/metamx/druid/query/group/GroupByQueryEngine.java | 4 +++- .../metamx/druid/query/timeseries/TimeseriesQueryEngine.java | 4 +++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/com/metamx/druid/query/group/GroupByQueryEngine.java b/server/src/main/java/com/metamx/druid/query/group/GroupByQueryEngine.java index 3c0ab475397..797956ac555 100644 --- a/server/src/main/java/com/metamx/druid/query/group/GroupByQueryEngine.java +++ b/server/src/main/java/com/metamx/druid/query/group/GroupByQueryEngine.java @@ -303,7 +303,9 @@ public class GroupByQueryEngine boolean hasNext = delegate.hasNext() || !cursor.isDone(); if(!hasNext) { // cleanup - for(BufferAggregator agg : aggregators) agg.close(); + for(BufferAggregator agg : aggregators) { + agg.close(); + } } return hasNext; } diff --git a/server/src/main/java/com/metamx/druid/query/timeseries/TimeseriesQueryEngine.java b/server/src/main/java/com/metamx/druid/query/timeseries/TimeseriesQueryEngine.java index 79ab780986f..24a3816d2de 100644 --- a/server/src/main/java/com/metamx/druid/query/timeseries/TimeseriesQueryEngine.java +++ b/server/src/main/java/com/metamx/druid/query/timeseries/TimeseriesQueryEngine.java @@ -83,7 +83,9 @@ public class TimeseriesQueryEngine Result retVal = bob.build(); // cleanup - for(Aggregator agg : aggregators) agg.close(); + for(Aggregator agg : aggregators) { + agg.close(); + } return retVal; } From 57f881072154b79968fa4141d549ec088731e2b8 Mon Sep 17 00:00:00 2001 From: xvrl Date: Wed, 20 Mar 2013 14:07:06 -0700 Subject: [PATCH 4/9] properly close rowiterators --- .../druid/query/group/GroupByQueryEngine.java | 84 +++++++++++++------ 1 file changed, 59 insertions(+), 25 deletions(-) diff --git a/server/src/main/java/com/metamx/druid/query/group/GroupByQueryEngine.java b/server/src/main/java/com/metamx/druid/query/group/GroupByQueryEngine.java index 797956ac555..d3cce2a5777 100644 --- a/server/src/main/java/com/metamx/druid/query/group/GroupByQueryEngine.java +++ b/server/src/main/java/com/metamx/druid/query/group/GroupByQueryEngine.java @@ -31,6 +31,7 @@ import com.metamx.common.ISE; import com.metamx.common.guava.BaseSequence; import com.metamx.common.guava.FunctionalIterator; import com.metamx.common.guava.Sequence; +import com.metamx.common.parsers.CloseableIterator; import com.metamx.druid.StorageAdapter; import com.metamx.druid.aggregation.AggregatorFactory; import com.metamx.druid.aggregation.BufferAggregator; @@ -47,6 +48,7 @@ import org.joda.time.DateTime; import org.joda.time.Interval; import javax.annotation.Nullable; +import java.io.IOException; import java.nio.ByteBuffer; import java.util.Iterator; import java.util.List; @@ -85,29 +87,61 @@ public class GroupByQueryEngine final ResourceHolder bufferHolder = intermediateResultsBufferPool.take(); - return new BaseSequence>( - new BaseSequence.IteratorMaker>() + return new BaseSequence>( + new BaseSequence.IteratorMaker>() { @Override - public Iterator make() + public CloseableIterator make() { - return FunctionalIterator - .create(cursors.iterator()) - .transformCat( - new Function>() - { - @Override - public Iterator apply(@Nullable final Cursor cursor) - { - return new RowIterator(query, cursor, bufferHolder.get()); - } - } - ); + return new CloseableIterator() + { + final List rowIterators = Lists.newLinkedList(); + final Iterator delegate = FunctionalIterator + .create(cursors.iterator()) + .transformCat( + new Function>() + { + @Override + public Iterator apply(@Nullable final Cursor cursor) + { + RowIterator it = new RowIterator(query, cursor, bufferHolder.get()); + rowIterators.add(it); + return it; + } + } + ); + @Override + public void close() + { + for(CloseableIterator it : rowIterators) { + Closeables.closeQuietly(it); + } + } + + @Override + public boolean hasNext() + { + return delegate.hasNext(); + } + + @Override + public Row next() + { + return delegate.next(); + } + + @Override + public void remove() + { + delegate.remove(); + } + }; } @Override - public void cleanup(Iterator iterFromMake) + public void cleanup(CloseableIterator iterFromMake) { + Closeables.closeQuietly(iterFromMake); Closeables.closeQuietly(bufferHolder); } } @@ -248,7 +282,7 @@ public class GroupByQueryEngine } } - private class RowIterator implements Iterator + private class RowIterator implements CloseableIterator { private final GroupByQuery query; private final Cursor cursor; @@ -300,14 +334,7 @@ public class GroupByQueryEngine @Override public boolean hasNext() { - boolean hasNext = delegate.hasNext() || !cursor.isDone(); - if(!hasNext) { - // cleanup - for(BufferAggregator agg : aggregators) { - agg.close(); - } - } - return hasNext; + return delegate.hasNext() || !cursor.isDone(); } @Override @@ -391,5 +418,12 @@ public class GroupByQueryEngine { throw new UnsupportedOperationException(); } + + public void close() { + // cleanup + for(BufferAggregator agg : aggregators) { + agg.close(); + } + } } } From 3687cba18eeb9d0098b12166a90dddf18c2a215c Mon Sep 17 00:00:00 2001 From: xvrl Date: Fri, 22 Mar 2013 17:42:10 -0700 Subject: [PATCH 5/9] close resources as we iterate over them --- .../druid/query/group/GroupByQueryEngine.java | 79 ++++++++----------- 1 file changed, 31 insertions(+), 48 deletions(-) diff --git a/server/src/main/java/com/metamx/druid/query/group/GroupByQueryEngine.java b/server/src/main/java/com/metamx/druid/query/group/GroupByQueryEngine.java index d3cce2a5777..a944bfef894 100644 --- a/server/src/main/java/com/metamx/druid/query/group/GroupByQueryEngine.java +++ b/server/src/main/java/com/metamx/druid/query/group/GroupByQueryEngine.java @@ -31,6 +31,7 @@ import com.metamx.common.ISE; import com.metamx.common.guava.BaseSequence; import com.metamx.common.guava.FunctionalIterator; import com.metamx.common.guava.Sequence; +import com.metamx.common.guava.Sequences; import com.metamx.common.parsers.CloseableIterator; import com.metamx.druid.StorageAdapter; import com.metamx.druid.aggregation.AggregatorFactory; @@ -87,65 +88,47 @@ public class GroupByQueryEngine final ResourceHolder bufferHolder = intermediateResultsBufferPool.take(); - return new BaseSequence>( - new BaseSequence.IteratorMaker>() + return Sequences.concat( + new BaseSequence, Iterator>>(new BaseSequence.IteratorMaker, Iterator>>() { @Override - public CloseableIterator make() + public Iterator> make() { - return new CloseableIterator() - { - final List rowIterators = Lists.newLinkedList(); - final Iterator delegate = FunctionalIterator - .create(cursors.iterator()) - .transformCat( - new Function>() - { - @Override - public Iterator apply(@Nullable final Cursor cursor) - { - RowIterator it = new RowIterator(query, cursor, bufferHolder.get()); - rowIterators.add(it); - return it; - } - } - ); - @Override - public void close() - { - for(CloseableIterator it : rowIterators) { - Closeables.closeQuietly(it); - } - } + return FunctionalIterator + .create(cursors.iterator()) + .transform(new Function>() + { + @Override + public Sequence apply(@Nullable final Cursor cursor) + { + return new BaseSequence>( + new BaseSequence.IteratorMaker>() + { + @Override + public CloseableIterator make() + { + return new RowIterator(query, cursor, bufferHolder.get()); + } - @Override - public boolean hasNext() - { - return delegate.hasNext(); - } - - @Override - public Row next() - { - return delegate.next(); - } - - @Override - public void remove() - { - delegate.remove(); - } - }; + @Override + public void cleanup(CloseableIterator iterFromMake) + { + Closeables.closeQuietly(iterFromMake); + } + } + ); + } + }); } @Override - public void cleanup(CloseableIterator iterFromMake) + public void cleanup(Iterator> iterFromMake) { - Closeables.closeQuietly(iterFromMake); Closeables.closeQuietly(bufferHolder); } - } + }) ); + } private static class RowUpdater From c420fe3b56ffd1fc4e308d66c93ff1d918eccf33 Mon Sep 17 00:00:00 2001 From: xvrl Date: Sat, 23 Mar 2013 23:42:43 -0700 Subject: [PATCH 6/9] fix output timestamps in groupby queries with granularity "all". --- .../group/GroupByQueryQueryToolChest.java | 18 +++++++++++------- .../query/group/GroupByQueryRunnerTest.java | 18 ++++++++++++++++++ 2 files changed, 29 insertions(+), 7 deletions(-) diff --git a/client/src/main/java/com/metamx/druid/query/group/GroupByQueryQueryToolChest.java b/client/src/main/java/com/metamx/druid/query/group/GroupByQueryQueryToolChest.java index 51570e30955..a2a68ecc1cf 100644 --- a/client/src/main/java/com/metamx/druid/query/group/GroupByQueryQueryToolChest.java +++ b/client/src/main/java/com/metamx/druid/query/group/GroupByQueryQueryToolChest.java @@ -37,7 +37,6 @@ import com.metamx.druid.initialization.Initialization; import com.metamx.druid.input.MapBasedRow; import com.metamx.druid.input.Row; import com.metamx.druid.input.Rows; -import com.metamx.druid.query.CacheStrategy; import com.metamx.druid.query.MetricManipulationFn; import com.metamx.druid.query.QueryRunner; import com.metamx.druid.query.QueryToolChest; @@ -78,7 +77,13 @@ public class GroupByQueryQueryToolChest extends QueryToolChest condensed = query.getIntervals(); + final QueryGranularity gran = query.getGranularity(); + final long timeStart = query.getIntervals().get(0).getStartMillis(); + + // use gran.iterable instead of gran.truncate so that + // AllGranularity returns timeStart instead of Long.MIN_VALUE + final long granTimeStart = gran.iterable(timeStart, timeStart+1).iterator().next(); + final List aggs = Lists.transform( query.getAggregatorSpecs(), new Function() @@ -102,10 +107,11 @@ public class GroupByQueryQueryToolChest extends QueryToolChest() { - private final QueryGranularity granularity = query.getGranularity(); - @Override public Row apply(Row input) { final MapBasedRow row = (MapBasedRow) input; - return new MapBasedRow(granularity.toDateTime(row.getTimestampFromEpoch()), row.getEvent()); + return new MapBasedRow(gran.toDateTime(row.getTimestampFromEpoch()), row.getEvent()); } } ); diff --git a/server/src/test/java/com/metamx/druid/query/group/GroupByQueryRunnerTest.java b/server/src/test/java/com/metamx/druid/query/group/GroupByQueryRunnerTest.java index 6e86c9e1022..5dd95c9c4e0 100644 --- a/server/src/test/java/com/metamx/druid/query/group/GroupByQueryRunnerTest.java +++ b/server/src/test/java/com/metamx/druid/query/group/GroupByQueryRunnerTest.java @@ -30,6 +30,7 @@ import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; import com.metamx.druid.PeriodGranularity; import com.metamx.druid.Query; +import com.metamx.druid.QueryGranularity; import com.metamx.druid.TestHelper; import com.metamx.druid.aggregation.AggregatorFactory; import com.metamx.druid.aggregation.LongSumAggregatorFactory; @@ -232,6 +233,7 @@ public class GroupByQueryRunnerTest .setGranularity(new PeriodGranularity(new Period("P1M"), null, null)); final GroupByQuery fullQuery = builder.build(); + final GroupByQuery allGranQuery = builder.copy().setGranularity(QueryGranularity.ALL).build(); QueryRunner mergedRunner = new GroupByQueryQueryToolChest().mergeResults( new QueryRunner() @@ -265,6 +267,22 @@ public class GroupByQueryRunnerTest TestHelper.assertExpectedObjects(expectedResults, runner.run(fullQuery), "direct"); TestHelper.assertExpectedObjects(expectedResults, mergedRunner.run(fullQuery), "merged"); + + List allGranExpectedResults = Arrays.asList( + createExpectedRow("2011-04-02", "alias", "automotive", "rows", 2L, "idx", 269L), + createExpectedRow("2011-04-02", "alias", "business", "rows", 2L, "idx", 217L), + createExpectedRow("2011-04-02", "alias", "entertainment", "rows", 2L, "idx", 319L), + createExpectedRow("2011-04-02", "alias", "health", "rows", 2L, "idx", 216L), + createExpectedRow("2011-04-02", "alias", "mezzanine", "rows", 6L, "idx", 4420L), + createExpectedRow("2011-04-02", "alias", "news", "rows", 2L, "idx", 221L), + createExpectedRow("2011-04-02", "alias", "premium", "rows", 6L, "idx", 4416L), + createExpectedRow("2011-04-02", "alias", "technology", "rows", 2L, "idx", 177L), + createExpectedRow("2011-04-02", "alias", "travel", "rows", 2L, "idx", 243L) + ); + + TestHelper.assertExpectedObjects(allGranExpectedResults, runner.run(allGranQuery), "direct"); + TestHelper.assertExpectedObjects(allGranExpectedResults, mergedRunner.run(allGranQuery), "merged"); + } private MapBasedRow createExpectedRow(final String timestamp, Object... vals) From dcd10e69c720d7281a1a92ce0235e177e843eac8 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Sun, 24 Mar 2013 18:19:32 -0700 Subject: [PATCH 7/9] WorkerTaskMonitor: Fix isTaskRunning check --- .../java/com/metamx/druid/merger/worker/WorkerTaskMonitor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/merger/src/main/java/com/metamx/druid/merger/worker/WorkerTaskMonitor.java b/merger/src/main/java/com/metamx/druid/merger/worker/WorkerTaskMonitor.java index 400abec76fe..02e95a0ef39 100644 --- a/merger/src/main/java/com/metamx/druid/merger/worker/WorkerTaskMonitor.java +++ b/merger/src/main/java/com/metamx/druid/merger/worker/WorkerTaskMonitor.java @@ -175,7 +175,7 @@ public class WorkerTaskMonitor implements QuerySegmentWalker private boolean isTaskRunning(final Task task) { for (final Task runningTask : running) { - if (runningTask.equals(task.getId())) { + if (runningTask.getId().equals(task.getId())) { return true; } } From a28de5fa889466aba4fb8568ee9f48ab7c9430e9 Mon Sep 17 00:00:00 2001 From: Nelson Ray Date: Mon, 25 Mar 2013 10:32:02 -0700 Subject: [PATCH 8/9] fix per tier metrics emission for new balancer --- .../druid/master/DruidMasterLogger.java | 90 +++++++++++++------ 1 file changed, 65 insertions(+), 25 deletions(-) diff --git a/server/src/main/java/com/metamx/druid/master/DruidMasterLogger.java b/server/src/main/java/com/metamx/druid/master/DruidMasterLogger.java index 38068600637..ac6e7b328a1 100644 --- a/server/src/main/java/com/metamx/druid/master/DruidMasterLogger.java +++ b/server/src/main/java/com/metamx/druid/master/DruidMasterLogger.java @@ -65,35 +65,75 @@ public class DruidMasterLogger implements DruidMasterHelper } } - emitter.emit( - new ServiceMetricEvent.Builder().build( - "master/cost/raw", stats.getGlobalStats().get("initialCost") - ) - ); + Map initialCosts = stats.getPerTierStats().get("initialCost"); + if (initialCosts != null) { + for (Map.Entry entry : initialCosts.entrySet()) { + String tier = entry.getKey(); + AtomicLong value = entry.getValue(); + emitter.emit( + new ServiceMetricEvent.Builder() + .build( + String.format("master/%s/cost/raw", tier), value.get() + ) + ); + } + } - emitter.emit( - new ServiceMetricEvent.Builder().build( - "master/cost/normalization", stats.getGlobalStats().get("normalization") - ) - ); + Map normalization = stats.getPerTierStats().get("normalization"); + if (initialCosts != null) { + for (Map.Entry entry : normalization.entrySet()) { + String tier = entry.getKey(); + AtomicLong value = entry.getValue(); + emitter.emit( + new ServiceMetricEvent.Builder() + .build( + String.format("master/%s/cost/normalization", tier), value.get() + ) + ); + } + } - emitter.emit( - new ServiceMetricEvent.Builder().build( - "master/cost/normalized", stats.getGlobalStats().get("normalizedInitialCostTimesOneThousand").doubleValue() / 1000d - ) - ); + Map normalized = stats.getPerTierStats().get("normalizedInitialCostTimesOneThousand"); + if (initialCosts != null) { + for (Map.Entry entry : normalized.entrySet()) { + String tier = entry.getKey(); + AtomicLong value = entry.getValue(); + emitter.emit( + new ServiceMetricEvent.Builder() + .build( + String.format("master/%s/cost/normalized", tier), (double) value.get() / 1000d + ) + ); + } + } - emitter.emit( - new ServiceMetricEvent.Builder().build( - "master/moved/count", stats.getGlobalStats().get("movedCount") - ) - ); + Map movedCount = stats.getPerTierStats().get("movedCount"); + if (initialCosts != null) { + for (Map.Entry entry : movedCount.entrySet()) { + String tier = entry.getKey(); + AtomicLong value = entry.getValue(); + emitter.emit( + new ServiceMetricEvent.Builder() + .build( + String.format("master/%s/moved/count", tier), value.get() + ) + ); + } + } - emitter.emit( - new ServiceMetricEvent.Builder().build( - "master/deleted/count", stats.getGlobalStats().get("deletedCount") - ) - ); + Map deletedCount = stats.getPerTierStats().get("deletedCount"); + if (initialCosts != null) { + for (Map.Entry entry : deletedCount.entrySet()) { + String tier = entry.getKey(); + AtomicLong value = entry.getValue(); + emitter.emit( + new ServiceMetricEvent.Builder() + .build( + String.format("master/%s/deleted/count", tier), value.get() + ) + ); + } + } Map unneeded = stats.getPerTierStats().get("unneededCount"); if (unneeded != null) { From e57249bf158deacd2293a821b40566ef1eb46fef Mon Sep 17 00:00:00 2001 From: Nelson Ray Date: Mon, 25 Mar 2013 11:49:46 -0700 Subject: [PATCH 9/9] refactor balancer metric emission code --- .../druid/master/DruidMasterLogger.java | 102 +++++++----------- 1 file changed, 37 insertions(+), 65 deletions(-) diff --git a/server/src/main/java/com/metamx/druid/master/DruidMasterLogger.java b/server/src/main/java/com/metamx/druid/master/DruidMasterLogger.java index ac6e7b328a1..1328619db44 100644 --- a/server/src/main/java/com/metamx/druid/master/DruidMasterLogger.java +++ b/server/src/main/java/com/metamx/druid/master/DruidMasterLogger.java @@ -19,6 +19,7 @@ package com.metamx.druid.master; +import com.google.common.collect.Maps; import com.google.common.collect.MinMaxPriorityQueue; import com.metamx.common.logger.Logger; import com.metamx.druid.client.DataSegment; @@ -28,6 +29,7 @@ import com.metamx.druid.collect.CountingMap; import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceMetricEvent; +import javax.annotation.Nullable; import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; @@ -38,6 +40,21 @@ public class DruidMasterLogger implements DruidMasterHelper { private static final Logger log = new Logger(DruidMasterLogger.class); + private void emitTieredStats(final ServiceEmitter emitter, final String formatString, final Map statMap) + { + if (statMap != null) { + for (Map.Entry entry : statMap.entrySet()) { + String tier = entry.getKey(); + Number value = entry.getValue(); + emitter.emit( + new ServiceMetricEvent.Builder().build( + String.format(formatString, tier), value.doubleValue() + ) + ); + } + } + } + @Override public DruidMasterRuntimeParams run(DruidMasterRuntimeParams params) { @@ -65,75 +82,30 @@ public class DruidMasterLogger implements DruidMasterHelper } } - Map initialCosts = stats.getPerTierStats().get("initialCost"); - if (initialCosts != null) { - for (Map.Entry entry : initialCosts.entrySet()) { - String tier = entry.getKey(); - AtomicLong value = entry.getValue(); - emitter.emit( - new ServiceMetricEvent.Builder() - .build( - String.format("master/%s/cost/raw", tier), value.get() - ) - ); - } - } + emitTieredStats(emitter, "master/%s/cost/raw", + stats.getPerTierStats().get("initialCost")); - Map normalization = stats.getPerTierStats().get("normalization"); - if (initialCosts != null) { - for (Map.Entry entry : normalization.entrySet()) { - String tier = entry.getKey(); - AtomicLong value = entry.getValue(); - emitter.emit( - new ServiceMetricEvent.Builder() - .build( - String.format("master/%s/cost/normalization", tier), value.get() - ) - ); - } - } + emitTieredStats(emitter, "master/%s/cost/normalization", + stats.getPerTierStats().get("normalization")); - Map normalized = stats.getPerTierStats().get("normalizedInitialCostTimesOneThousand"); - if (initialCosts != null) { - for (Map.Entry entry : normalized.entrySet()) { - String tier = entry.getKey(); - AtomicLong value = entry.getValue(); - emitter.emit( - new ServiceMetricEvent.Builder() - .build( - String.format("master/%s/cost/normalized", tier), (double) value.get() / 1000d - ) - ); - } - } + emitTieredStats(emitter, "master/%s/moved/count", + stats.getPerTierStats().get("movedCount")); - Map movedCount = stats.getPerTierStats().get("movedCount"); - if (initialCosts != null) { - for (Map.Entry entry : movedCount.entrySet()) { - String tier = entry.getKey(); - AtomicLong value = entry.getValue(); - emitter.emit( - new ServiceMetricEvent.Builder() - .build( - String.format("master/%s/moved/count", tier), value.get() - ) - ); - } - } + emitTieredStats(emitter, "master/%s/deleted/count", + stats.getPerTierStats().get("deletedCount")); - Map deletedCount = stats.getPerTierStats().get("deletedCount"); - if (initialCosts != null) { - for (Map.Entry entry : deletedCount.entrySet()) { - String tier = entry.getKey(); - AtomicLong value = entry.getValue(); - emitter.emit( - new ServiceMetricEvent.Builder() - .build( - String.format("master/%s/deleted/count", tier), value.get() - ) - ); - } - } + emitTieredStats(emitter, "master/%s/cost/normalized", + Maps.transformEntries(stats.getPerTierStats().get("normalizedInitialCostTimesOneThousand"), + new Maps.EntryTransformer() + { + @Override + public Number transformEntry( + @Nullable String key, @Nullable AtomicLong value + ) + { + return value.doubleValue() / 1000d; + } + })); Map unneeded = stats.getPerTierStats().get("unneededCount"); if (unneeded != null) {