Merge branch 'master' of github.com:metamx/druid

This commit is contained in:
Eric Tschetter 2013-03-26 09:07:28 -05:00
commit 42ca6aff4f
40 changed files with 299 additions and 92 deletions

View File

@ -28,7 +28,7 @@
<parent> <parent>
<groupId>com.metamx</groupId> <groupId>com.metamx</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<version>0.3.27-SNAPSHOT</version> <version>0.3.28-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>

View File

@ -24,8 +24,6 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
@ -41,7 +39,7 @@ public class DruidServer implements Comparable
private final String name; private final String name;
private final ConcurrentMap<String, DruidDataSource> dataSources; private final ConcurrentMap<String, DruidDataSource> dataSources;
private final Map<String, DataSegment> segments; private final ConcurrentMap<String, DataSegment> segments;
private final String host; private final String host;
private final long maxSize; private final long maxSize;
@ -80,7 +78,7 @@ public class DruidServer implements Comparable
this.tier = tier; this.tier = tier;
this.dataSources = new ConcurrentHashMap<String, DruidDataSource>(); this.dataSources = new ConcurrentHashMap<String, DruidDataSource>();
this.segments = new HashMap<String, DataSegment>(); this.segments = new ConcurrentHashMap<String, DataSegment>();
} }
public String getName() public String getName()
@ -132,7 +130,7 @@ public class DruidServer implements Comparable
@JsonProperty @JsonProperty
public Map<String, DataSegment> getSegments() public Map<String, DataSegment> getSegments()
{ {
return segments; return ImmutableMap.copyOf(segments);
} }
public DataSegment getSegment(String segmentName) public DataSegment getSegment(String segmentName)

View File

@ -20,6 +20,7 @@
package com.metamx.druid.query; package com.metamx.druid.query;
import com.google.common.base.Function; import com.google.common.base.Function;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Ordering; import com.google.common.collect.Ordering;
@ -27,6 +28,7 @@ import com.metamx.common.guava.BaseSequence;
import com.metamx.common.guava.MergeIterable; import com.metamx.common.guava.MergeIterable;
import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences; import com.metamx.common.guava.Sequences;
import com.metamx.common.logger.Logger;
import com.metamx.druid.Query; import com.metamx.druid.Query;
import java.util.Arrays; import java.util.Arrays;
@ -52,6 +54,8 @@ import java.util.concurrent.Future;
*/ */
public class ChainedExecutionQueryRunner<T> implements QueryRunner<T> public class ChainedExecutionQueryRunner<T> implements QueryRunner<T>
{ {
private static final Logger log = new Logger(ChainedExecutionQueryRunner.class);
private final Iterable<QueryRunner<T>> queryables; private final Iterable<QueryRunner<T>> queryables;
private final ExecutorService exec; private final ExecutorService exec;
private final Ordering<T> ordering; private final Ordering<T> ordering;
@ -100,8 +104,14 @@ public class ChainedExecutionQueryRunner<T> implements QueryRunner<T>
@Override @Override
public List<T> call() throws Exception public List<T> call() throws Exception
{ {
try {
return Sequences.toList(input.run(query), Lists.<T>newArrayList()); return Sequences.toList(input.run(query), Lists.<T>newArrayList());
} }
catch (Exception e) {
log.error(e, "Exception with one of the sequences!");
throw Throwables.propagate(e);
}
}
} }
); );
} }

View File

@ -37,7 +37,6 @@ import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.input.MapBasedRow; import com.metamx.druid.input.MapBasedRow;
import com.metamx.druid.input.Row; import com.metamx.druid.input.Row;
import com.metamx.druid.input.Rows; import com.metamx.druid.input.Rows;
import com.metamx.druid.query.CacheStrategy;
import com.metamx.druid.query.MetricManipulationFn; import com.metamx.druid.query.MetricManipulationFn;
import com.metamx.druid.query.QueryRunner; import com.metamx.druid.query.QueryRunner;
import com.metamx.druid.query.QueryToolChest; import com.metamx.druid.query.QueryToolChest;
@ -78,7 +77,13 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
{ {
final GroupByQuery query = (GroupByQuery) input; final GroupByQuery query = (GroupByQuery) input;
List<Interval> condensed = query.getIntervals(); final QueryGranularity gran = query.getGranularity();
final long timeStart = query.getIntervals().get(0).getStartMillis();
// use gran.iterable instead of gran.truncate so that
// AllGranularity returns timeStart instead of Long.MIN_VALUE
final long granTimeStart = gran.iterable(timeStart, timeStart+1).iterator().next();
final List<AggregatorFactory> aggs = Lists.transform( final List<AggregatorFactory> aggs = Lists.transform(
query.getAggregatorSpecs(), query.getAggregatorSpecs(),
new Function<AggregatorFactory, AggregatorFactory>() new Function<AggregatorFactory, AggregatorFactory>()
@ -102,10 +107,11 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
} }
); );
final QueryGranularity gran = query.getGranularity();
final IncrementalIndex index = runner.run(query).accumulate( final IncrementalIndex index = runner.run(query).accumulate(
new IncrementalIndex( new IncrementalIndex(
gran.truncate(condensed.get(0).getStartMillis()), // use granularity truncated min timestamp
// since incoming truncated timestamps may precede timeStart
granTimeStart,
gran, gran,
aggs.toArray(new AggregatorFactory[aggs.size()]) aggs.toArray(new AggregatorFactory[aggs.size()])
), ),
@ -128,13 +134,11 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
Sequences.simple(index.iterableWithPostAggregations(query.getPostAggregatorSpecs())), Sequences.simple(index.iterableWithPostAggregations(query.getPostAggregatorSpecs())),
new Function<Row, Row>() new Function<Row, Row>()
{ {
private final QueryGranularity granularity = query.getGranularity();
@Override @Override
public Row apply(Row input) public Row apply(Row input)
{ {
final MapBasedRow row = (MapBasedRow) input; final MapBasedRow row = (MapBasedRow) input;
return new MapBasedRow(granularity.toDateTime(row.getTimestampFromEpoch()), row.getEvent()); return new MapBasedRow(gran.toDateTime(row.getTimestampFromEpoch()), row.getEvent());
} }
} }
); );

View File

@ -28,7 +28,7 @@
<parent> <parent>
<groupId>com.metamx</groupId> <groupId>com.metamx</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<version>0.3.27-SNAPSHOT</version> <version>0.3.28-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>

View File

@ -19,8 +19,6 @@
package com.metamx.druid.aggregation; package com.metamx.druid.aggregation;
import java.util.Comparator;
/** /**
* An Aggregator is an object that can aggregate metrics. Its aggregation-related methods (namely, aggregate() and get()) * An Aggregator is an object that can aggregate metrics. Its aggregation-related methods (namely, aggregate() and get())
* do not take any arguments as the assumption is that the Aggregator was given something in its constructor that * do not take any arguments as the assumption is that the Aggregator was given something in its constructor that
@ -40,4 +38,5 @@ public interface Aggregator {
Object get(); Object get();
float getFloat(); float getFloat();
String getName(); String getName();
void close();
} }

View File

@ -88,4 +88,9 @@ public interface BufferAggregator
* @return the float representation of the aggregate * @return the float representation of the aggregate
*/ */
float getFloat(ByteBuffer buf, int position); float getFloat(ByteBuffer buf, int position);
/**
* Release any resources used by the aggregator
*/
void close();
} }

View File

@ -75,4 +75,10 @@ public class CountAggregator implements Aggregator
{ {
return new CountAggregator(name); return new CountAggregator(name);
} }
@Override
public void close()
{
// no resources to cleanup
}
} }

View File

@ -49,4 +49,10 @@ public class CountBufferAggregator implements BufferAggregator
{ {
return buf.getLong(position); return buf.getLong(position);
} }
@Override
public void close()
{
// no resources to cleanup
}
} }

View File

@ -89,4 +89,10 @@ public class DoubleSumAggregator implements Aggregator
{ {
return new DoubleSumAggregator(name, selector); return new DoubleSumAggregator(name, selector);
} }
@Override
public void close()
{
// no resources to cleanup
}
} }

View File

@ -59,4 +59,10 @@ public class DoubleSumBufferAggregator implements BufferAggregator
{ {
return (float) buf.getDouble(position); return (float) buf.getDouble(position);
} }
@Override
public void close()
{
// no resources to cleanup
}
} }

View File

@ -80,4 +80,10 @@ public class HistogramAggregator implements Aggregator
{ {
return name; return name;
} }
@Override
public void close()
{
// no resources to cleanup
}
} }

View File

@ -90,4 +90,10 @@ public class HistogramBufferAggregator implements BufferAggregator
{ {
throw new UnsupportedOperationException("HistogramBufferAggregator does not support getFloat()"); throw new UnsupportedOperationException("HistogramBufferAggregator does not support getFloat()");
} }
@Override
public void close()
{
// no resources to cleanup
}
} }

View File

@ -21,6 +21,7 @@ package com.metamx.druid.aggregation;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.metamx.druid.processing.FloatMetricSelector; import com.metamx.druid.processing.FloatMetricSelector;
import org.mozilla.javascript.Context;
import java.util.List; import java.util.List;
@ -33,6 +34,8 @@ public class JavaScriptAggregator implements Aggregator
public double combine(double a, double b); public double combine(double a, double b);
public double reset(); public double reset();
public void close();
} }
private final String name; private final String name;
@ -79,4 +82,10 @@ public class JavaScriptAggregator implements Aggregator
{ {
return name; return name;
} }
@Override
public void close()
{
script.close();
}
} }

View File

@ -30,6 +30,8 @@ import com.metamx.druid.processing.MetricSelectorFactory;
import org.mozilla.javascript.Context; import org.mozilla.javascript.Context;
import org.mozilla.javascript.ContextAction;
import org.mozilla.javascript.ContextFactory;
import org.mozilla.javascript.Function; import org.mozilla.javascript.Function;
import org.mozilla.javascript.Script; import org.mozilla.javascript.Script;
import org.mozilla.javascript.ScriptableObject; import org.mozilla.javascript.ScriptableObject;
@ -49,7 +51,7 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory
private final List<String> fieldNames; private final List<String> fieldNames;
private final String script; private final String script;
private final JavaScriptAggregator.ScriptAggregator combiner; private final JavaScriptAggregator.ScriptAggregator compiledScript;
@JsonCreator @JsonCreator
public JavaScriptAggregatorFactory( public JavaScriptAggregatorFactory(
@ -61,7 +63,7 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory
this.name = name; this.name = name;
this.script = expression; this.script = expression;
this.fieldNames = fieldNames; this.fieldNames = fieldNames;
this.combiner = compileScript(script); this.compiledScript = compileScript(script);
} }
@Override @Override
@ -77,7 +79,7 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory
public FloatMetricSelector apply(@Nullable String s) { return metricFactory.makeFloatMetricSelector(s); } public FloatMetricSelector apply(@Nullable String s) { return metricFactory.makeFloatMetricSelector(s); }
} }
), ),
compileScript(script) compiledScript
); );
} }
@ -96,7 +98,7 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory
} }
} }
), ),
compileScript(script) compiledScript
); );
} }
@ -109,7 +111,7 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory
@Override @Override
public Object combine(Object lhs, Object rhs) public Object combine(Object lhs, Object rhs)
{ {
return combiner.combine(((Number) lhs).doubleValue(), ((Number) rhs).doubleValue()); return compiledScript.combine(((Number) lhs).doubleValue(), ((Number) rhs).doubleValue());
} }
@Override @Override
@ -187,7 +189,7 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory
@Override @Override
public Object getAggregatorStartValue() public Object getAggregatorStartValue()
{ {
return combiner.reset(); return compiledScript.reset();
} }
@Override @Override
@ -212,22 +214,29 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory
public static JavaScriptAggregator.ScriptAggregator compileScript(final String script) public static JavaScriptAggregator.ScriptAggregator compileScript(final String script)
{ {
final Context cx = Context.enter(); final ContextFactory contextFactory = ContextFactory.getGlobal();
cx.setOptimizationLevel(9); Context context = contextFactory.enterContext();
final ScriptableObject scope = cx.initStandardObjects(); context.setOptimizationLevel(9);
Script compiledScript = cx.compileString(script, "script", 1, null); final ScriptableObject scope = context.initStandardObjects();
compiledScript.exec(cx, scope);
Script compiledScript = context.compileString(script, "script", 1, null);
compiledScript.exec(context, scope);
final Function fnAggregate = getScriptFunction("aggregate", scope); final Function fnAggregate = getScriptFunction("aggregate", scope);
final Function fnReset = getScriptFunction("reset", scope); final Function fnReset = getScriptFunction("reset", scope);
final Function fnCombine = getScriptFunction("combine", scope); final Function fnCombine = getScriptFunction("combine", scope);
Context.exit();
return new JavaScriptAggregator.ScriptAggregator() return new JavaScriptAggregator.ScriptAggregator()
{ {
@Override @Override
public double aggregate(double current, FloatMetricSelector[] selectorList) public double aggregate(final double current, final FloatMetricSelector[] selectorList)
{ {
Context cx = Context.getCurrentContext();
if(cx == null) cx = contextFactory.enterContext();
final int size = selectorList.length; final int size = selectorList.length;
final Object[] args = new Object[size + 1]; final Object[] args = new Object[size + 1];
@ -237,29 +246,47 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory
args[i + 1] = selectorList[i++].get(); args[i + 1] = selectorList[i++].get();
} }
Object res = fnAggregate.call(cx, scope, scope, args); final Object res = fnAggregate.call(cx, scope, scope, args);
return Context.toNumber(res); return Context.toNumber(res);
} }
@Override @Override
public double combine(double a, double b) public double combine(final double a, final double b)
{ {
Object res = fnCombine.call(cx, scope, scope, new Object[]{a, b}); final Object res = contextFactory.call(
new ContextAction()
{
@Override
public Object run(final Context cx)
{
return fnCombine.call(cx, scope, scope, new Object[]{a, b});
}
}
);
return Context.toNumber(res); return Context.toNumber(res);
} }
@Override @Override
public double reset() public double reset()
{ {
Object res = fnReset.call(cx, scope, scope, new Object[]{}); final Object res = contextFactory.call(
new ContextAction()
{
@Override
public Object run(final Context cx)
{
return fnReset.call(cx, scope, scope, new Object[]{});
}
}
);
return Context.toNumber(res); return Context.toNumber(res);
} }
@Override @Override
protected void finalize() throws Throwable public void close() {
{ if(Context.getCurrentContext() != null) {
cx.exit(); Context.exit();
super.finalize(); }
} }
}; };
} }

View File

@ -62,4 +62,9 @@ public class JavaScriptBufferAggregator implements BufferAggregator
{ {
return (float)buf.getDouble(position); return (float)buf.getDouble(position);
} }
@Override
public void close() {
script.close();
}
} }

View File

@ -89,4 +89,10 @@ public class LongSumAggregator implements Aggregator
{ {
return new LongSumAggregator(name, selector); return new LongSumAggregator(name, selector);
} }
@Override
public void close()
{
// no resources to cleanup
}
} }

View File

@ -59,4 +59,10 @@ public class LongSumBufferAggregator implements BufferAggregator
{ {
return (float) buf.getLong(position); return (float) buf.getLong(position);
} }
@Override
public void close()
{
// no resources to cleanup
}
} }

View File

@ -82,4 +82,10 @@ public class MaxAggregator implements Aggregator
{ {
return new MaxAggregator(name, selector); return new MaxAggregator(name, selector);
} }
@Override
public void close()
{
// no resources to cleanup
}
} }

View File

@ -57,4 +57,10 @@ public class MaxBufferAggregator implements BufferAggregator
{ {
return (float) buf.getDouble(position); return (float) buf.getDouble(position);
} }
@Override
public void close()
{
// no resources to cleanup
}
} }

View File

@ -82,4 +82,10 @@ public class MinAggregator implements Aggregator
{ {
return new MinAggregator(name, selector); return new MinAggregator(name, selector);
} }
@Override
public void close()
{
// no resources to cleanup
}
} }

View File

@ -57,4 +57,10 @@ public class MinBufferAggregator implements BufferAggregator
{ {
return (float) buf.getDouble(position); return (float) buf.getDouble(position);
} }
@Override
public void close()
{
// no resources to cleanup
}
} }

View File

@ -59,4 +59,10 @@ public class NoopAggregator implements Aggregator
{ {
return name; return name;
} }
@Override
public void close()
{
// no resources to cleanup
}
} }

View File

@ -46,4 +46,10 @@ public class NoopBufferAggregator implements BufferAggregator
{ {
return 0; return 0;
} }
@Override
public void close()
{
// no resources to cleanup
}
} }

View File

@ -0,0 +1,5 @@
package com.metamx.druid.processing;
public class MetricSelectorUtils
{
}

View File

@ -24,11 +24,11 @@
<artifactId>druid-services</artifactId> <artifactId>druid-services</artifactId>
<name>druid-services</name> <name>druid-services</name>
<description>druid-services</description> <description>druid-services</description>
<version>0.3.27-SNAPSHOT</version> <version>0.3.28-SNAPSHOT</version>
<parent> <parent>
<groupId>com.metamx</groupId> <groupId>com.metamx</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<version>0.3.27-SNAPSHOT</version> <version>0.3.28-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>

View File

@ -28,7 +28,7 @@
<parent> <parent>
<groupId>com.metamx</groupId> <groupId>com.metamx</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<version>0.3.27-SNAPSHOT</version> <version>0.3.28-SNAPSHOT</version>
</parent> </parent>
<modules> <modules>

View File

@ -9,7 +9,7 @@
<parent> <parent>
<groupId>com.metamx</groupId> <groupId>com.metamx</groupId>
<artifactId>druid-examples</artifactId> <artifactId>druid-examples</artifactId>
<version>0.3.27-SNAPSHOT</version> <version>0.3.28-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>

View File

@ -9,7 +9,7 @@
<parent> <parent>
<groupId>com.metamx</groupId> <groupId>com.metamx</groupId>
<artifactId>druid-examples</artifactId> <artifactId>druid-examples</artifactId>
<version>0.3.27-SNAPSHOT</version> <version>0.3.28-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>

View File

@ -28,7 +28,7 @@
<parent> <parent>
<groupId>com.metamx</groupId> <groupId>com.metamx</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<version>0.3.27-SNAPSHOT</version> <version>0.3.28-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>

View File

@ -28,7 +28,7 @@
<parent> <parent>
<groupId>com.metamx</groupId> <groupId>com.metamx</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<version>0.3.27-SNAPSHOT</version> <version>0.3.28-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>

View File

@ -28,7 +28,7 @@
<parent> <parent>
<groupId>com.metamx</groupId> <groupId>com.metamx</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<version>0.3.27-SNAPSHOT</version> <version>0.3.28-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>

View File

@ -175,7 +175,7 @@ public class WorkerTaskMonitor implements QuerySegmentWalker
private boolean isTaskRunning(final Task task) private boolean isTaskRunning(final Task task)
{ {
for (final Task runningTask : running) { for (final Task runningTask : running) {
if (runningTask.equals(task.getId())) { if (runningTask.getId().equals(task.getId())) {
return true; return true;
} }
} }

View File

@ -23,7 +23,7 @@
<groupId>com.metamx</groupId> <groupId>com.metamx</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<packaging>pom</packaging> <packaging>pom</packaging>
<version>0.3.27-SNAPSHOT</version> <version>0.3.28-SNAPSHOT</version>
<name>druid</name> <name>druid</name>
<description>druid</description> <description>druid</description>
<scm> <scm>

View File

@ -28,7 +28,7 @@
<parent> <parent>
<groupId>com.metamx</groupId> <groupId>com.metamx</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<version>0.3.27-SNAPSHOT</version> <version>0.3.28-SNAPSHOT</version>
</parent> </parent>
<properties> <properties>

View File

@ -28,7 +28,7 @@
<parent> <parent>
<groupId>com.metamx</groupId> <groupId>com.metamx</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<version>0.3.27-SNAPSHOT</version> <version>0.3.28-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>

View File

@ -19,6 +19,7 @@
package com.metamx.druid.master; package com.metamx.druid.master;
import com.google.common.collect.Maps;
import com.google.common.collect.MinMaxPriorityQueue; import com.google.common.collect.MinMaxPriorityQueue;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment; import com.metamx.druid.client.DataSegment;
@ -28,6 +29,7 @@ import com.metamx.druid.collect.CountingMap;
import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent; import com.metamx.emitter.service.ServiceMetricEvent;
import javax.annotation.Nullable;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
@ -38,6 +40,21 @@ public class DruidMasterLogger implements DruidMasterHelper
{ {
private static final Logger log = new Logger(DruidMasterLogger.class); private static final Logger log = new Logger(DruidMasterLogger.class);
private <T extends Number> void emitTieredStats(final ServiceEmitter emitter, final String formatString, final Map<String, T> statMap)
{
if (statMap != null) {
for (Map.Entry<String, T> entry : statMap.entrySet()) {
String tier = entry.getKey();
Number value = entry.getValue();
emitter.emit(
new ServiceMetricEvent.Builder().build(
String.format(formatString, tier), value.doubleValue()
)
);
}
}
}
@Override @Override
public DruidMasterRuntimeParams run(DruidMasterRuntimeParams params) public DruidMasterRuntimeParams run(DruidMasterRuntimeParams params)
{ {
@ -65,35 +82,30 @@ public class DruidMasterLogger implements DruidMasterHelper
} }
} }
emitter.emit( emitTieredStats(emitter, "master/%s/cost/raw",
new ServiceMetricEvent.Builder().build( stats.getPerTierStats().get("initialCost"));
"master/cost/raw", stats.getGlobalStats().get("initialCost")
)
);
emitter.emit( emitTieredStats(emitter, "master/%s/cost/normalization",
new ServiceMetricEvent.Builder().build( stats.getPerTierStats().get("normalization"));
"master/cost/normalization", stats.getGlobalStats().get("normalization")
)
);
emitter.emit( emitTieredStats(emitter, "master/%s/moved/count",
new ServiceMetricEvent.Builder().build( stats.getPerTierStats().get("movedCount"));
"master/cost/normalized", stats.getGlobalStats().get("normalizedInitialCostTimesOneThousand").doubleValue() / 1000d
)
);
emitter.emit( emitTieredStats(emitter, "master/%s/deleted/count",
new ServiceMetricEvent.Builder().build( stats.getPerTierStats().get("deletedCount"));
"master/moved/count", stats.getGlobalStats().get("movedCount")
)
);
emitter.emit( emitTieredStats(emitter, "master/%s/cost/normalized",
new ServiceMetricEvent.Builder().build( Maps.transformEntries(stats.getPerTierStats().get("normalizedInitialCostTimesOneThousand"),
"master/deleted/count", stats.getGlobalStats().get("deletedCount") new Maps.EntryTransformer<String, AtomicLong, Number>()
{
@Override
public Number transformEntry(
@Nullable String key, @Nullable AtomicLong value
) )
); {
return value.doubleValue() / 1000d;
}
}));
Map<String, AtomicLong> unneeded = stats.getPerTierStats().get("unneededCount"); Map<String, AtomicLong> unneeded = stats.getPerTierStats().get("unneededCount");
if (unneeded != null) { if (unneeded != null) {

View File

@ -31,6 +31,8 @@ import com.metamx.common.ISE;
import com.metamx.common.guava.BaseSequence; import com.metamx.common.guava.BaseSequence;
import com.metamx.common.guava.FunctionalIterator; import com.metamx.common.guava.FunctionalIterator;
import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.common.parsers.CloseableIterator;
import com.metamx.druid.StorageAdapter; import com.metamx.druid.StorageAdapter;
import com.metamx.druid.aggregation.AggregatorFactory; import com.metamx.druid.aggregation.AggregatorFactory;
import com.metamx.druid.aggregation.BufferAggregator; import com.metamx.druid.aggregation.BufferAggregator;
@ -47,6 +49,7 @@ import org.joda.time.DateTime;
import org.joda.time.Interval; import org.joda.time.Interval;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
@ -85,33 +88,47 @@ public class GroupByQueryEngine
final ResourceHolder<ByteBuffer> bufferHolder = intermediateResultsBufferPool.take(); final ResourceHolder<ByteBuffer> bufferHolder = intermediateResultsBufferPool.take();
return new BaseSequence<Row, Iterator<Row>>( return Sequences.concat(
new BaseSequence.IteratorMaker<Row, Iterator<Row>>() new BaseSequence<Sequence<Row>, Iterator<Sequence<Row>>>(new BaseSequence.IteratorMaker<Sequence<Row>, Iterator<Sequence<Row>>>()
{ {
@Override @Override
public Iterator<Row> make() public Iterator<Sequence<Row>> make()
{ {
return FunctionalIterator return FunctionalIterator
.create(cursors.iterator()) .create(cursors.iterator())
.transformCat( .transform(new Function<Cursor, Sequence<Row>>()
new Function<Cursor, Iterator<Row>>()
{ {
@Override @Override
public Iterator<Row> apply(@Nullable final Cursor cursor) public Sequence<Row> apply(@Nullable final Cursor cursor)
{
return new BaseSequence<Row, CloseableIterator<Row>>(
new BaseSequence.IteratorMaker<Row, CloseableIterator<Row>>()
{
@Override
public CloseableIterator<Row> make()
{ {
return new RowIterator(query, cursor, bufferHolder.get()); return new RowIterator(query, cursor, bufferHolder.get());
} }
@Override
public void cleanup(CloseableIterator iterFromMake)
{
Closeables.closeQuietly(iterFromMake);
}
} }
); );
} }
});
}
@Override @Override
public void cleanup(Iterator<Row> iterFromMake) public void cleanup(Iterator<Sequence<Row>> iterFromMake)
{ {
Closeables.closeQuietly(bufferHolder); Closeables.closeQuietly(bufferHolder);
} }
} })
); );
} }
private static class RowUpdater private static class RowUpdater
@ -248,7 +265,7 @@ public class GroupByQueryEngine
} }
} }
private class RowIterator implements Iterator<Row> private class RowIterator implements CloseableIterator<Row>
{ {
private final GroupByQuery query; private final GroupByQuery query;
private final Cursor cursor; private final Cursor cursor;
@ -384,5 +401,12 @@ public class GroupByQueryEngine
{ {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
public void close() {
// cleanup
for(BufferAggregator agg : aggregators) {
agg.close();
}
}
} }
} }

View File

@ -80,7 +80,14 @@ public class TimeseriesQueryEngine
bob.addMetric(postAgg); bob.addMetric(postAgg);
} }
return bob.build(); Result<TimeseriesResultValue> retVal = bob.build();
// cleanup
for(Aggregator agg : aggregators) {
agg.close();
}
return retVal;
} }
} }
).iterator(); ).iterator();

View File

@ -30,6 +30,7 @@ import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences; import com.metamx.common.guava.Sequences;
import com.metamx.druid.PeriodGranularity; import com.metamx.druid.PeriodGranularity;
import com.metamx.druid.Query; import com.metamx.druid.Query;
import com.metamx.druid.QueryGranularity;
import com.metamx.druid.TestHelper; import com.metamx.druid.TestHelper;
import com.metamx.druid.aggregation.AggregatorFactory; import com.metamx.druid.aggregation.AggregatorFactory;
import com.metamx.druid.aggregation.LongSumAggregatorFactory; import com.metamx.druid.aggregation.LongSumAggregatorFactory;
@ -232,6 +233,7 @@ public class GroupByQueryRunnerTest
.setGranularity(new PeriodGranularity(new Period("P1M"), null, null)); .setGranularity(new PeriodGranularity(new Period("P1M"), null, null));
final GroupByQuery fullQuery = builder.build(); final GroupByQuery fullQuery = builder.build();
final GroupByQuery allGranQuery = builder.copy().setGranularity(QueryGranularity.ALL).build();
QueryRunner mergedRunner = new GroupByQueryQueryToolChest().mergeResults( QueryRunner mergedRunner = new GroupByQueryQueryToolChest().mergeResults(
new QueryRunner<Row>() new QueryRunner<Row>()
@ -265,6 +267,22 @@ public class GroupByQueryRunnerTest
TestHelper.assertExpectedObjects(expectedResults, runner.run(fullQuery), "direct"); TestHelper.assertExpectedObjects(expectedResults, runner.run(fullQuery), "direct");
TestHelper.assertExpectedObjects(expectedResults, mergedRunner.run(fullQuery), "merged"); TestHelper.assertExpectedObjects(expectedResults, mergedRunner.run(fullQuery), "merged");
List<Row> allGranExpectedResults = Arrays.<Row>asList(
createExpectedRow("2011-04-02", "alias", "automotive", "rows", 2L, "idx", 269L),
createExpectedRow("2011-04-02", "alias", "business", "rows", 2L, "idx", 217L),
createExpectedRow("2011-04-02", "alias", "entertainment", "rows", 2L, "idx", 319L),
createExpectedRow("2011-04-02", "alias", "health", "rows", 2L, "idx", 216L),
createExpectedRow("2011-04-02", "alias", "mezzanine", "rows", 6L, "idx", 4420L),
createExpectedRow("2011-04-02", "alias", "news", "rows", 2L, "idx", 221L),
createExpectedRow("2011-04-02", "alias", "premium", "rows", 6L, "idx", 4416L),
createExpectedRow("2011-04-02", "alias", "technology", "rows", 2L, "idx", 177L),
createExpectedRow("2011-04-02", "alias", "travel", "rows", 2L, "idx", 243L)
);
TestHelper.assertExpectedObjects(allGranExpectedResults, runner.run(allGranQuery), "direct");
TestHelper.assertExpectedObjects(allGranExpectedResults, mergedRunner.run(allGranQuery), "merged");
} }
private MapBasedRow createExpectedRow(final String timestamp, Object... vals) private MapBasedRow createExpectedRow(final String timestamp, Object... vals)