mirror of https://github.com/apache/druid.git
Merge branch 'master' of github.com:metamx/druid
This commit is contained in:
commit
082ea59cf1
|
@ -37,7 +37,6 @@ import com.metamx.druid.initialization.Initialization;
|
||||||
import com.metamx.druid.input.MapBasedRow;
|
import com.metamx.druid.input.MapBasedRow;
|
||||||
import com.metamx.druid.input.Row;
|
import com.metamx.druid.input.Row;
|
||||||
import com.metamx.druid.input.Rows;
|
import com.metamx.druid.input.Rows;
|
||||||
import com.metamx.druid.query.CacheStrategy;
|
|
||||||
import com.metamx.druid.query.MetricManipulationFn;
|
import com.metamx.druid.query.MetricManipulationFn;
|
||||||
import com.metamx.druid.query.QueryRunner;
|
import com.metamx.druid.query.QueryRunner;
|
||||||
import com.metamx.druid.query.QueryToolChest;
|
import com.metamx.druid.query.QueryToolChest;
|
||||||
|
@ -78,7 +77,13 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
|
||||||
{
|
{
|
||||||
final GroupByQuery query = (GroupByQuery) input;
|
final GroupByQuery query = (GroupByQuery) input;
|
||||||
|
|
||||||
List<Interval> condensed = query.getIntervals();
|
final QueryGranularity gran = query.getGranularity();
|
||||||
|
final long timeStart = query.getIntervals().get(0).getStartMillis();
|
||||||
|
|
||||||
|
// use gran.iterable instead of gran.truncate so that
|
||||||
|
// AllGranularity returns timeStart instead of Long.MIN_VALUE
|
||||||
|
final long granTimeStart = gran.iterable(timeStart, timeStart+1).iterator().next();
|
||||||
|
|
||||||
final List<AggregatorFactory> aggs = Lists.transform(
|
final List<AggregatorFactory> aggs = Lists.transform(
|
||||||
query.getAggregatorSpecs(),
|
query.getAggregatorSpecs(),
|
||||||
new Function<AggregatorFactory, AggregatorFactory>()
|
new Function<AggregatorFactory, AggregatorFactory>()
|
||||||
|
@ -102,10 +107,11 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
|
||||||
final QueryGranularity gran = query.getGranularity();
|
|
||||||
final IncrementalIndex index = runner.run(query).accumulate(
|
final IncrementalIndex index = runner.run(query).accumulate(
|
||||||
new IncrementalIndex(
|
new IncrementalIndex(
|
||||||
gran.truncate(condensed.get(0).getStartMillis()),
|
// use granularity truncated min timestamp
|
||||||
|
// since incoming truncated timestamps may precede timeStart
|
||||||
|
granTimeStart,
|
||||||
gran,
|
gran,
|
||||||
aggs.toArray(new AggregatorFactory[aggs.size()])
|
aggs.toArray(new AggregatorFactory[aggs.size()])
|
||||||
),
|
),
|
||||||
|
@ -128,13 +134,11 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
|
||||||
Sequences.simple(index.iterableWithPostAggregations(query.getPostAggregatorSpecs())),
|
Sequences.simple(index.iterableWithPostAggregations(query.getPostAggregatorSpecs())),
|
||||||
new Function<Row, Row>()
|
new Function<Row, Row>()
|
||||||
{
|
{
|
||||||
private final QueryGranularity granularity = query.getGranularity();
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Row apply(Row input)
|
public Row apply(Row input)
|
||||||
{
|
{
|
||||||
final MapBasedRow row = (MapBasedRow) input;
|
final MapBasedRow row = (MapBasedRow) input;
|
||||||
return new MapBasedRow(granularity.toDateTime(row.getTimestampFromEpoch()), row.getEvent());
|
return new MapBasedRow(gran.toDateTime(row.getTimestampFromEpoch()), row.getEvent());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
|
|
@ -19,8 +19,6 @@
|
||||||
|
|
||||||
package com.metamx.druid.aggregation;
|
package com.metamx.druid.aggregation;
|
||||||
|
|
||||||
import java.util.Comparator;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* An Aggregator is an object that can aggregate metrics. Its aggregation-related methods (namely, aggregate() and get())
|
* An Aggregator is an object that can aggregate metrics. Its aggregation-related methods (namely, aggregate() and get())
|
||||||
* do not take any arguments as the assumption is that the Aggregator was given something in its constructor that
|
* do not take any arguments as the assumption is that the Aggregator was given something in its constructor that
|
||||||
|
@ -40,4 +38,5 @@ public interface Aggregator {
|
||||||
Object get();
|
Object get();
|
||||||
float getFloat();
|
float getFloat();
|
||||||
String getName();
|
String getName();
|
||||||
|
void close();
|
||||||
}
|
}
|
||||||
|
|
|
@ -88,4 +88,9 @@ public interface BufferAggregator
|
||||||
* @return the float representation of the aggregate
|
* @return the float representation of the aggregate
|
||||||
*/
|
*/
|
||||||
float getFloat(ByteBuffer buf, int position);
|
float getFloat(ByteBuffer buf, int position);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Release any resources used by the aggregator
|
||||||
|
*/
|
||||||
|
void close();
|
||||||
}
|
}
|
||||||
|
|
|
@ -75,4 +75,10 @@ public class CountAggregator implements Aggregator
|
||||||
{
|
{
|
||||||
return new CountAggregator(name);
|
return new CountAggregator(name);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close()
|
||||||
|
{
|
||||||
|
// no resources to cleanup
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -49,4 +49,10 @@ public class CountBufferAggregator implements BufferAggregator
|
||||||
{
|
{
|
||||||
return buf.getLong(position);
|
return buf.getLong(position);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close()
|
||||||
|
{
|
||||||
|
// no resources to cleanup
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -89,4 +89,10 @@ public class DoubleSumAggregator implements Aggregator
|
||||||
{
|
{
|
||||||
return new DoubleSumAggregator(name, selector);
|
return new DoubleSumAggregator(name, selector);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close()
|
||||||
|
{
|
||||||
|
// no resources to cleanup
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -59,4 +59,10 @@ public class DoubleSumBufferAggregator implements BufferAggregator
|
||||||
{
|
{
|
||||||
return (float) buf.getDouble(position);
|
return (float) buf.getDouble(position);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close()
|
||||||
|
{
|
||||||
|
// no resources to cleanup
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -80,4 +80,10 @@ public class HistogramAggregator implements Aggregator
|
||||||
{
|
{
|
||||||
return name;
|
return name;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close()
|
||||||
|
{
|
||||||
|
// no resources to cleanup
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -90,4 +90,10 @@ public class HistogramBufferAggregator implements BufferAggregator
|
||||||
{
|
{
|
||||||
throw new UnsupportedOperationException("HistogramBufferAggregator does not support getFloat()");
|
throw new UnsupportedOperationException("HistogramBufferAggregator does not support getFloat()");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close()
|
||||||
|
{
|
||||||
|
// no resources to cleanup
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,6 +21,7 @@ package com.metamx.druid.aggregation;
|
||||||
|
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
import com.metamx.druid.processing.FloatMetricSelector;
|
import com.metamx.druid.processing.FloatMetricSelector;
|
||||||
|
import org.mozilla.javascript.Context;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
|
@ -33,6 +34,8 @@ public class JavaScriptAggregator implements Aggregator
|
||||||
public double combine(double a, double b);
|
public double combine(double a, double b);
|
||||||
|
|
||||||
public double reset();
|
public double reset();
|
||||||
|
|
||||||
|
public void close();
|
||||||
}
|
}
|
||||||
|
|
||||||
private final String name;
|
private final String name;
|
||||||
|
@ -79,4 +82,10 @@ public class JavaScriptAggregator implements Aggregator
|
||||||
{
|
{
|
||||||
return name;
|
return name;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close()
|
||||||
|
{
|
||||||
|
script.close();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -30,6 +30,8 @@ import com.metamx.druid.processing.MetricSelectorFactory;
|
||||||
|
|
||||||
|
|
||||||
import org.mozilla.javascript.Context;
|
import org.mozilla.javascript.Context;
|
||||||
|
import org.mozilla.javascript.ContextAction;
|
||||||
|
import org.mozilla.javascript.ContextFactory;
|
||||||
import org.mozilla.javascript.Function;
|
import org.mozilla.javascript.Function;
|
||||||
import org.mozilla.javascript.Script;
|
import org.mozilla.javascript.Script;
|
||||||
import org.mozilla.javascript.ScriptableObject;
|
import org.mozilla.javascript.ScriptableObject;
|
||||||
|
@ -49,7 +51,7 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory
|
||||||
private final List<String> fieldNames;
|
private final List<String> fieldNames;
|
||||||
private final String script;
|
private final String script;
|
||||||
|
|
||||||
private final JavaScriptAggregator.ScriptAggregator combiner;
|
private final JavaScriptAggregator.ScriptAggregator compiledScript;
|
||||||
|
|
||||||
@JsonCreator
|
@JsonCreator
|
||||||
public JavaScriptAggregatorFactory(
|
public JavaScriptAggregatorFactory(
|
||||||
|
@ -61,7 +63,7 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory
|
||||||
this.name = name;
|
this.name = name;
|
||||||
this.script = expression;
|
this.script = expression;
|
||||||
this.fieldNames = fieldNames;
|
this.fieldNames = fieldNames;
|
||||||
this.combiner = compileScript(script);
|
this.compiledScript = compileScript(script);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -77,7 +79,7 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory
|
||||||
public FloatMetricSelector apply(@Nullable String s) { return metricFactory.makeFloatMetricSelector(s); }
|
public FloatMetricSelector apply(@Nullable String s) { return metricFactory.makeFloatMetricSelector(s); }
|
||||||
}
|
}
|
||||||
),
|
),
|
||||||
compileScript(script)
|
compiledScript
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -96,7 +98,7 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
),
|
),
|
||||||
compileScript(script)
|
compiledScript
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -109,7 +111,7 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory
|
||||||
@Override
|
@Override
|
||||||
public Object combine(Object lhs, Object rhs)
|
public Object combine(Object lhs, Object rhs)
|
||||||
{
|
{
|
||||||
return combiner.combine(((Number) lhs).doubleValue(), ((Number) rhs).doubleValue());
|
return compiledScript.combine(((Number) lhs).doubleValue(), ((Number) rhs).doubleValue());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -187,7 +189,7 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory
|
||||||
@Override
|
@Override
|
||||||
public Object getAggregatorStartValue()
|
public Object getAggregatorStartValue()
|
||||||
{
|
{
|
||||||
return combiner.reset();
|
return compiledScript.reset();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -212,22 +214,29 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory
|
||||||
|
|
||||||
public static JavaScriptAggregator.ScriptAggregator compileScript(final String script)
|
public static JavaScriptAggregator.ScriptAggregator compileScript(final String script)
|
||||||
{
|
{
|
||||||
final Context cx = Context.enter();
|
final ContextFactory contextFactory = ContextFactory.getGlobal();
|
||||||
cx.setOptimizationLevel(9);
|
Context context = contextFactory.enterContext();
|
||||||
final ScriptableObject scope = cx.initStandardObjects();
|
context.setOptimizationLevel(9);
|
||||||
|
|
||||||
Script compiledScript = cx.compileString(script, "script", 1, null);
|
final ScriptableObject scope = context.initStandardObjects();
|
||||||
compiledScript.exec(cx, scope);
|
|
||||||
|
Script compiledScript = context.compileString(script, "script", 1, null);
|
||||||
|
compiledScript.exec(context, scope);
|
||||||
|
|
||||||
final Function fnAggregate = getScriptFunction("aggregate", scope);
|
final Function fnAggregate = getScriptFunction("aggregate", scope);
|
||||||
final Function fnReset = getScriptFunction("reset", scope);
|
final Function fnReset = getScriptFunction("reset", scope);
|
||||||
final Function fnCombine = getScriptFunction("combine", scope);
|
final Function fnCombine = getScriptFunction("combine", scope);
|
||||||
|
Context.exit();
|
||||||
|
|
||||||
|
|
||||||
return new JavaScriptAggregator.ScriptAggregator()
|
return new JavaScriptAggregator.ScriptAggregator()
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
public double aggregate(double current, FloatMetricSelector[] selectorList)
|
public double aggregate(final double current, final FloatMetricSelector[] selectorList)
|
||||||
{
|
{
|
||||||
|
Context cx = Context.getCurrentContext();
|
||||||
|
if(cx == null) cx = contextFactory.enterContext();
|
||||||
|
|
||||||
final int size = selectorList.length;
|
final int size = selectorList.length;
|
||||||
final Object[] args = new Object[size + 1];
|
final Object[] args = new Object[size + 1];
|
||||||
|
|
||||||
|
@ -237,29 +246,47 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory
|
||||||
args[i + 1] = selectorList[i++].get();
|
args[i + 1] = selectorList[i++].get();
|
||||||
}
|
}
|
||||||
|
|
||||||
Object res = fnAggregate.call(cx, scope, scope, args);
|
final Object res = fnAggregate.call(cx, scope, scope, args);
|
||||||
return Context.toNumber(res);
|
return Context.toNumber(res);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public double combine(double a, double b)
|
public double combine(final double a, final double b)
|
||||||
{
|
{
|
||||||
Object res = fnCombine.call(cx, scope, scope, new Object[]{a, b});
|
final Object res = contextFactory.call(
|
||||||
|
new ContextAction()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public Object run(final Context cx)
|
||||||
|
{
|
||||||
|
return fnCombine.call(cx, scope, scope, new Object[]{a, b});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
);
|
||||||
return Context.toNumber(res);
|
return Context.toNumber(res);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public double reset()
|
public double reset()
|
||||||
{
|
{
|
||||||
Object res = fnReset.call(cx, scope, scope, new Object[]{});
|
final Object res = contextFactory.call(
|
||||||
|
new ContextAction()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public Object run(final Context cx)
|
||||||
|
{
|
||||||
|
return fnReset.call(cx, scope, scope, new Object[]{});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
);
|
||||||
return Context.toNumber(res);
|
return Context.toNumber(res);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void finalize() throws Throwable
|
public void close() {
|
||||||
{
|
if(Context.getCurrentContext() != null) {
|
||||||
cx.exit();
|
Context.exit();
|
||||||
super.finalize();
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
|
@ -62,4 +62,9 @@ public class JavaScriptBufferAggregator implements BufferAggregator
|
||||||
{
|
{
|
||||||
return (float)buf.getDouble(position);
|
return (float)buf.getDouble(position);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() {
|
||||||
|
script.close();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -89,4 +89,10 @@ public class LongSumAggregator implements Aggregator
|
||||||
{
|
{
|
||||||
return new LongSumAggregator(name, selector);
|
return new LongSumAggregator(name, selector);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close()
|
||||||
|
{
|
||||||
|
// no resources to cleanup
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -59,4 +59,10 @@ public class LongSumBufferAggregator implements BufferAggregator
|
||||||
{
|
{
|
||||||
return (float) buf.getLong(position);
|
return (float) buf.getLong(position);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close()
|
||||||
|
{
|
||||||
|
// no resources to cleanup
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -82,4 +82,10 @@ public class MaxAggregator implements Aggregator
|
||||||
{
|
{
|
||||||
return new MaxAggregator(name, selector);
|
return new MaxAggregator(name, selector);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close()
|
||||||
|
{
|
||||||
|
// no resources to cleanup
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -57,4 +57,10 @@ public class MaxBufferAggregator implements BufferAggregator
|
||||||
{
|
{
|
||||||
return (float) buf.getDouble(position);
|
return (float) buf.getDouble(position);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close()
|
||||||
|
{
|
||||||
|
// no resources to cleanup
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -82,4 +82,10 @@ public class MinAggregator implements Aggregator
|
||||||
{
|
{
|
||||||
return new MinAggregator(name, selector);
|
return new MinAggregator(name, selector);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close()
|
||||||
|
{
|
||||||
|
// no resources to cleanup
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -57,4 +57,10 @@ public class MinBufferAggregator implements BufferAggregator
|
||||||
{
|
{
|
||||||
return (float) buf.getDouble(position);
|
return (float) buf.getDouble(position);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close()
|
||||||
|
{
|
||||||
|
// no resources to cleanup
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -59,4 +59,10 @@ public class NoopAggregator implements Aggregator
|
||||||
{
|
{
|
||||||
return name;
|
return name;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close()
|
||||||
|
{
|
||||||
|
// no resources to cleanup
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -46,4 +46,10 @@ public class NoopBufferAggregator implements BufferAggregator
|
||||||
{
|
{
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close()
|
||||||
|
{
|
||||||
|
// no resources to cleanup
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,5 @@
|
||||||
|
package com.metamx.druid.processing;
|
||||||
|
|
||||||
|
public class MetricSelectorUtils
|
||||||
|
{
|
||||||
|
}
|
|
@ -175,7 +175,7 @@ public class WorkerTaskMonitor implements QuerySegmentWalker
|
||||||
private boolean isTaskRunning(final Task task)
|
private boolean isTaskRunning(final Task task)
|
||||||
{
|
{
|
||||||
for (final Task runningTask : running) {
|
for (final Task runningTask : running) {
|
||||||
if (runningTask.equals(task.getId())) {
|
if (runningTask.getId().equals(task.getId())) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
|
|
||||||
package com.metamx.druid.master;
|
package com.metamx.druid.master;
|
||||||
|
|
||||||
|
import com.google.common.collect.Maps;
|
||||||
import com.google.common.collect.MinMaxPriorityQueue;
|
import com.google.common.collect.MinMaxPriorityQueue;
|
||||||
import com.metamx.common.logger.Logger;
|
import com.metamx.common.logger.Logger;
|
||||||
import com.metamx.druid.client.DataSegment;
|
import com.metamx.druid.client.DataSegment;
|
||||||
|
@ -28,6 +29,7 @@ import com.metamx.druid.collect.CountingMap;
|
||||||
import com.metamx.emitter.service.ServiceEmitter;
|
import com.metamx.emitter.service.ServiceEmitter;
|
||||||
import com.metamx.emitter.service.ServiceMetricEvent;
|
import com.metamx.emitter.service.ServiceMetricEvent;
|
||||||
|
|
||||||
|
import javax.annotation.Nullable;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
@ -38,6 +40,21 @@ public class DruidMasterLogger implements DruidMasterHelper
|
||||||
{
|
{
|
||||||
private static final Logger log = new Logger(DruidMasterLogger.class);
|
private static final Logger log = new Logger(DruidMasterLogger.class);
|
||||||
|
|
||||||
|
private <T extends Number> void emitTieredStats(final ServiceEmitter emitter, final String formatString, final Map<String, T> statMap)
|
||||||
|
{
|
||||||
|
if (statMap != null) {
|
||||||
|
for (Map.Entry<String, T> entry : statMap.entrySet()) {
|
||||||
|
String tier = entry.getKey();
|
||||||
|
Number value = entry.getValue();
|
||||||
|
emitter.emit(
|
||||||
|
new ServiceMetricEvent.Builder().build(
|
||||||
|
String.format(formatString, tier), value.doubleValue()
|
||||||
|
)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public DruidMasterRuntimeParams run(DruidMasterRuntimeParams params)
|
public DruidMasterRuntimeParams run(DruidMasterRuntimeParams params)
|
||||||
{
|
{
|
||||||
|
@ -65,35 +82,30 @@ public class DruidMasterLogger implements DruidMasterHelper
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
emitter.emit(
|
emitTieredStats(emitter, "master/%s/cost/raw",
|
||||||
new ServiceMetricEvent.Builder().build(
|
stats.getPerTierStats().get("initialCost"));
|
||||||
"master/cost/raw", stats.getGlobalStats().get("initialCost")
|
|
||||||
)
|
|
||||||
);
|
|
||||||
|
|
||||||
emitter.emit(
|
emitTieredStats(emitter, "master/%s/cost/normalization",
|
||||||
new ServiceMetricEvent.Builder().build(
|
stats.getPerTierStats().get("normalization"));
|
||||||
"master/cost/normalization", stats.getGlobalStats().get("normalization")
|
|
||||||
)
|
|
||||||
);
|
|
||||||
|
|
||||||
emitter.emit(
|
emitTieredStats(emitter, "master/%s/moved/count",
|
||||||
new ServiceMetricEvent.Builder().build(
|
stats.getPerTierStats().get("movedCount"));
|
||||||
"master/cost/normalized", stats.getGlobalStats().get("normalizedInitialCostTimesOneThousand").doubleValue() / 1000d
|
|
||||||
)
|
|
||||||
);
|
|
||||||
|
|
||||||
emitter.emit(
|
emitTieredStats(emitter, "master/%s/deleted/count",
|
||||||
new ServiceMetricEvent.Builder().build(
|
stats.getPerTierStats().get("deletedCount"));
|
||||||
"master/moved/count", stats.getGlobalStats().get("movedCount")
|
|
||||||
)
|
|
||||||
);
|
|
||||||
|
|
||||||
emitter.emit(
|
emitTieredStats(emitter, "master/%s/cost/normalized",
|
||||||
new ServiceMetricEvent.Builder().build(
|
Maps.transformEntries(stats.getPerTierStats().get("normalizedInitialCostTimesOneThousand"),
|
||||||
"master/deleted/count", stats.getGlobalStats().get("deletedCount")
|
new Maps.EntryTransformer<String, AtomicLong, Number>()
|
||||||
)
|
{
|
||||||
);
|
@Override
|
||||||
|
public Number transformEntry(
|
||||||
|
@Nullable String key, @Nullable AtomicLong value
|
||||||
|
)
|
||||||
|
{
|
||||||
|
return value.doubleValue() / 1000d;
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
|
||||||
Map<String, AtomicLong> unneeded = stats.getPerTierStats().get("unneededCount");
|
Map<String, AtomicLong> unneeded = stats.getPerTierStats().get("unneededCount");
|
||||||
if (unneeded != null) {
|
if (unneeded != null) {
|
||||||
|
|
|
@ -31,6 +31,8 @@ import com.metamx.common.ISE;
|
||||||
import com.metamx.common.guava.BaseSequence;
|
import com.metamx.common.guava.BaseSequence;
|
||||||
import com.metamx.common.guava.FunctionalIterator;
|
import com.metamx.common.guava.FunctionalIterator;
|
||||||
import com.metamx.common.guava.Sequence;
|
import com.metamx.common.guava.Sequence;
|
||||||
|
import com.metamx.common.guava.Sequences;
|
||||||
|
import com.metamx.common.parsers.CloseableIterator;
|
||||||
import com.metamx.druid.StorageAdapter;
|
import com.metamx.druid.StorageAdapter;
|
||||||
import com.metamx.druid.aggregation.AggregatorFactory;
|
import com.metamx.druid.aggregation.AggregatorFactory;
|
||||||
import com.metamx.druid.aggregation.BufferAggregator;
|
import com.metamx.druid.aggregation.BufferAggregator;
|
||||||
|
@ -47,6 +49,7 @@ import org.joda.time.DateTime;
|
||||||
import org.joda.time.Interval;
|
import org.joda.time.Interval;
|
||||||
|
|
||||||
import javax.annotation.Nullable;
|
import javax.annotation.Nullable;
|
||||||
|
import java.io.IOException;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -85,33 +88,47 @@ public class GroupByQueryEngine
|
||||||
|
|
||||||
final ResourceHolder<ByteBuffer> bufferHolder = intermediateResultsBufferPool.take();
|
final ResourceHolder<ByteBuffer> bufferHolder = intermediateResultsBufferPool.take();
|
||||||
|
|
||||||
return new BaseSequence<Row, Iterator<Row>>(
|
return Sequences.concat(
|
||||||
new BaseSequence.IteratorMaker<Row, Iterator<Row>>()
|
new BaseSequence<Sequence<Row>, Iterator<Sequence<Row>>>(new BaseSequence.IteratorMaker<Sequence<Row>, Iterator<Sequence<Row>>>()
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
public Iterator<Row> make()
|
public Iterator<Sequence<Row>> make()
|
||||||
{
|
{
|
||||||
return FunctionalIterator
|
return FunctionalIterator
|
||||||
.create(cursors.iterator())
|
.create(cursors.iterator())
|
||||||
.transformCat(
|
.transform(new Function<Cursor, Sequence<Row>>()
|
||||||
new Function<Cursor, Iterator<Row>>()
|
{
|
||||||
{
|
@Override
|
||||||
@Override
|
public Sequence<Row> apply(@Nullable final Cursor cursor)
|
||||||
public Iterator<Row> apply(@Nullable final Cursor cursor)
|
{
|
||||||
{
|
return new BaseSequence<Row, CloseableIterator<Row>>(
|
||||||
return new RowIterator(query, cursor, bufferHolder.get());
|
new BaseSequence.IteratorMaker<Row, CloseableIterator<Row>>()
|
||||||
}
|
{
|
||||||
}
|
@Override
|
||||||
);
|
public CloseableIterator<Row> make()
|
||||||
|
{
|
||||||
|
return new RowIterator(query, cursor, bufferHolder.get());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void cleanup(CloseableIterator iterFromMake)
|
||||||
|
{
|
||||||
|
Closeables.closeQuietly(iterFromMake);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
);
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void cleanup(Iterator<Row> iterFromMake)
|
public void cleanup(Iterator<Sequence<Row>> iterFromMake)
|
||||||
{
|
{
|
||||||
Closeables.closeQuietly(bufferHolder);
|
Closeables.closeQuietly(bufferHolder);
|
||||||
}
|
}
|
||||||
}
|
})
|
||||||
);
|
);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class RowUpdater
|
private static class RowUpdater
|
||||||
|
@ -248,7 +265,7 @@ public class GroupByQueryEngine
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private class RowIterator implements Iterator<Row>
|
private class RowIterator implements CloseableIterator<Row>
|
||||||
{
|
{
|
||||||
private final GroupByQuery query;
|
private final GroupByQuery query;
|
||||||
private final Cursor cursor;
|
private final Cursor cursor;
|
||||||
|
@ -384,5 +401,12 @@ public class GroupByQueryEngine
|
||||||
{
|
{
|
||||||
throw new UnsupportedOperationException();
|
throw new UnsupportedOperationException();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void close() {
|
||||||
|
// cleanup
|
||||||
|
for(BufferAggregator agg : aggregators) {
|
||||||
|
agg.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -80,7 +80,14 @@ public class TimeseriesQueryEngine
|
||||||
bob.addMetric(postAgg);
|
bob.addMetric(postAgg);
|
||||||
}
|
}
|
||||||
|
|
||||||
return bob.build();
|
Result<TimeseriesResultValue> retVal = bob.build();
|
||||||
|
|
||||||
|
// cleanup
|
||||||
|
for(Aggregator agg : aggregators) {
|
||||||
|
agg.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
return retVal;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
).iterator();
|
).iterator();
|
||||||
|
|
|
@ -30,6 +30,7 @@ import com.metamx.common.guava.Sequence;
|
||||||
import com.metamx.common.guava.Sequences;
|
import com.metamx.common.guava.Sequences;
|
||||||
import com.metamx.druid.PeriodGranularity;
|
import com.metamx.druid.PeriodGranularity;
|
||||||
import com.metamx.druid.Query;
|
import com.metamx.druid.Query;
|
||||||
|
import com.metamx.druid.QueryGranularity;
|
||||||
import com.metamx.druid.TestHelper;
|
import com.metamx.druid.TestHelper;
|
||||||
import com.metamx.druid.aggregation.AggregatorFactory;
|
import com.metamx.druid.aggregation.AggregatorFactory;
|
||||||
import com.metamx.druid.aggregation.LongSumAggregatorFactory;
|
import com.metamx.druid.aggregation.LongSumAggregatorFactory;
|
||||||
|
@ -232,6 +233,7 @@ public class GroupByQueryRunnerTest
|
||||||
.setGranularity(new PeriodGranularity(new Period("P1M"), null, null));
|
.setGranularity(new PeriodGranularity(new Period("P1M"), null, null));
|
||||||
|
|
||||||
final GroupByQuery fullQuery = builder.build();
|
final GroupByQuery fullQuery = builder.build();
|
||||||
|
final GroupByQuery allGranQuery = builder.copy().setGranularity(QueryGranularity.ALL).build();
|
||||||
|
|
||||||
QueryRunner mergedRunner = new GroupByQueryQueryToolChest().mergeResults(
|
QueryRunner mergedRunner = new GroupByQueryQueryToolChest().mergeResults(
|
||||||
new QueryRunner<Row>()
|
new QueryRunner<Row>()
|
||||||
|
@ -265,6 +267,22 @@ public class GroupByQueryRunnerTest
|
||||||
|
|
||||||
TestHelper.assertExpectedObjects(expectedResults, runner.run(fullQuery), "direct");
|
TestHelper.assertExpectedObjects(expectedResults, runner.run(fullQuery), "direct");
|
||||||
TestHelper.assertExpectedObjects(expectedResults, mergedRunner.run(fullQuery), "merged");
|
TestHelper.assertExpectedObjects(expectedResults, mergedRunner.run(fullQuery), "merged");
|
||||||
|
|
||||||
|
List<Row> allGranExpectedResults = Arrays.<Row>asList(
|
||||||
|
createExpectedRow("2011-04-02", "alias", "automotive", "rows", 2L, "idx", 269L),
|
||||||
|
createExpectedRow("2011-04-02", "alias", "business", "rows", 2L, "idx", 217L),
|
||||||
|
createExpectedRow("2011-04-02", "alias", "entertainment", "rows", 2L, "idx", 319L),
|
||||||
|
createExpectedRow("2011-04-02", "alias", "health", "rows", 2L, "idx", 216L),
|
||||||
|
createExpectedRow("2011-04-02", "alias", "mezzanine", "rows", 6L, "idx", 4420L),
|
||||||
|
createExpectedRow("2011-04-02", "alias", "news", "rows", 2L, "idx", 221L),
|
||||||
|
createExpectedRow("2011-04-02", "alias", "premium", "rows", 6L, "idx", 4416L),
|
||||||
|
createExpectedRow("2011-04-02", "alias", "technology", "rows", 2L, "idx", 177L),
|
||||||
|
createExpectedRow("2011-04-02", "alias", "travel", "rows", 2L, "idx", 243L)
|
||||||
|
);
|
||||||
|
|
||||||
|
TestHelper.assertExpectedObjects(allGranExpectedResults, runner.run(allGranQuery), "direct");
|
||||||
|
TestHelper.assertExpectedObjects(allGranExpectedResults, mergedRunner.run(allGranQuery), "merged");
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private MapBasedRow createExpectedRow(final String timestamp, Object... vals)
|
private MapBasedRow createExpectedRow(final String timestamp, Object... vals)
|
||||||
|
|
Loading…
Reference in New Issue