mirror of https://github.com/apache/druid.git
Fix how metrics are emitted from servermanager
This commit is contained in:
parent
66225d698f
commit
3a78960e65
|
@ -33,10 +33,13 @@ import java.io.IOException;
|
|||
*/
|
||||
public class MetricsEmittingQueryRunner<T> implements QueryRunner<T>
|
||||
{
|
||||
private static String DEFAULT_METRIC_NAME = "query/time";
|
||||
|
||||
private final ServiceEmitter emitter;
|
||||
private final Function<Query<T>, ServiceMetricEvent.Builder> builderFn;
|
||||
private final QueryRunner<T> queryRunner;
|
||||
private final long creationTime;
|
||||
private final String metricName;
|
||||
|
||||
public MetricsEmittingQueryRunner(
|
||||
ServiceEmitter emitter,
|
||||
|
@ -53,18 +56,36 @@ public class MetricsEmittingQueryRunner<T> implements QueryRunner<T>
|
|||
QueryRunner<T> queryRunner,
|
||||
long creationTime
|
||||
)
|
||||
{
|
||||
this(emitter, builderFn, queryRunner, creationTime, DEFAULT_METRIC_NAME);
|
||||
}
|
||||
|
||||
public MetricsEmittingQueryRunner(
|
||||
ServiceEmitter emitter,
|
||||
Function<Query<T>, ServiceMetricEvent.Builder> builderFn,
|
||||
QueryRunner<T> queryRunner,
|
||||
long creationTime,
|
||||
String metricName
|
||||
)
|
||||
{
|
||||
this.emitter = emitter;
|
||||
this.builderFn = builderFn;
|
||||
this.queryRunner = queryRunner;
|
||||
this.creationTime = creationTime;
|
||||
this.metricName = metricName;
|
||||
}
|
||||
|
||||
|
||||
public MetricsEmittingQueryRunner<T> withWaitMeasuredFromNow()
|
||||
{
|
||||
return new MetricsEmittingQueryRunner<T>(emitter, builderFn, queryRunner, System.currentTimeMillis());
|
||||
}
|
||||
|
||||
public MetricsEmittingQueryRunner<T> withWaitMetricName(String metricName)
|
||||
{
|
||||
return new MetricsEmittingQueryRunner<T>(emitter, builderFn, queryRunner, System.currentTimeMillis(), metricName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Sequence<T> run(final Query<T> query)
|
||||
{
|
||||
|
@ -97,9 +118,9 @@ public class MetricsEmittingQueryRunner<T> implements QueryRunner<T>
|
|||
finally {
|
||||
long timeTaken = System.currentTimeMillis() - startTime;
|
||||
|
||||
emitter.emit(builder.build("query/time", timeTaken));
|
||||
emitter.emit(builder.build(metricName, timeTaken));
|
||||
|
||||
if(creationTime > 0) {
|
||||
if (creationTime > 0) {
|
||||
emitter.emit(builder.build("query/wait", startTime - creationTime));
|
||||
}
|
||||
}
|
||||
|
@ -173,12 +194,13 @@ public class MetricsEmittingQueryRunner<T> implements QueryRunner<T>
|
|||
}
|
||||
|
||||
long timeTaken = System.currentTimeMillis() - startTime;
|
||||
emitter.emit(builder.build("query/time", timeTaken));
|
||||
emitter.emit(builder.build(metricName, timeTaken));
|
||||
|
||||
if (creationTime > 0) {
|
||||
emitter.emit(builder.build("query/wait", startTime - creationTime));
|
||||
}
|
||||
} finally {
|
||||
}
|
||||
finally {
|
||||
yielder.close();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -390,30 +390,43 @@ public class ServerManager implements QuerySegmentWalker
|
|||
{
|
||||
SpecificSegmentSpec segmentSpec = new SpecificSegmentSpec(segmentDescriptor);
|
||||
return new SpecificSegmentQueryRunner<T>(
|
||||
new BySegmentQueryRunner<T>(
|
||||
adapter.getIdentifier(),
|
||||
adapter.getDataInterval().getStart(),
|
||||
new CachingQueryRunner<T>(
|
||||
new MetricsEmittingQueryRunner<T>(
|
||||
emitter,
|
||||
new Function<Query<T>, ServiceMetricEvent.Builder>()
|
||||
{
|
||||
@Override
|
||||
public ServiceMetricEvent.Builder apply(@Nullable final Query<T> input)
|
||||
{
|
||||
return toolChest.makeMetricBuilder(input);
|
||||
}
|
||||
},
|
||||
new BySegmentQueryRunner<T>(
|
||||
adapter.getIdentifier(),
|
||||
segmentDescriptor,
|
||||
objectMapper,
|
||||
cache,
|
||||
toolChest,
|
||||
new MetricsEmittingQueryRunner<T>(
|
||||
emitter,
|
||||
new Function<Query<T>, ServiceMetricEvent.Builder>()
|
||||
{
|
||||
@Override
|
||||
public ServiceMetricEvent.Builder apply(@Nullable final Query<T> input)
|
||||
{
|
||||
return toolChest.makeMetricBuilder(input);
|
||||
}
|
||||
},
|
||||
new ReferenceCountingSegmentQueryRunner<T>(factory, adapter)
|
||||
).withWaitMeasuredFromNow(),
|
||||
cacheConfig
|
||||
adapter.getDataInterval().getStart(),
|
||||
new CachingQueryRunner<T>(
|
||||
adapter.getIdentifier(),
|
||||
segmentDescriptor,
|
||||
objectMapper,
|
||||
cache,
|
||||
toolChest,
|
||||
new MetricsEmittingQueryRunner<T>(
|
||||
emitter,
|
||||
new Function<Query<T>, ServiceMetricEvent.Builder>()
|
||||
{
|
||||
@Override
|
||||
public ServiceMetricEvent.Builder apply(@Nullable final Query<T> input)
|
||||
{
|
||||
return toolChest.makeMetricBuilder(input);
|
||||
}
|
||||
},
|
||||
new ReferenceCountingSegmentQueryRunner<T>(factory, adapter)
|
||||
)
|
||||
.withWaitMeasuredFromNow()
|
||||
.withWaitMetricName("scan/time"),
|
||||
cacheConfig
|
||||
)
|
||||
)
|
||||
),
|
||||
).withWaitMeasuredFromNow(),
|
||||
segmentSpec
|
||||
);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue