Don't pass QueryMetrics down in concurrent and async QueryRunners (fixes #4279) (#4288)

* Don't pass QueryMetrics down in concurrent and async QueryRunners

* Rename QueryPlus.threadSafe() to withoutThreadUnsafeState(); Update QueryPlus.withQueryMetrics() Javadocs; Fix generics in MetricsEmittingQueryRunner and CpuTimeMetricQueryRunner; Make DefaultQueryMetrics to fail fast on modifications from concurrent threads
This commit is contained in:
Roman Leventov 2017-05-22 13:42:09 -05:00 committed by Himanshu
parent 000b0ffed7
commit 8ec3a29af0
13 changed files with 132 additions and 70 deletions

View File

@ -51,6 +51,7 @@ public class AsyncQueryRunner<T> implements QueryRunner<T>
{
final Query<T> query = queryPlus.getQuery();
final int priority = QueryContexts.getPriority(query);
final QueryPlus<T> threadSafeQueryPlus = queryPlus.withoutThreadUnsafeState();
final ListenableFuture<Sequence<T>> future = executor.submit(new AbstractPrioritizedCallable<Sequence<T>>(priority)
{
@Override
@ -58,7 +59,7 @@ public class AsyncQueryRunner<T> implements QueryRunner<T>
{
//Note: this is assumed that baseRunner does most of the work eagerly on call to the
//run() method and resulting sequence accumulate/yield is fast.
return baseRunner.run(queryPlus, responseContext);
return baseRunner.run(threadSafeQueryPlus, responseContext);
}
});
queryWatcher.registerQuery(query, future);

View File

@ -33,14 +33,14 @@ import java.util.concurrent.atomic.AtomicLong;
public class CPUTimeMetricQueryRunner<T> implements QueryRunner<T>
{
private final QueryRunner<T> delegate;
private final QueryToolChest<?, ? super Query<T>> queryToolChest;
private final QueryToolChest<T, ? extends Query<T>> queryToolChest;
private final ServiceEmitter emitter;
private final AtomicLong cpuTimeAccumulator;
private final boolean report;
private CPUTimeMetricQueryRunner(
QueryRunner<T> delegate,
QueryToolChest<?, ? super Query<T>> queryToolChest,
QueryToolChest<T, ? extends Query<T>> queryToolChest,
ServiceEmitter emitter,
AtomicLong cpuTimeAccumulator,
boolean report
@ -60,8 +60,7 @@ public class CPUTimeMetricQueryRunner<T> implements QueryRunner<T>
@Override
public Sequence<T> run(final QueryPlus<T> queryPlus, final Map<String, Object> responseContext)
{
final QueryPlus<T> queryWithMetrics =
queryPlus.withQueryMetrics((QueryToolChest<T, ? extends Query<T>>) queryToolChest);
final QueryPlus<T> queryWithMetrics = queryPlus.withQueryMetrics(queryToolChest);
final Sequence<T> baseSequence = delegate.run(queryWithMetrics, responseContext);
return Sequences.wrap(
baseSequence,
@ -94,7 +93,7 @@ public class CPUTimeMetricQueryRunner<T> implements QueryRunner<T>
public static <T> QueryRunner<T> safeBuild(
QueryRunner<T> delegate,
QueryToolChest<?, ? super Query<T>> queryToolChest,
QueryToolChest<T, ? extends Query<T>> queryToolChest,
ServiceEmitter emitter,
AtomicLong accumulator,
boolean report

View File

@ -94,7 +94,7 @@ public class ChainedExecutionQueryRunner<T> implements QueryRunner<T>
Query<T> query = queryPlus.getQuery();
final int priority = QueryContexts.getPriority(query);
final Ordering ordering = query.getResultOrdering();
final QueryPlus<T> threadSafeQueryPlus = queryPlus.withoutThreadUnsafeState();
return new BaseSequence<T, Iterator<T>>(
new BaseSequence.IteratorMaker<T, Iterator<T>>()
{
@ -122,7 +122,7 @@ public class ChainedExecutionQueryRunner<T> implements QueryRunner<T>
public Iterable<T> call() throws Exception
{
try {
Sequence<T> result = input.run(queryPlus, responseContext);
Sequence<T> result = input.run(threadSafeQueryPlus, responseContext);
if (result == null) {
throw new ISE("Got a null result! Segments are missing!");
}

View File

@ -31,17 +31,42 @@ import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* DefaultQueryMetrics is unsafe for use from multiple threads. It fails with RuntimeException on access not from the
* thread where it was constructed. To "transfer" DefaultQueryMetrics from one thread to another {@link #ownerThread}
* field should be updated.
*/
public class DefaultQueryMetrics<QueryType extends Query<?>> implements QueryMetrics<QueryType>
{
protected final ObjectMapper jsonMapper;
protected final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder();
protected final Map<String, Number> metrics = new HashMap<>();
/** Non final to give subclasses ability to reassign it. */
protected Thread ownerThread = Thread.currentThread();
public DefaultQueryMetrics(ObjectMapper jsonMapper)
{
this.jsonMapper = jsonMapper;
}
protected void checkModifiedFromOwnerThread()
{
if (Thread.currentThread() != ownerThread) {
throw new IllegalStateException(
"DefaultQueryMetrics must not be modified from multiple threads. If it is needed to gather dimension or "
+ "metric information from multiple threads or from an async thread, this information should explicitly be "
+ "passed between threads (e. g. using Futures), or this DefaultQueryMetrics's ownerThread should be "
+ "reassigned explicitly");
}
}
protected void setDimension(String dimension, String value)
{
checkModifiedFromOwnerThread();
builder.setDimension(dimension, value);
}
@Override
public void query(QueryType query)
{
@ -56,18 +81,19 @@ public class DefaultQueryMetrics<QueryType extends Query<?>> implements QueryMet
@Override
public void dataSource(QueryType query)
{
builder.setDimension(DruidMetrics.DATASOURCE, DataSourceUtil.getMetricName(query.getDataSource()));
setDimension(DruidMetrics.DATASOURCE, DataSourceUtil.getMetricName(query.getDataSource()));
}
@Override
public void queryType(QueryType query)
{
builder.setDimension(DruidMetrics.TYPE, query.getType());
setDimension(DruidMetrics.TYPE, query.getType());
}
@Override
public void interval(QueryType query)
{
checkModifiedFromOwnerThread();
builder.setDimension(
DruidMetrics.INTERVAL,
query.getIntervals().stream().map(Interval::toString).toArray(String[]::new)
@ -77,32 +103,28 @@ public class DefaultQueryMetrics<QueryType extends Query<?>> implements QueryMet
@Override
public void hasFilters(QueryType query)
{
builder.setDimension("hasFilters", String.valueOf(query.hasFilters()));
setDimension("hasFilters", String.valueOf(query.hasFilters()));
}
@Override
public void duration(QueryType query)
{
builder.setDimension("duration", query.getDuration().toString());
setDimension("duration", query.getDuration().toString());
}
@Override
public void queryId(QueryType query)
{
builder.setDimension(DruidMetrics.ID, Strings.nullToEmpty(query.getId()));
setDimension(DruidMetrics.ID, Strings.nullToEmpty(query.getId()));
}
@Override
public void context(QueryType query)
{
try {
builder.setDimension(
setDimension(
"context",
jsonMapper.writeValueAsString(
query.getContext() == null
? ImmutableMap.of()
: query.getContext()
)
jsonMapper.writeValueAsString(query.getContext() == null ? ImmutableMap.of() : query.getContext())
);
}
catch (JsonProcessingException e) {
@ -113,111 +135,115 @@ public class DefaultQueryMetrics<QueryType extends Query<?>> implements QueryMet
@Override
public void server(String host)
{
builder.setDimension("server", host);
setDimension("server", host);
}
@Override
public void remoteAddress(String remoteAddress)
{
builder.setDimension("remoteAddress", remoteAddress);
setDimension("remoteAddress", remoteAddress);
}
@Override
public void status(String status)
{
builder.setDimension(DruidMetrics.STATUS, status);
setDimension(DruidMetrics.STATUS, status);
}
@Override
public void success(boolean success)
{
builder.setDimension("success", String.valueOf(success));
setDimension("success", String.valueOf(success));
}
@Override
public void segment(String segmentIdentifier)
{
builder.setDimension("segment", segmentIdentifier);
setDimension("segment", segmentIdentifier);
}
@Override
public void chunkInterval(Interval interval)
{
builder.setDimension("chunkInterval", interval.toString());
setDimension("chunkInterval", interval.toString());
}
@Override
public QueryMetrics<QueryType> reportQueryTime(long timeNs)
{
return defaultTimeMetric("query/time", timeNs);
return reportMillisTimeMetric("query/time", timeNs);
}
@Override
public QueryMetrics<QueryType> reportQueryBytes(long byteCount)
{
metrics.put("query/bytes", byteCount);
return this;
return reportMetric("query/bytes", byteCount);
}
@Override
public QueryMetrics<QueryType> reportWaitTime(long timeNs)
{
return defaultTimeMetric("query/wait/time", timeNs);
return reportMillisTimeMetric("query/wait/time", timeNs);
}
@Override
public QueryMetrics<QueryType> reportSegmentTime(long timeNs)
{
return defaultTimeMetric("query/segment/time", timeNs);
return reportMillisTimeMetric("query/segment/time", timeNs);
}
@Override
public QueryMetrics<QueryType> reportSegmentAndCacheTime(long timeNs)
{
return defaultTimeMetric("query/segmentAndCache/time", timeNs);
return reportMillisTimeMetric("query/segmentAndCache/time", timeNs);
}
@Override
public QueryMetrics<QueryType> reportIntervalChunkTime(long timeNs)
{
return defaultTimeMetric("query/intervalChunk/time", timeNs);
return reportMillisTimeMetric("query/intervalChunk/time", timeNs);
}
@Override
public QueryMetrics<QueryType> reportCpuTime(long timeNs)
{
metrics.put("query/cpu/time", TimeUnit.NANOSECONDS.toMicros(timeNs));
return this;
return reportMetric("query/cpu/time", TimeUnit.NANOSECONDS.toMicros(timeNs));
}
@Override
public QueryMetrics<QueryType> reportNodeTimeToFirstByte(long timeNs)
{
return defaultTimeMetric("query/node/ttfb", timeNs);
return reportMillisTimeMetric("query/node/ttfb", timeNs);
}
@Override
public QueryMetrics<QueryType> reportNodeTime(long timeNs)
{
return defaultTimeMetric("query/node/time", timeNs);
return reportMillisTimeMetric("query/node/time", timeNs);
}
private QueryMetrics<QueryType> defaultTimeMetric(String metricName, long timeNs)
private QueryMetrics<QueryType> reportMillisTimeMetric(String metricName, long timeNs)
{
metrics.put(metricName, TimeUnit.NANOSECONDS.toMillis(timeNs));
return reportMetric(metricName, TimeUnit.NANOSECONDS.toMillis(timeNs));
}
protected QueryMetrics<QueryType> reportMetric(String metricName, Number value)
{
checkModifiedFromOwnerThread();
metrics.put(metricName, value);
return this;
}
@Override
public QueryMetrics<QueryType> reportNodeBytes(long byteCount)
{
metrics.put("query/node/bytes", byteCount);
return this;
return reportMetric("query/node/bytes", byteCount);
}
@Override
public void emit(ServiceEmitter emitter)
{
checkModifiedFromOwnerThread();
for (Map.Entry<String, Number> metric : metrics.entrySet()) {
emitter.emit(builder.build(metric.getKey(), metric.getValue()));
}

View File

@ -92,8 +92,8 @@ public class GroupByMergedQueryRunner<T> implements QueryRunner<T>
final Pair<Queue, Accumulator<Queue, T>> bySegmentAccumulatorPair = GroupByQueryHelper.createBySegmentAccumulatorPair();
final boolean bySegment = QueryContexts.isBySegment(query);
final int priority = QueryContexts.getPriority(query);
ListenableFuture<List<Void>> futures = Futures.allAsList(
final QueryPlus<T> threadSafeQueryPlus = queryPlus.withoutThreadUnsafeState();
final ListenableFuture<List<Void>> futures = Futures.allAsList(
Lists.newArrayList(
Iterables.transform(
queryables,
@ -114,10 +114,10 @@ public class GroupByMergedQueryRunner<T> implements QueryRunner<T>
{
try {
if (bySegment) {
input.run(queryPlus, responseContext)
input.run(threadSafeQueryPlus, responseContext)
.accumulate(bySegmentAccumulatorPair.lhs, bySegmentAccumulatorPair.rhs);
} else {
input.run(queryPlus, responseContext)
input.run(threadSafeQueryPlus, responseContext)
.accumulate(indexAccumulatorPair.lhs, indexAccumulatorPair.rhs);
}

View File

@ -34,19 +34,19 @@ import java.util.function.ObjLongConsumer;
public class MetricsEmittingQueryRunner<T> implements QueryRunner<T>
{
private final ServiceEmitter emitter;
private final QueryToolChest<?, ? super Query<T>> queryToolChest;
private final QueryToolChest<T, ? extends Query<T>> queryToolChest;
private final QueryRunner<T> queryRunner;
private final long creationTimeNs;
private final ObjLongConsumer<? super QueryMetrics<? super Query<T>>> reportMetric;
private final Consumer<QueryMetrics<? super Query<T>>> applyCustomDimensions;
private final ObjLongConsumer<? super QueryMetrics<?>> reportMetric;
private final Consumer<QueryMetrics<?>> applyCustomDimensions;
private MetricsEmittingQueryRunner(
ServiceEmitter emitter,
QueryToolChest<?, ? super Query<T>> queryToolChest,
QueryToolChest<T, ? extends Query<T>> queryToolChest,
QueryRunner<T> queryRunner,
long creationTimeNs,
ObjLongConsumer<? super QueryMetrics<? super Query<T>>> reportMetric,
Consumer<QueryMetrics<? super Query<T>>> applyCustomDimensions
ObjLongConsumer<? super QueryMetrics<?>> reportMetric,
Consumer<QueryMetrics<?>> applyCustomDimensions
)
{
this.emitter = emitter;
@ -59,10 +59,10 @@ public class MetricsEmittingQueryRunner<T> implements QueryRunner<T>
public MetricsEmittingQueryRunner(
ServiceEmitter emitter,
QueryToolChest<?, ? super Query<T>> queryToolChest,
QueryToolChest<T, ? extends Query<T>> queryToolChest,
QueryRunner<T> queryRunner,
ObjLongConsumer<? super QueryMetrics<? super Query<T>>> reportMetric,
Consumer<QueryMetrics<? super Query<T>>> applyCustomDimensions
ObjLongConsumer<? super QueryMetrics<?>> reportMetric,
Consumer<QueryMetrics<?>> applyCustomDimensions
)
{
this(emitter, queryToolChest, queryRunner, -1, reportMetric, applyCustomDimensions);
@ -83,8 +83,8 @@ public class MetricsEmittingQueryRunner<T> implements QueryRunner<T>
@Override
public Sequence<T> run(final QueryPlus<T> queryPlus, final Map<String, Object> responseContext)
{
QueryPlus<T> queryWithMetrics = queryPlus.withQueryMetrics((QueryToolChest<T, ? extends Query<T>>) queryToolChest);
final QueryMetrics<? super Query<T>> queryMetrics = (QueryMetrics<? super Query<T>>) queryWithMetrics.getQueryMetrics();
QueryPlus<T> queryWithMetrics = queryPlus.withQueryMetrics(queryToolChest);
final QueryMetrics<?> queryMetrics = queryWithMetrics.getQueryMetrics();
applyCustomDimensions.accept(queryMetrics);

View File

@ -66,6 +66,11 @@ public final class QueryPlus<T>
* Returns the same QueryPlus object, if it already has {@link QueryMetrics} ({@link #getQueryMetrics()} returns not
* null), or returns a new QueryPlus object with {@link Query} from this QueryPlus and QueryMetrics created using the
* given {@link QueryToolChest}, via {@link QueryToolChest#makeMetrics(Query)} method.
*
* By convention, callers of {@code withQueryMetrics()} must also call .getQueryMetrics().emit() on the returned
* QueryMetrics object, regardless if this object is the same as the object on which .withQueryMetrics() was initially
* called (i. e. it already had non-null QueryMetrics), or if it is a new QueryPlus object. See {@link
* MetricsEmittingQueryRunner} for example.
*/
public QueryPlus<T> withQueryMetrics(QueryToolChest<T, ? extends Query<T>> queryToolChest)
{
@ -76,6 +81,31 @@ public final class QueryPlus<T>
}
}
/**
* Returns a QueryPlus object without the components which are unsafe for concurrent use from multiple threads,
* therefore couldn't be passed down in concurrent or async {@link QueryRunner}s.
*
* Currently the only unsafe component is {@link QueryMetrics}, i. e. {@code withoutThreadUnsafeState()} call is
* equivalent to {@link #withoutQueryMetrics()}.
*/
public QueryPlus<T> withoutThreadUnsafeState()
{
return withoutQueryMetrics();
}
/**
* Returns the same QueryPlus object, if it doesn't have {@link QueryMetrics} ({@link #getQueryMetrics()} returns
* null), or returns a new QueryPlus object with {@link Query} from this QueryPlus and null as QueryMetrics.
*/
private QueryPlus<T> withoutQueryMetrics()
{
if (queryMetrics == null) {
return this;
} else {
return new QueryPlus<>(query, null);
}
}
/**
* Equivalent of withQuery(getQuery().withQuerySegmentSpec(spec)).
*/

View File

@ -43,19 +43,19 @@ public class DefaultGroupByQueryMetrics extends DefaultQueryMetrics<GroupByQuery
@Override
public void numDimensions(GroupByQuery query)
{
builder.setDimension("numDimensions", String.valueOf(query.getDimensions().size()));
setDimension("numDimensions", String.valueOf(query.getDimensions().size()));
}
@Override
public void numMetrics(GroupByQuery query)
{
builder.setDimension("numMetrics", String.valueOf(query.getAggregatorSpecs().size()));
setDimension("numMetrics", String.valueOf(query.getAggregatorSpecs().size()));
}
@Override
public void numComplexMetrics(GroupByQuery query)
{
int numComplexAggs = DruidMetrics.findNumComplexAggs(query.getAggregatorSpecs());
builder.setDimension("numComplexMetrics", String.valueOf(numComplexAggs));
setDimension("numComplexMetrics", String.valueOf(numComplexAggs));
}
}

View File

@ -119,9 +119,11 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner<Row>
CTX_KEY_MERGE_RUNNERS_USING_CHAINED_EXECUTION,
false
);
final QueryPlus<Row> queryPlusForRunners = queryPlus.withQuery(
query.withOverriddenContext(ImmutableMap.<String, Object>of(CTX_KEY_MERGE_RUNNERS_USING_CHAINED_EXECUTION, true))
);
final QueryPlus<Row> queryPlusForRunners = queryPlus
.withQuery(
query.withOverriddenContext(ImmutableMap.<String, Object>of(CTX_KEY_MERGE_RUNNERS_USING_CHAINED_EXECUTION, true))
)
.withoutThreadUnsafeState();
if (QueryContexts.isBySegment(query) || forceChainedExecution) {
ChainedExecutionQueryRunner<Row> runner = new ChainedExecutionQueryRunner<>(exec, queryWatcher, queryables);

View File

@ -205,14 +205,18 @@ public class SegmentMetadataQueryRunnerFactory implements QueryRunnerFactory<Seg
{
final Query<SegmentAnalysis> query = queryPlus.getQuery();
final int priority = QueryContexts.getPriority(query);
ListenableFuture<Sequence<SegmentAnalysis>> future = queryExecutor.submit(
final QueryPlus<SegmentAnalysis> threadSafeQueryPlus = queryPlus.withoutThreadUnsafeState();
final ListenableFuture<Sequence<SegmentAnalysis>> future = queryExecutor.submit(
new AbstractPrioritizedCallable<Sequence<SegmentAnalysis>>(priority)
{
@Override
public Sequence<SegmentAnalysis> call() throws Exception
{
return Sequences.simple(
Sequences.toList(input.run(queryPlus, responseContext), new ArrayList<>())
Sequences.toList(
input.run(threadSafeQueryPlus, responseContext),
new ArrayList<>()
)
);
}
}

View File

@ -42,13 +42,13 @@ public class DefaultTimeseriesQueryMetrics extends DefaultQueryMetrics<Timeserie
@Override
public void numMetrics(TimeseriesQuery query)
{
builder.setDimension("numMetrics", String.valueOf(query.getAggregatorSpecs().size()));
setDimension("numMetrics", String.valueOf(query.getAggregatorSpecs().size()));
}
@Override
public void numComplexMetrics(TimeseriesQuery query)
{
int numComplexAggs = DruidMetrics.findNumComplexAggs(query.getAggregatorSpecs());
builder.setDimension("numComplexMetrics", String.valueOf(numComplexAggs));
setDimension("numComplexMetrics", String.valueOf(numComplexAggs));
}
}

View File

@ -44,25 +44,25 @@ public class DefaultTopNQueryMetrics extends DefaultQueryMetrics<TopNQuery> impl
@Override
public void threshold(TopNQuery query)
{
builder.setDimension("threshold", String.valueOf(query.getThreshold()));
setDimension("threshold", String.valueOf(query.getThreshold()));
}
@Override
public void dimension(TopNQuery query)
{
builder.setDimension("dimension", query.getDimensionSpec().getDimension());
setDimension("dimension", query.getDimensionSpec().getDimension());
}
@Override
public void numMetrics(TopNQuery query)
{
builder.setDimension("numMetrics", String.valueOf(query.getAggregatorSpecs().size()));
setDimension("numMetrics", String.valueOf(query.getAggregatorSpecs().size()));
}
@Override
public void numComplexMetrics(TopNQuery query)
{
int numComplexAggs = DruidMetrics.findNumComplexAggs(query.getAggregatorSpecs());
builder.setDimension("numComplexMetrics", String.valueOf(numComplexAggs));
setDimension("numComplexMetrics", String.valueOf(numComplexAggs));
}
}

View File

@ -275,7 +275,7 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
*/
private <T> QueryRunner<T> withPerSinkMetrics(
final QueryRunner<T> sinkRunner,
final QueryToolChest<?, ? super Query<T>> queryToolChest,
final QueryToolChest<T, ? extends Query<T>> queryToolChest,
final String sinkSegmentIdentifier,
final AtomicLong cpuTimeAccumulator
)