diff --git a/client/pom.xml b/client/pom.xml index 511531ac6be..5b44a907c1c 100644 --- a/client/pom.xml +++ b/client/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.27-SNAPSHOT + 0.3.28-SNAPSHOT diff --git a/client/src/main/java/com/metamx/druid/client/DruidServer.java b/client/src/main/java/com/metamx/druid/client/DruidServer.java index a71d8a15ac7..ff565e04fce 100644 --- a/client/src/main/java/com/metamx/druid/client/DruidServer.java +++ b/client/src/main/java/com/metamx/druid/client/DruidServer.java @@ -24,8 +24,6 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.ImmutableMap; import com.metamx.common.logger.Logger; - - import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -41,7 +39,7 @@ public class DruidServer implements Comparable private final String name; private final ConcurrentMap dataSources; - private final Map segments; + private final ConcurrentMap segments; private final String host; private final long maxSize; @@ -80,7 +78,7 @@ public class DruidServer implements Comparable this.tier = tier; this.dataSources = new ConcurrentHashMap(); - this.segments = new HashMap(); + this.segments = new ConcurrentHashMap(); } public String getName() @@ -132,7 +130,7 @@ public class DruidServer implements Comparable @JsonProperty public Map getSegments() { - return segments; + return ImmutableMap.copyOf(segments); } public DataSegment getSegment(String segmentName) diff --git a/client/src/main/java/com/metamx/druid/query/ChainedExecutionQueryRunner.java b/client/src/main/java/com/metamx/druid/query/ChainedExecutionQueryRunner.java index 6a30a743892..52fc99a35b1 100644 --- a/client/src/main/java/com/metamx/druid/query/ChainedExecutionQueryRunner.java +++ b/client/src/main/java/com/metamx/druid/query/ChainedExecutionQueryRunner.java @@ -20,6 +20,7 @@ package com.metamx.druid.query; import com.google.common.base.Function; +import com.google.common.base.Throwables; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Ordering; @@ -27,6 +28,7 @@ import com.metamx.common.guava.BaseSequence; import com.metamx.common.guava.MergeIterable; import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; +import com.metamx.common.logger.Logger; import com.metamx.druid.Query; import java.util.Arrays; @@ -52,6 +54,8 @@ import java.util.concurrent.Future; */ public class ChainedExecutionQueryRunner implements QueryRunner { + private static final Logger log = new Logger(ChainedExecutionQueryRunner.class); + private final Iterable> queryables; private final ExecutorService exec; private final Ordering ordering; @@ -100,7 +104,13 @@ public class ChainedExecutionQueryRunner implements QueryRunner @Override public List call() throws Exception { - return Sequences.toList(input.run(query), Lists.newArrayList()); + try { + return Sequences.toList(input.run(query), Lists.newArrayList()); + } + catch (Exception e) { + log.error(e, "Exception with one of the sequences!"); + throw Throwables.propagate(e); + } } } ); diff --git a/client/src/main/java/com/metamx/druid/query/group/GroupByQueryQueryToolChest.java b/client/src/main/java/com/metamx/druid/query/group/GroupByQueryQueryToolChest.java index 51570e30955..a2a68ecc1cf 100644 --- a/client/src/main/java/com/metamx/druid/query/group/GroupByQueryQueryToolChest.java +++ b/client/src/main/java/com/metamx/druid/query/group/GroupByQueryQueryToolChest.java @@ -37,7 +37,6 @@ import com.metamx.druid.initialization.Initialization; import com.metamx.druid.input.MapBasedRow; import com.metamx.druid.input.Row; import com.metamx.druid.input.Rows; -import com.metamx.druid.query.CacheStrategy; import com.metamx.druid.query.MetricManipulationFn; import com.metamx.druid.query.QueryRunner; import com.metamx.druid.query.QueryToolChest; @@ -78,7 +77,13 @@ public class GroupByQueryQueryToolChest extends QueryToolChest condensed = query.getIntervals(); + final QueryGranularity gran = query.getGranularity(); + final long timeStart = query.getIntervals().get(0).getStartMillis(); + + // use gran.iterable instead of gran.truncate so that + // AllGranularity returns timeStart instead of Long.MIN_VALUE + final long granTimeStart = gran.iterable(timeStart, timeStart+1).iterator().next(); + final List aggs = Lists.transform( query.getAggregatorSpecs(), new Function() @@ -102,10 +107,11 @@ public class GroupByQueryQueryToolChest extends QueryToolChest() { - private final QueryGranularity granularity = query.getGranularity(); - @Override public Row apply(Row input) { final MapBasedRow row = (MapBasedRow) input; - return new MapBasedRow(granularity.toDateTime(row.getTimestampFromEpoch()), row.getEvent()); + return new MapBasedRow(gran.toDateTime(row.getTimestampFromEpoch()), row.getEvent()); } } ); diff --git a/common/pom.xml b/common/pom.xml index 8d4868eeca7..67857c543f6 100644 --- a/common/pom.xml +++ b/common/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.27-SNAPSHOT + 0.3.28-SNAPSHOT diff --git a/common/src/main/java/com/metamx/druid/aggregation/Aggregator.java b/common/src/main/java/com/metamx/druid/aggregation/Aggregator.java index 4acbd162f5a..fd42549971e 100644 --- a/common/src/main/java/com/metamx/druid/aggregation/Aggregator.java +++ b/common/src/main/java/com/metamx/druid/aggregation/Aggregator.java @@ -19,8 +19,6 @@ package com.metamx.druid.aggregation; -import java.util.Comparator; - /** * An Aggregator is an object that can aggregate metrics. Its aggregation-related methods (namely, aggregate() and get()) * do not take any arguments as the assumption is that the Aggregator was given something in its constructor that @@ -40,4 +38,5 @@ public interface Aggregator { Object get(); float getFloat(); String getName(); + void close(); } diff --git a/common/src/main/java/com/metamx/druid/aggregation/BufferAggregator.java b/common/src/main/java/com/metamx/druid/aggregation/BufferAggregator.java index bff68cd14a2..6a010f7a3db 100644 --- a/common/src/main/java/com/metamx/druid/aggregation/BufferAggregator.java +++ b/common/src/main/java/com/metamx/druid/aggregation/BufferAggregator.java @@ -88,4 +88,9 @@ public interface BufferAggregator * @return the float representation of the aggregate */ float getFloat(ByteBuffer buf, int position); + + /** + * Release any resources used by the aggregator + */ + void close(); } diff --git a/common/src/main/java/com/metamx/druid/aggregation/CountAggregator.java b/common/src/main/java/com/metamx/druid/aggregation/CountAggregator.java index ec656a25673..e806aa78e1c 100644 --- a/common/src/main/java/com/metamx/druid/aggregation/CountAggregator.java +++ b/common/src/main/java/com/metamx/druid/aggregation/CountAggregator.java @@ -75,4 +75,10 @@ public class CountAggregator implements Aggregator { return new CountAggregator(name); } + + @Override + public void close() + { + // no resources to cleanup + } } diff --git a/common/src/main/java/com/metamx/druid/aggregation/CountBufferAggregator.java b/common/src/main/java/com/metamx/druid/aggregation/CountBufferAggregator.java index c2a6b5a485d..0e6f06c786f 100644 --- a/common/src/main/java/com/metamx/druid/aggregation/CountBufferAggregator.java +++ b/common/src/main/java/com/metamx/druid/aggregation/CountBufferAggregator.java @@ -49,4 +49,10 @@ public class CountBufferAggregator implements BufferAggregator { return buf.getLong(position); } + + @Override + public void close() + { + // no resources to cleanup + } } diff --git a/common/src/main/java/com/metamx/druid/aggregation/DoubleSumAggregator.java b/common/src/main/java/com/metamx/druid/aggregation/DoubleSumAggregator.java index 21cb4691227..545c8c64feb 100644 --- a/common/src/main/java/com/metamx/druid/aggregation/DoubleSumAggregator.java +++ b/common/src/main/java/com/metamx/druid/aggregation/DoubleSumAggregator.java @@ -89,4 +89,10 @@ public class DoubleSumAggregator implements Aggregator { return new DoubleSumAggregator(name, selector); } + + @Override + public void close() + { + // no resources to cleanup + } } diff --git a/common/src/main/java/com/metamx/druid/aggregation/DoubleSumBufferAggregator.java b/common/src/main/java/com/metamx/druid/aggregation/DoubleSumBufferAggregator.java index 3e85d489db2..62f4e75f553 100644 --- a/common/src/main/java/com/metamx/druid/aggregation/DoubleSumBufferAggregator.java +++ b/common/src/main/java/com/metamx/druid/aggregation/DoubleSumBufferAggregator.java @@ -59,4 +59,10 @@ public class DoubleSumBufferAggregator implements BufferAggregator { return (float) buf.getDouble(position); } + + @Override + public void close() + { + // no resources to cleanup + } } diff --git a/common/src/main/java/com/metamx/druid/aggregation/HistogramAggregator.java b/common/src/main/java/com/metamx/druid/aggregation/HistogramAggregator.java index c2d9d674732..2355f458d29 100644 --- a/common/src/main/java/com/metamx/druid/aggregation/HistogramAggregator.java +++ b/common/src/main/java/com/metamx/druid/aggregation/HistogramAggregator.java @@ -80,4 +80,10 @@ public class HistogramAggregator implements Aggregator { return name; } + + @Override + public void close() + { + // no resources to cleanup + } } diff --git a/common/src/main/java/com/metamx/druid/aggregation/HistogramBufferAggregator.java b/common/src/main/java/com/metamx/druid/aggregation/HistogramBufferAggregator.java index fe4f366631c..07c7c37dee0 100644 --- a/common/src/main/java/com/metamx/druid/aggregation/HistogramBufferAggregator.java +++ b/common/src/main/java/com/metamx/druid/aggregation/HistogramBufferAggregator.java @@ -90,4 +90,10 @@ public class HistogramBufferAggregator implements BufferAggregator { throw new UnsupportedOperationException("HistogramBufferAggregator does not support getFloat()"); } + + @Override + public void close() + { + // no resources to cleanup + } } diff --git a/common/src/main/java/com/metamx/druid/aggregation/JavaScriptAggregator.java b/common/src/main/java/com/metamx/druid/aggregation/JavaScriptAggregator.java index 80daa045d82..d24277660bf 100644 --- a/common/src/main/java/com/metamx/druid/aggregation/JavaScriptAggregator.java +++ b/common/src/main/java/com/metamx/druid/aggregation/JavaScriptAggregator.java @@ -21,6 +21,7 @@ package com.metamx.druid.aggregation; import com.google.common.collect.Lists; import com.metamx.druid.processing.FloatMetricSelector; +import org.mozilla.javascript.Context; import java.util.List; @@ -33,6 +34,8 @@ public class JavaScriptAggregator implements Aggregator public double combine(double a, double b); public double reset(); + + public void close(); } private final String name; @@ -79,4 +82,10 @@ public class JavaScriptAggregator implements Aggregator { return name; } + + @Override + public void close() + { + script.close(); + } } diff --git a/common/src/main/java/com/metamx/druid/aggregation/JavaScriptAggregatorFactory.java b/common/src/main/java/com/metamx/druid/aggregation/JavaScriptAggregatorFactory.java index 5c257f8cc74..58f1e8d1e3d 100644 --- a/common/src/main/java/com/metamx/druid/aggregation/JavaScriptAggregatorFactory.java +++ b/common/src/main/java/com/metamx/druid/aggregation/JavaScriptAggregatorFactory.java @@ -30,6 +30,8 @@ import com.metamx.druid.processing.MetricSelectorFactory; import org.mozilla.javascript.Context; +import org.mozilla.javascript.ContextAction; +import org.mozilla.javascript.ContextFactory; import org.mozilla.javascript.Function; import org.mozilla.javascript.Script; import org.mozilla.javascript.ScriptableObject; @@ -49,7 +51,7 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory private final List fieldNames; private final String script; - private final JavaScriptAggregator.ScriptAggregator combiner; + private final JavaScriptAggregator.ScriptAggregator compiledScript; @JsonCreator public JavaScriptAggregatorFactory( @@ -61,7 +63,7 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory this.name = name; this.script = expression; this.fieldNames = fieldNames; - this.combiner = compileScript(script); + this.compiledScript = compileScript(script); } @Override @@ -77,7 +79,7 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory public FloatMetricSelector apply(@Nullable String s) { return metricFactory.makeFloatMetricSelector(s); } } ), - compileScript(script) + compiledScript ); } @@ -96,7 +98,7 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory } } ), - compileScript(script) + compiledScript ); } @@ -109,7 +111,7 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory @Override public Object combine(Object lhs, Object rhs) { - return combiner.combine(((Number) lhs).doubleValue(), ((Number) rhs).doubleValue()); + return compiledScript.combine(((Number) lhs).doubleValue(), ((Number) rhs).doubleValue()); } @Override @@ -187,7 +189,7 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory @Override public Object getAggregatorStartValue() { - return combiner.reset(); + return compiledScript.reset(); } @Override @@ -212,22 +214,29 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory public static JavaScriptAggregator.ScriptAggregator compileScript(final String script) { - final Context cx = Context.enter(); - cx.setOptimizationLevel(9); - final ScriptableObject scope = cx.initStandardObjects(); + final ContextFactory contextFactory = ContextFactory.getGlobal(); + Context context = contextFactory.enterContext(); + context.setOptimizationLevel(9); - Script compiledScript = cx.compileString(script, "script", 1, null); - compiledScript.exec(cx, scope); + final ScriptableObject scope = context.initStandardObjects(); + + Script compiledScript = context.compileString(script, "script", 1, null); + compiledScript.exec(context, scope); final Function fnAggregate = getScriptFunction("aggregate", scope); final Function fnReset = getScriptFunction("reset", scope); final Function fnCombine = getScriptFunction("combine", scope); + Context.exit(); + return new JavaScriptAggregator.ScriptAggregator() { @Override - public double aggregate(double current, FloatMetricSelector[] selectorList) + public double aggregate(final double current, final FloatMetricSelector[] selectorList) { + Context cx = Context.getCurrentContext(); + if(cx == null) cx = contextFactory.enterContext(); + final int size = selectorList.length; final Object[] args = new Object[size + 1]; @@ -237,29 +246,47 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory args[i + 1] = selectorList[i++].get(); } - Object res = fnAggregate.call(cx, scope, scope, args); + final Object res = fnAggregate.call(cx, scope, scope, args); return Context.toNumber(res); } @Override - public double combine(double a, double b) + public double combine(final double a, final double b) { - Object res = fnCombine.call(cx, scope, scope, new Object[]{a, b}); + final Object res = contextFactory.call( + new ContextAction() + { + @Override + public Object run(final Context cx) + { + return fnCombine.call(cx, scope, scope, new Object[]{a, b}); + } + } + ); return Context.toNumber(res); } @Override public double reset() { - Object res = fnReset.call(cx, scope, scope, new Object[]{}); + final Object res = contextFactory.call( + new ContextAction() + { + @Override + public Object run(final Context cx) + { + return fnReset.call(cx, scope, scope, new Object[]{}); + } + } + ); return Context.toNumber(res); } @Override - protected void finalize() throws Throwable - { - cx.exit(); - super.finalize(); + public void close() { + if(Context.getCurrentContext() != null) { + Context.exit(); + } } }; } diff --git a/common/src/main/java/com/metamx/druid/aggregation/JavaScriptBufferAggregator.java b/common/src/main/java/com/metamx/druid/aggregation/JavaScriptBufferAggregator.java index 02fe27abda4..88f97b6a5dd 100644 --- a/common/src/main/java/com/metamx/druid/aggregation/JavaScriptBufferAggregator.java +++ b/common/src/main/java/com/metamx/druid/aggregation/JavaScriptBufferAggregator.java @@ -62,4 +62,9 @@ public class JavaScriptBufferAggregator implements BufferAggregator { return (float)buf.getDouble(position); } + + @Override + public void close() { + script.close(); + } } diff --git a/common/src/main/java/com/metamx/druid/aggregation/LongSumAggregator.java b/common/src/main/java/com/metamx/druid/aggregation/LongSumAggregator.java index 32a81ef719f..9c7aadfa4f1 100644 --- a/common/src/main/java/com/metamx/druid/aggregation/LongSumAggregator.java +++ b/common/src/main/java/com/metamx/druid/aggregation/LongSumAggregator.java @@ -89,4 +89,10 @@ public class LongSumAggregator implements Aggregator { return new LongSumAggregator(name, selector); } + + @Override + public void close() + { + // no resources to cleanup + } } diff --git a/common/src/main/java/com/metamx/druid/aggregation/LongSumBufferAggregator.java b/common/src/main/java/com/metamx/druid/aggregation/LongSumBufferAggregator.java index 533f6782d5d..747d383fcb7 100644 --- a/common/src/main/java/com/metamx/druid/aggregation/LongSumBufferAggregator.java +++ b/common/src/main/java/com/metamx/druid/aggregation/LongSumBufferAggregator.java @@ -59,4 +59,10 @@ public class LongSumBufferAggregator implements BufferAggregator { return (float) buf.getLong(position); } + + @Override + public void close() + { + // no resources to cleanup + } } diff --git a/common/src/main/java/com/metamx/druid/aggregation/MaxAggregator.java b/common/src/main/java/com/metamx/druid/aggregation/MaxAggregator.java index 0bd0ed6f29a..8761c001da4 100644 --- a/common/src/main/java/com/metamx/druid/aggregation/MaxAggregator.java +++ b/common/src/main/java/com/metamx/druid/aggregation/MaxAggregator.java @@ -82,4 +82,10 @@ public class MaxAggregator implements Aggregator { return new MaxAggregator(name, selector); } + + @Override + public void close() + { + // no resources to cleanup + } } diff --git a/common/src/main/java/com/metamx/druid/aggregation/MaxBufferAggregator.java b/common/src/main/java/com/metamx/druid/aggregation/MaxBufferAggregator.java index 8a497a1ddfb..4bfcd2f2bbf 100644 --- a/common/src/main/java/com/metamx/druid/aggregation/MaxBufferAggregator.java +++ b/common/src/main/java/com/metamx/druid/aggregation/MaxBufferAggregator.java @@ -57,4 +57,10 @@ public class MaxBufferAggregator implements BufferAggregator { return (float) buf.getDouble(position); } + + @Override + public void close() + { + // no resources to cleanup + } } diff --git a/common/src/main/java/com/metamx/druid/aggregation/MinAggregator.java b/common/src/main/java/com/metamx/druid/aggregation/MinAggregator.java index 1ff56fa837d..a42ba758ed0 100644 --- a/common/src/main/java/com/metamx/druid/aggregation/MinAggregator.java +++ b/common/src/main/java/com/metamx/druid/aggregation/MinAggregator.java @@ -82,4 +82,10 @@ public class MinAggregator implements Aggregator { return new MinAggregator(name, selector); } + + @Override + public void close() + { + // no resources to cleanup + } } diff --git a/common/src/main/java/com/metamx/druid/aggregation/MinBufferAggregator.java b/common/src/main/java/com/metamx/druid/aggregation/MinBufferAggregator.java index 68e34675e36..7bd29477b97 100644 --- a/common/src/main/java/com/metamx/druid/aggregation/MinBufferAggregator.java +++ b/common/src/main/java/com/metamx/druid/aggregation/MinBufferAggregator.java @@ -57,4 +57,10 @@ public class MinBufferAggregator implements BufferAggregator { return (float) buf.getDouble(position); } + + @Override + public void close() + { + // no resources to cleanup + } } diff --git a/common/src/main/java/com/metamx/druid/aggregation/NoopAggregator.java b/common/src/main/java/com/metamx/druid/aggregation/NoopAggregator.java index 11e175d7b78..5f7ddb2e78b 100644 --- a/common/src/main/java/com/metamx/druid/aggregation/NoopAggregator.java +++ b/common/src/main/java/com/metamx/druid/aggregation/NoopAggregator.java @@ -59,4 +59,10 @@ public class NoopAggregator implements Aggregator { return name; } + + @Override + public void close() + { + // no resources to cleanup + } } diff --git a/common/src/main/java/com/metamx/druid/aggregation/NoopBufferAggregator.java b/common/src/main/java/com/metamx/druid/aggregation/NoopBufferAggregator.java index c46eb6a8d16..a486fbcad2e 100644 --- a/common/src/main/java/com/metamx/druid/aggregation/NoopBufferAggregator.java +++ b/common/src/main/java/com/metamx/druid/aggregation/NoopBufferAggregator.java @@ -46,4 +46,10 @@ public class NoopBufferAggregator implements BufferAggregator { return 0; } + + @Override + public void close() + { + // no resources to cleanup + } } diff --git a/common/src/main/java/com/metamx/druid/processing/MetricSelectorUtils.java b/common/src/main/java/com/metamx/druid/processing/MetricSelectorUtils.java new file mode 100644 index 00000000000..0db847b70d9 --- /dev/null +++ b/common/src/main/java/com/metamx/druid/processing/MetricSelectorUtils.java @@ -0,0 +1,5 @@ +package com.metamx.druid.processing; + +public class MetricSelectorUtils +{ +} diff --git a/druid-services/pom.xml b/druid-services/pom.xml index 9cdfd73c66b..9a90a88481d 100644 --- a/druid-services/pom.xml +++ b/druid-services/pom.xml @@ -24,11 +24,11 @@ druid-services druid-services druid-services - 0.3.27-SNAPSHOT + 0.3.28-SNAPSHOT com.metamx druid - 0.3.27-SNAPSHOT + 0.3.28-SNAPSHOT diff --git a/examples/pom.xml b/examples/pom.xml index 37d52dd4310..106c6e04c30 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.27-SNAPSHOT + 0.3.28-SNAPSHOT diff --git a/examples/rand/pom.xml b/examples/rand/pom.xml index d2c09b3238c..33537a83c28 100644 --- a/examples/rand/pom.xml +++ b/examples/rand/pom.xml @@ -9,7 +9,7 @@ com.metamx druid-examples - 0.3.27-SNAPSHOT + 0.3.28-SNAPSHOT diff --git a/examples/twitter/pom.xml b/examples/twitter/pom.xml index 666cc6b4e39..674309f7a9c 100644 --- a/examples/twitter/pom.xml +++ b/examples/twitter/pom.xml @@ -9,7 +9,7 @@ com.metamx druid-examples - 0.3.27-SNAPSHOT + 0.3.28-SNAPSHOT diff --git a/index-common/pom.xml b/index-common/pom.xml index bea2c9305b6..42d4a29b0c4 100644 --- a/index-common/pom.xml +++ b/index-common/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.27-SNAPSHOT + 0.3.28-SNAPSHOT diff --git a/indexer/pom.xml b/indexer/pom.xml index a22c0f8a175..db6a1b2e3a6 100644 --- a/indexer/pom.xml +++ b/indexer/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.27-SNAPSHOT + 0.3.28-SNAPSHOT diff --git a/merger/pom.xml b/merger/pom.xml index ec6f64e76dd..48a4ffab96b 100644 --- a/merger/pom.xml +++ b/merger/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.27-SNAPSHOT + 0.3.28-SNAPSHOT diff --git a/merger/src/main/java/com/metamx/druid/merger/worker/WorkerTaskMonitor.java b/merger/src/main/java/com/metamx/druid/merger/worker/WorkerTaskMonitor.java index 400abec76fe..02e95a0ef39 100644 --- a/merger/src/main/java/com/metamx/druid/merger/worker/WorkerTaskMonitor.java +++ b/merger/src/main/java/com/metamx/druid/merger/worker/WorkerTaskMonitor.java @@ -175,7 +175,7 @@ public class WorkerTaskMonitor implements QuerySegmentWalker private boolean isTaskRunning(final Task task) { for (final Task runningTask : running) { - if (runningTask.equals(task.getId())) { + if (runningTask.getId().equals(task.getId())) { return true; } } diff --git a/pom.xml b/pom.xml index d35e832394a..2fd37a68160 100644 --- a/pom.xml +++ b/pom.xml @@ -23,7 +23,7 @@ com.metamx druid pom - 0.3.27-SNAPSHOT + 0.3.28-SNAPSHOT druid druid diff --git a/realtime/pom.xml b/realtime/pom.xml index e8cdc73e6f0..e8ac06907af 100644 --- a/realtime/pom.xml +++ b/realtime/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.27-SNAPSHOT + 0.3.28-SNAPSHOT diff --git a/server/pom.xml b/server/pom.xml index eb7c8ee42b5..caecb6b70cb 100644 --- a/server/pom.xml +++ b/server/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.27-SNAPSHOT + 0.3.28-SNAPSHOT diff --git a/server/src/main/java/com/metamx/druid/master/DruidMasterLogger.java b/server/src/main/java/com/metamx/druid/master/DruidMasterLogger.java index 38068600637..1328619db44 100644 --- a/server/src/main/java/com/metamx/druid/master/DruidMasterLogger.java +++ b/server/src/main/java/com/metamx/druid/master/DruidMasterLogger.java @@ -19,6 +19,7 @@ package com.metamx.druid.master; +import com.google.common.collect.Maps; import com.google.common.collect.MinMaxPriorityQueue; import com.metamx.common.logger.Logger; import com.metamx.druid.client.DataSegment; @@ -28,6 +29,7 @@ import com.metamx.druid.collect.CountingMap; import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceMetricEvent; +import javax.annotation.Nullable; import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; @@ -38,6 +40,21 @@ public class DruidMasterLogger implements DruidMasterHelper { private static final Logger log = new Logger(DruidMasterLogger.class); + private void emitTieredStats(final ServiceEmitter emitter, final String formatString, final Map statMap) + { + if (statMap != null) { + for (Map.Entry entry : statMap.entrySet()) { + String tier = entry.getKey(); + Number value = entry.getValue(); + emitter.emit( + new ServiceMetricEvent.Builder().build( + String.format(formatString, tier), value.doubleValue() + ) + ); + } + } + } + @Override public DruidMasterRuntimeParams run(DruidMasterRuntimeParams params) { @@ -65,35 +82,30 @@ public class DruidMasterLogger implements DruidMasterHelper } } - emitter.emit( - new ServiceMetricEvent.Builder().build( - "master/cost/raw", stats.getGlobalStats().get("initialCost") - ) - ); + emitTieredStats(emitter, "master/%s/cost/raw", + stats.getPerTierStats().get("initialCost")); - emitter.emit( - new ServiceMetricEvent.Builder().build( - "master/cost/normalization", stats.getGlobalStats().get("normalization") - ) - ); + emitTieredStats(emitter, "master/%s/cost/normalization", + stats.getPerTierStats().get("normalization")); - emitter.emit( - new ServiceMetricEvent.Builder().build( - "master/cost/normalized", stats.getGlobalStats().get("normalizedInitialCostTimesOneThousand").doubleValue() / 1000d - ) - ); + emitTieredStats(emitter, "master/%s/moved/count", + stats.getPerTierStats().get("movedCount")); - emitter.emit( - new ServiceMetricEvent.Builder().build( - "master/moved/count", stats.getGlobalStats().get("movedCount") - ) - ); + emitTieredStats(emitter, "master/%s/deleted/count", + stats.getPerTierStats().get("deletedCount")); - emitter.emit( - new ServiceMetricEvent.Builder().build( - "master/deleted/count", stats.getGlobalStats().get("deletedCount") - ) - ); + emitTieredStats(emitter, "master/%s/cost/normalized", + Maps.transformEntries(stats.getPerTierStats().get("normalizedInitialCostTimesOneThousand"), + new Maps.EntryTransformer() + { + @Override + public Number transformEntry( + @Nullable String key, @Nullable AtomicLong value + ) + { + return value.doubleValue() / 1000d; + } + })); Map unneeded = stats.getPerTierStats().get("unneededCount"); if (unneeded != null) { diff --git a/server/src/main/java/com/metamx/druid/query/group/GroupByQueryEngine.java b/server/src/main/java/com/metamx/druid/query/group/GroupByQueryEngine.java index cb9a1468b80..a944bfef894 100644 --- a/server/src/main/java/com/metamx/druid/query/group/GroupByQueryEngine.java +++ b/server/src/main/java/com/metamx/druid/query/group/GroupByQueryEngine.java @@ -31,6 +31,8 @@ import com.metamx.common.ISE; import com.metamx.common.guava.BaseSequence; import com.metamx.common.guava.FunctionalIterator; import com.metamx.common.guava.Sequence; +import com.metamx.common.guava.Sequences; +import com.metamx.common.parsers.CloseableIterator; import com.metamx.druid.StorageAdapter; import com.metamx.druid.aggregation.AggregatorFactory; import com.metamx.druid.aggregation.BufferAggregator; @@ -47,6 +49,7 @@ import org.joda.time.DateTime; import org.joda.time.Interval; import javax.annotation.Nullable; +import java.io.IOException; import java.nio.ByteBuffer; import java.util.Iterator; import java.util.List; @@ -85,33 +88,47 @@ public class GroupByQueryEngine final ResourceHolder bufferHolder = intermediateResultsBufferPool.take(); - return new BaseSequence>( - new BaseSequence.IteratorMaker>() + return Sequences.concat( + new BaseSequence, Iterator>>(new BaseSequence.IteratorMaker, Iterator>>() { @Override - public Iterator make() + public Iterator> make() { return FunctionalIterator - .create(cursors.iterator()) - .transformCat( - new Function>() - { - @Override - public Iterator apply(@Nullable final Cursor cursor) - { - return new RowIterator(query, cursor, bufferHolder.get()); - } - } - ); + .create(cursors.iterator()) + .transform(new Function>() + { + @Override + public Sequence apply(@Nullable final Cursor cursor) + { + return new BaseSequence>( + new BaseSequence.IteratorMaker>() + { + @Override + public CloseableIterator make() + { + return new RowIterator(query, cursor, bufferHolder.get()); + } + + @Override + public void cleanup(CloseableIterator iterFromMake) + { + Closeables.closeQuietly(iterFromMake); + } + } + ); + } + }); } @Override - public void cleanup(Iterator iterFromMake) + public void cleanup(Iterator> iterFromMake) { Closeables.closeQuietly(bufferHolder); } - } + }) ); + } private static class RowUpdater @@ -248,7 +265,7 @@ public class GroupByQueryEngine } } - private class RowIterator implements Iterator + private class RowIterator implements CloseableIterator { private final GroupByQuery query; private final Cursor cursor; @@ -384,5 +401,12 @@ public class GroupByQueryEngine { throw new UnsupportedOperationException(); } + + public void close() { + // cleanup + for(BufferAggregator agg : aggregators) { + agg.close(); + } + } } } diff --git a/server/src/main/java/com/metamx/druid/query/timeseries/TimeseriesQueryEngine.java b/server/src/main/java/com/metamx/druid/query/timeseries/TimeseriesQueryEngine.java index 018dc329fc2..24a3816d2de 100644 --- a/server/src/main/java/com/metamx/druid/query/timeseries/TimeseriesQueryEngine.java +++ b/server/src/main/java/com/metamx/druid/query/timeseries/TimeseriesQueryEngine.java @@ -80,7 +80,14 @@ public class TimeseriesQueryEngine bob.addMetric(postAgg); } - return bob.build(); + Result retVal = bob.build(); + + // cleanup + for(Aggregator agg : aggregators) { + agg.close(); + } + + return retVal; } } ).iterator(); diff --git a/server/src/test/java/com/metamx/druid/query/group/GroupByQueryRunnerTest.java b/server/src/test/java/com/metamx/druid/query/group/GroupByQueryRunnerTest.java index 6e86c9e1022..5dd95c9c4e0 100644 --- a/server/src/test/java/com/metamx/druid/query/group/GroupByQueryRunnerTest.java +++ b/server/src/test/java/com/metamx/druid/query/group/GroupByQueryRunnerTest.java @@ -30,6 +30,7 @@ import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; import com.metamx.druid.PeriodGranularity; import com.metamx.druid.Query; +import com.metamx.druid.QueryGranularity; import com.metamx.druid.TestHelper; import com.metamx.druid.aggregation.AggregatorFactory; import com.metamx.druid.aggregation.LongSumAggregatorFactory; @@ -232,6 +233,7 @@ public class GroupByQueryRunnerTest .setGranularity(new PeriodGranularity(new Period("P1M"), null, null)); final GroupByQuery fullQuery = builder.build(); + final GroupByQuery allGranQuery = builder.copy().setGranularity(QueryGranularity.ALL).build(); QueryRunner mergedRunner = new GroupByQueryQueryToolChest().mergeResults( new QueryRunner() @@ -265,6 +267,22 @@ public class GroupByQueryRunnerTest TestHelper.assertExpectedObjects(expectedResults, runner.run(fullQuery), "direct"); TestHelper.assertExpectedObjects(expectedResults, mergedRunner.run(fullQuery), "merged"); + + List allGranExpectedResults = Arrays.asList( + createExpectedRow("2011-04-02", "alias", "automotive", "rows", 2L, "idx", 269L), + createExpectedRow("2011-04-02", "alias", "business", "rows", 2L, "idx", 217L), + createExpectedRow("2011-04-02", "alias", "entertainment", "rows", 2L, "idx", 319L), + createExpectedRow("2011-04-02", "alias", "health", "rows", 2L, "idx", 216L), + createExpectedRow("2011-04-02", "alias", "mezzanine", "rows", 6L, "idx", 4420L), + createExpectedRow("2011-04-02", "alias", "news", "rows", 2L, "idx", 221L), + createExpectedRow("2011-04-02", "alias", "premium", "rows", 6L, "idx", 4416L), + createExpectedRow("2011-04-02", "alias", "technology", "rows", 2L, "idx", 177L), + createExpectedRow("2011-04-02", "alias", "travel", "rows", 2L, "idx", 243L) + ); + + TestHelper.assertExpectedObjects(allGranExpectedResults, runner.run(allGranQuery), "direct"); + TestHelper.assertExpectedObjects(allGranExpectedResults, mergedRunner.run(allGranQuery), "merged"); + } private MapBasedRow createExpectedRow(final String timestamp, Object... vals)