Share query handling between Appenderator and RealtimePlumber. (#3248)

Fixes inconsistent metric handling between the two implementations. Formerly,
RealtimePlumber emitted only query/segmentAndCache/time and query/wait, while
Appenderator emitted only query/partial/time and query/wait (all per sink).

Now they both do the same thing:
- query/segmentAndCache/time, query/segment/time are the time spent per sink.
- query/cpu/time is the CPU time spent per query.
- query/wait/time is the executor waiting time per sink.

These generally match historical metrics, except that segmentAndCache and segment
mean the same thing here, because one Sink may be partially cached and
partially uncached and we aren't splitting that out.
Author: Gian Merlino (2016-07-19 20:15:13 -07:00), committed by Parag Jain
Parent: 50db86cb17
Commit: 06624c40c0
6 changed files with 382 additions and 377 deletions
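For reference, a condensed sketch (not part of the diff) of how the shared walker stacks these per-sink metric runners. The MetricsEmittingQueryRunner and CPUTimeMetricQueryRunner calls mirror the withPerSinkMetrics method added in SinkQuerySegmentWalker below; the SinkMetricsExample class and decorateSinkRunner method are illustrative names only.

import java.util.concurrent.atomic.AtomicLong;

import com.google.common.base.Function;
import com.google.common.collect.ImmutableMap;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;

import io.druid.query.CPUTimeMetricQueryRunner;
import io.druid.query.MetricsEmittingQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryRunner;

public class SinkMetricsExample
{
  // Wraps one Sink's runner so that a single query/segment/time and a single
  // query/segmentAndCache/time event are emitted for the whole Sink, wait time
  // is measured from now (query/wait/time), and CPU time is added to the shared
  // accumulator that the outer per-query wrapper reports as query/cpu/time.
  public static <T> QueryRunner<T> decorateSinkRunner(
      final ServiceEmitter emitter,
      final Function<Query<T>, ServiceMetricEvent.Builder> builderFn,
      final QueryRunner<T> sinkRunner,
      final String sinkSegmentIdentifier,
      final AtomicLong cpuTimeAccumulator
  )
  {
    final ImmutableMap<String, String> dims = ImmutableMap.of("segment", sinkSegmentIdentifier);

    return CPUTimeMetricQueryRunner.safeBuild(
        new MetricsEmittingQueryRunner<>(
            emitter,
            builderFn,
            new MetricsEmittingQueryRunner<>(
                emitter,
                builderFn,
                sinkRunner,
                "query/segment/time",        // same value as segmentAndCache for a Sink
                dims
            ),
            "query/segmentAndCache/time",
            dims
        ).withWaitMeasuredFromNow(),         // emits query/wait/time per sink
        builderFn,
        emitter,
        cpuTimeAccumulator,
        false                                // accumulate only; query/cpu/time is emitted once per query
    );
  }
}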


@@ -34,6 +34,7 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.common.util.concurrent.MoreExecutors;
import com.metamx.common.Granularity;
import com.metamx.common.ISE;
import com.metamx.common.Pair;
@@ -572,7 +573,7 @@ public class TaskLifecycleTest
}, // segment announcer
handoffNotifierFactory,
queryRunnerFactoryConglomerate, // query runner factory conglomerate corporation unionized collective
null, // query executor service
MoreExecutors.sameThreadExecutor(), // query executor service
monitorScheduler, // monitor scheduler
new SegmentLoaderFactory(
new SegmentLoaderLocalCacheManager(


@@ -36,8 +36,6 @@ import java.util.Map;
*/
public class MetricsEmittingQueryRunner<T> implements QueryRunner<T>
{
private static final String DEFAULT_METRIC_NAME = "query/partial/time";
private final ServiceEmitter emitter;
private final Function<Query<T>, ServiceMetricEvent.Builder> builderFn;
private final QueryRunner<T> queryRunner;
@@ -45,16 +43,7 @@ public class MetricsEmittingQueryRunner<T> implements QueryRunner<T>
private final String metricName;
private final Map<String, String> userDimensions;
public MetricsEmittingQueryRunner(
ServiceEmitter emitter,
Function<Query<T>, ServiceMetricEvent.Builder> builderFn,
QueryRunner<T> queryRunner
)
{
this(emitter, builderFn, queryRunner, DEFAULT_METRIC_NAME, Maps.<String, String>newHashMap());
}
public MetricsEmittingQueryRunner(
private MetricsEmittingQueryRunner(
ServiceEmitter emitter,
Function<Query<T>, ServiceMetricEvent.Builder> builderFn,
QueryRunner<T> queryRunner,
@@ -82,7 +71,6 @@ public class MetricsEmittingQueryRunner<T> implements QueryRunner<T>
this(emitter, builderFn, queryRunner, -1, metricName, userDimensions);
}
public MetricsEmittingQueryRunner<T> withWaitMeasuredFromNow()
{
return new MetricsEmittingQueryRunner<T>(


@@ -20,6 +20,7 @@
package io.druid.segment.realtime;
import com.google.common.base.Throwables;
import com.metamx.common.ISE;
import com.metamx.common.Pair;
import io.druid.segment.IncrementalIndexSegment;
import io.druid.segment.ReferenceCountingSegment;
@@ -34,9 +35,10 @@ import java.io.IOException;
public class FireHydrant
{
private final int count;
private final Object swapLock = new Object();
private volatile IncrementalIndex index;
private volatile ReferenceCountingSegment adapter;
private Object swapLock = new Object();
public FireHydrant(
IncrementalIndex index,
@@ -79,9 +81,17 @@ public class FireHydrant
return index == null;
}
public void swapSegment(Segment adapter)
public void swapSegment(Segment newAdapter)
{
synchronized (swapLock) {
if (adapter != null && newAdapter != null && !newAdapter.getIdentifier().equals(adapter.getIdentifier())) {
// Sanity check: identifier should not change
throw new ISE(
"WTF?! Cannot swap identifier[%s] -> [%s]!",
adapter.getIdentifier(),
newAdapter.getIdentifier()
);
}
if (this.adapter != null) {
try {
this.adapter.close();
@@ -90,7 +100,7 @@ public class FireHydrant
throw Throwables.propagate(e);
}
}
this.adapter = new ReferenceCountingSegment(adapter);
this.adapter = new ReferenceCountingSegment(newAdapter);
this.index = null;
}
}


@@ -37,31 +37,19 @@ import com.google.common.util.concurrent.MoreExecutors;
import com.metamx.common.IAE;
import com.metamx.common.ISE;
import com.metamx.common.Pair;
import com.metamx.common.guava.CloseQuietly;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.client.CachingQueryRunner;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
import io.druid.common.guava.ThreadRenamingCallable;
import io.druid.concurrent.Execs;
import io.druid.data.input.Committer;
import io.druid.data.input.InputRow;
import io.druid.query.BySegmentQueryRunner;
import io.druid.query.MetricsEmittingQueryRunner;
import io.druid.query.NoopQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.query.QueryRunnerHelper;
import io.druid.query.QueryToolChest;
import io.druid.query.ReportTimelineMissingSegmentQueryRunner;
import io.druid.query.QuerySegmentWalker;
import io.druid.query.SegmentDescriptor;
import io.druid.query.spec.SpecificSegmentQueryRunner;
import io.druid.query.spec.SpecificSegmentSpec;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMerger;
import io.druid.segment.IndexSpec;
@@ -76,16 +64,12 @@ import io.druid.segment.realtime.FireHydrant;
import io.druid.segment.realtime.plumber.Sink;
import io.druid.server.coordination.DataSegmentAnnouncer;
import io.druid.timeline.DataSegment;
import io.druid.timeline.TimelineObjectHolder;
import io.druid.timeline.VersionedIntervalTimeline;
import io.druid.timeline.partition.PartitionChunk;
import io.druid.timeline.partition.PartitionHolder;
import org.apache.commons.io.FileUtils;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.io.Closeable;
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
@@ -109,26 +93,22 @@ public class AppenderatorImpl implements Appenderator
private static final EmittingLogger log = new EmittingLogger(AppenderatorImpl.class);
private static final int WARN_DELAY = 1000;
private static final String IDENTIFIER_FILE_NAME = "identifier.json";
private static final String CONTEXT_SKIP_INCREMENTAL_SEGMENT = "skipIncrementalSegment";
private final DataSchema schema;
private final AppenderatorConfig tuningConfig;
private final FireDepartmentMetrics metrics;
private final DataSegmentPusher dataSegmentPusher;
private final ObjectMapper objectMapper;
private final QueryRunnerFactoryConglomerate conglomerate;
private final DataSegmentAnnouncer segmentAnnouncer;
private final ServiceEmitter emitter;
private final ExecutorService queryExecutorService;
private final IndexIO indexIO;
private final IndexMerger indexMerger;
private final Cache cache;
private final CacheConfig cacheConfig;
private final Map<SegmentIdentifier, Sink> sinks = Maps.newConcurrentMap();
private final Set<SegmentIdentifier> droppingSinks = Sets.newConcurrentHashSet();
private final VersionedIntervalTimeline<String, Sink> sinkTimeline = new VersionedIntervalTimeline<>(
String.CASE_INSENSITIVE_ORDER
);
private final QuerySegmentWalker texasRanger;
private volatile ListeningExecutorService persistExecutor = null;
private volatile ListeningExecutorService mergeExecutor = null;
@@ -157,30 +137,22 @@ public class AppenderatorImpl implements Appenderator
this.metrics = Preconditions.checkNotNull(metrics, "metrics");
this.dataSegmentPusher = Preconditions.checkNotNull(dataSegmentPusher, "dataSegmentPusher");
this.objectMapper = Preconditions.checkNotNull(objectMapper, "objectMapper");
this.conglomerate = conglomerate;
this.segmentAnnouncer = segmentAnnouncer;
this.emitter = emitter;
this.queryExecutorService = queryExecutorService;
this.indexIO = indexIO;
this.indexMerger = indexMerger;
this.cache = cache;
this.cacheConfig = cacheConfig;
this.segmentAnnouncer = Preconditions.checkNotNull(segmentAnnouncer, "segmentAnnouncer");
this.indexIO = Preconditions.checkNotNull(indexIO, "indexIO");
this.indexMerger = Preconditions.checkNotNull(indexMerger, "indexMerger");
this.cache = Preconditions.checkNotNull(cache, "cache");
this.texasRanger = conglomerate == null ? null : new SinkQuerySegmentWalker(
schema.getDataSource(),
sinkTimeline,
objectMapper,
emitter,
conglomerate,
queryExecutorService,
cache,
cacheConfig
);
if (conglomerate != null) {
// If we're not querying (no conglomerate) then it's ok for the other query stuff to be null.
// But otherwise, we need them all.
Preconditions.checkNotNull(segmentAnnouncer, "segmentAnnouncer");
Preconditions.checkNotNull(emitter, "emitter");
Preconditions.checkNotNull(queryExecutorService, "queryExecutorService");
Preconditions.checkNotNull(cache, "cache");
Preconditions.checkNotNull(cacheConfig, "cacheConfig");
if (!cache.isLocal()) {
log.error("Configured cache is not local, caching will not be enabled");
}
}
log.info("Creating appenderator for dataSource[%s]", schema.getDataSource());
log.info("Created Appenderator for dataSource[%s].", schema.getDataSource());
}
@Override
@@ -289,176 +261,21 @@ public class AppenderatorImpl implements Appenderator
@Override
public <T> QueryRunner<T> getQueryRunnerForIntervals(final Query<T> query, final Iterable<Interval> intervals)
{
if (conglomerate == null) {
if (texasRanger == null) {
throw new IllegalStateException("Don't query me, bro.");
}
final Iterable<SegmentDescriptor> specs = FunctionalIterable
.create(intervals)
.transformCat(
new Function<Interval, Iterable<TimelineObjectHolder<String, Sink>>>()
{
@Override
public Iterable<TimelineObjectHolder<String, Sink>> apply(final Interval interval)
{
return sinkTimeline.lookup(interval);
}
}
)
.transformCat(
new Function<TimelineObjectHolder<String, Sink>, Iterable<SegmentDescriptor>>()
{
@Override
public Iterable<SegmentDescriptor> apply(final TimelineObjectHolder<String, Sink> holder)
{
return FunctionalIterable
.create(holder.getObject())
.transform(
new Function<PartitionChunk<Sink>, SegmentDescriptor>()
{
@Override
public SegmentDescriptor apply(final PartitionChunk<Sink> chunk)
{
return new SegmentDescriptor(
holder.getInterval(),
holder.getVersion(),
chunk.getChunkNumber()
);
}
}
);
}
}
);
return getQueryRunnerForSegments(query, specs);
return texasRanger.getQueryRunnerForIntervals(query, intervals);
}
@Override
public <T> QueryRunner<T> getQueryRunnerForSegments(final Query<T> query, final Iterable<SegmentDescriptor> specs)
{
if (conglomerate == null) {
if (texasRanger == null) {
throw new IllegalStateException("Don't query me, bro.");
}
// We only handle one dataSource. Make sure it's in the list of names, then ignore from here on out.
if (!query.getDataSource().getNames().contains(getDataSource())) {
log.makeAlert("Received query for unknown dataSource")
.addData("dataSource", query.getDataSource())
.emit();
return new NoopQueryRunner<>();
}
final QueryRunnerFactory<T, Query<T>> factory = conglomerate.findFactory(query);
if (factory == null) {
throw new ISE("Unknown query type[%s].", query.getClass());
}
final QueryToolChest<T, Query<T>> toolchest = factory.getToolchest();
final Function<Query<T>, ServiceMetricEvent.Builder> builderFn =
new Function<Query<T>, ServiceMetricEvent.Builder>()
{
@Override
public ServiceMetricEvent.Builder apply(@Nullable Query<T> input)
{
return toolchest.makeMetricBuilder(query);
}
};
final boolean skipIncrementalSegment = query.getContextValue(CONTEXT_SKIP_INCREMENTAL_SEGMENT, false);
return toolchest.mergeResults(
factory.mergeRunners(
queryExecutorService,
FunctionalIterable
.create(specs)
.transform(
new Function<SegmentDescriptor, QueryRunner<T>>()
{
@Override
public QueryRunner<T> apply(final SegmentDescriptor descriptor)
{
final PartitionHolder<Sink> holder = sinkTimeline.findEntry(
descriptor.getInterval(),
descriptor.getVersion()
);
if (holder == null) {
return new ReportTimelineMissingSegmentQueryRunner<>(descriptor);
}
final PartitionChunk<Sink> chunk = holder.getChunk(descriptor.getPartitionNumber());
if (chunk == null) {
return new ReportTimelineMissingSegmentQueryRunner<>(descriptor);
}
final Sink theSink = chunk.getObject();
return new SpecificSegmentQueryRunner<>(
new MetricsEmittingQueryRunner<>(
emitter,
builderFn,
new BySegmentQueryRunner<T>(
theSink.getSegment().getIdentifier(),
descriptor.getInterval().getStart(),
factory.mergeRunners(
MoreExecutors.sameThreadExecutor(),
Iterables.transform(
theSink,
new Function<FireHydrant, QueryRunner<T>>()
{
@Override
public QueryRunner<T> apply(final FireHydrant hydrant)
{
// Hydrant might swap at any point, but if it's swapped at the start
// then we know it's *definitely* swapped.
final boolean hydrantDefinitelySwapped = hydrant.hasSwapped();
if (skipIncrementalSegment && !hydrantDefinitelySwapped) {
return new NoopQueryRunner<>();
}
// Prevent the underlying segment from swapping when it's being iterated
final Pair<Segment, Closeable> segment = hydrant.getAndIncrementSegment();
try {
QueryRunner<T> baseRunner = QueryRunnerHelper.makeClosingQueryRunner(
factory.createRunner(segment.lhs),
segment.rhs
);
if (hydrantDefinitelySwapped // only use caching if data is immutable
&& cache.isLocal() // hydrants may not be in sync between replicas, make sure cache is local
) {
return new CachingQueryRunner<>(
makeHydrantCacheIdentifier(hydrant, segment.lhs),
descriptor,
objectMapper,
cache,
toolchest,
baseRunner,
MoreExecutors.sameThreadExecutor(),
cacheConfig
);
} else {
return baseRunner;
}
}
catch (RuntimeException e) {
CloseQuietly.close(segment.rhs);
throw e;
}
}
}
)
)
)
).withWaitMeasuredFromNow(),
new SpecificSegmentSpec(descriptor)
);
}
}
)
)
);
return texasRanger.getQueryRunnerForSegments(query, specs);
}
@Override
@@ -1067,6 +884,9 @@ public class AppenderatorImpl implements Appenderator
sink.getVersion(),
identifier.getShardSpec().createChunk(sink)
);
for (FireHydrant hydrant : sink) {
cache.close(SinkQuerySegmentWalker.makeHydrantCacheIdentifier(hydrant));
}
if (removeOnDiskData) {
removeDirectory(computePersistDir(identifier));
@@ -1184,9 +1004,4 @@ public class AppenderatorImpl implements Appenderator
}
}
}
private static String makeHydrantCacheIdentifier(FireHydrant input, Segment segment)
{
return segment.getIdentifier() + "_" + input.getCount();
}
}


@@ -0,0 +1,326 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment.realtime.appenderator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.MoreExecutors;
import com.metamx.common.ISE;
import com.metamx.common.Pair;
import com.metamx.common.guava.CloseQuietly;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.client.CachingQueryRunner;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
import io.druid.query.BySegmentQueryRunner;
import io.druid.query.CPUTimeMetricQueryRunner;
import io.druid.query.MetricsEmittingQueryRunner;
import io.druid.query.NoopQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.query.QueryRunnerHelper;
import io.druid.query.QuerySegmentWalker;
import io.druid.query.QueryToolChest;
import io.druid.query.ReportTimelineMissingSegmentQueryRunner;
import io.druid.query.SegmentDescriptor;
import io.druid.query.TableDataSource;
import io.druid.query.spec.SpecificSegmentQueryRunner;
import io.druid.query.spec.SpecificSegmentSpec;
import io.druid.segment.Segment;
import io.druid.segment.realtime.FireHydrant;
import io.druid.segment.realtime.plumber.Sink;
import io.druid.timeline.TimelineObjectHolder;
import io.druid.timeline.VersionedIntervalTimeline;
import io.druid.timeline.partition.PartitionChunk;
import io.druid.timeline.partition.PartitionHolder;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.io.Closeable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLong;
public class SinkQuerySegmentWalker implements QuerySegmentWalker
{
private static final EmittingLogger log = new EmittingLogger(SinkQuerySegmentWalker.class);
private static final String CONTEXT_SKIP_INCREMENTAL_SEGMENT = "skipIncrementalSegment";
private final String dataSource;
private final VersionedIntervalTimeline<String, Sink> sinkTimeline;
private final ObjectMapper objectMapper;
private final ServiceEmitter emitter;
private final QueryRunnerFactoryConglomerate conglomerate;
private final ExecutorService queryExecutorService;
private final Cache cache;
private final CacheConfig cacheConfig;
public SinkQuerySegmentWalker(
String dataSource,
VersionedIntervalTimeline<String, Sink> sinkTimeline,
ObjectMapper objectMapper,
ServiceEmitter emitter,
QueryRunnerFactoryConglomerate conglomerate,
ExecutorService queryExecutorService,
Cache cache,
CacheConfig cacheConfig
)
{
this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource");
this.sinkTimeline = Preconditions.checkNotNull(sinkTimeline, "sinkTimeline");
this.objectMapper = Preconditions.checkNotNull(objectMapper, "objectMapper");
this.emitter = Preconditions.checkNotNull(emitter, "emitter");
this.conglomerate = Preconditions.checkNotNull(conglomerate, "conglomerate");
this.queryExecutorService = Preconditions.checkNotNull(queryExecutorService, "queryExecutorService");
this.cache = Preconditions.checkNotNull(cache, "cache");
this.cacheConfig = Preconditions.checkNotNull(cacheConfig, "cacheConfig");
if (!cache.isLocal()) {
log.warn("Configured cache[%s] is not local, caching will not be enabled.", cache.getClass().getName());
}
}
@Override
public <T> QueryRunner<T> getQueryRunnerForIntervals(final Query<T> query, final Iterable<Interval> intervals)
{
final Iterable<SegmentDescriptor> specs = FunctionalIterable
.create(intervals)
.transformCat(
new Function<Interval, Iterable<TimelineObjectHolder<String, Sink>>>()
{
@Override
public Iterable<TimelineObjectHolder<String, Sink>> apply(final Interval interval)
{
return sinkTimeline.lookup(interval);
}
}
)
.transformCat(
new Function<TimelineObjectHolder<String, Sink>, Iterable<SegmentDescriptor>>()
{
@Override
public Iterable<SegmentDescriptor> apply(final TimelineObjectHolder<String, Sink> holder)
{
return FunctionalIterable
.create(holder.getObject())
.transform(
new Function<PartitionChunk<Sink>, SegmentDescriptor>()
{
@Override
public SegmentDescriptor apply(final PartitionChunk<Sink> chunk)
{
return new SegmentDescriptor(
holder.getInterval(),
holder.getVersion(),
chunk.getChunkNumber()
);
}
}
);
}
}
);
return getQueryRunnerForSegments(query, specs);
}
@Override
public <T> QueryRunner<T> getQueryRunnerForSegments(final Query<T> query, final Iterable<SegmentDescriptor> specs)
{
// We only handle one particular dataSource. Make sure that's what we have, then ignore from here on out.
if (!(query.getDataSource() instanceof TableDataSource)
|| !dataSource.equals(((TableDataSource) query.getDataSource()).getName())) {
log.makeAlert("Received query for unknown dataSource")
.addData("dataSource", query.getDataSource())
.emit();
return new NoopQueryRunner<>();
}
final QueryRunnerFactory<T, Query<T>> factory = conglomerate.findFactory(query);
if (factory == null) {
throw new ISE("Unknown query type[%s].", query.getClass());
}
final QueryToolChest<T, Query<T>> toolChest = factory.getToolchest();
final Function<Query<T>, ServiceMetricEvent.Builder> builderFn =
new Function<Query<T>, ServiceMetricEvent.Builder>()
{
@Override
public ServiceMetricEvent.Builder apply(@Nullable Query<T> input)
{
return toolChest.makeMetricBuilder(query);
}
};
final boolean skipIncrementalSegment = query.getContextValue(CONTEXT_SKIP_INCREMENTAL_SEGMENT, false);
final AtomicLong cpuTimeAccumulator = new AtomicLong(0L);
return CPUTimeMetricQueryRunner.safeBuild(
toolChest.mergeResults(
factory.mergeRunners(
queryExecutorService,
FunctionalIterable
.create(specs)
.transform(
new Function<SegmentDescriptor, QueryRunner<T>>()
{
@Override
public QueryRunner<T> apply(final SegmentDescriptor descriptor)
{
final PartitionHolder<Sink> holder = sinkTimeline.findEntry(
descriptor.getInterval(),
descriptor.getVersion()
);
if (holder == null) {
return new ReportTimelineMissingSegmentQueryRunner<>(descriptor);
}
final PartitionChunk<Sink> chunk = holder.getChunk(descriptor.getPartitionNumber());
if (chunk == null) {
return new ReportTimelineMissingSegmentQueryRunner<>(descriptor);
}
final Sink theSink = chunk.getObject();
final String sinkSegmentIdentifier = theSink.getSegment().getIdentifier();
return new SpecificSegmentQueryRunner<>(
withPerSinkMetrics(
new BySegmentQueryRunner<>(
sinkSegmentIdentifier,
descriptor.getInterval().getStart(),
factory.mergeRunners(
MoreExecutors.sameThreadExecutor(),
Iterables.transform(
theSink,
new Function<FireHydrant, QueryRunner<T>>()
{
@Override
public QueryRunner<T> apply(final FireHydrant hydrant)
{
// Hydrant might swap at any point, but if it's swapped at the start
// then we know it's *definitely* swapped.
final boolean hydrantDefinitelySwapped = hydrant.hasSwapped();
if (skipIncrementalSegment && !hydrantDefinitelySwapped) {
return new NoopQueryRunner<>();
}
// Prevent the underlying segment from swapping when it's being iterated
final Pair<Segment, Closeable> segment = hydrant.getAndIncrementSegment();
try {
QueryRunner<T> baseRunner = QueryRunnerHelper.makeClosingQueryRunner(
factory.createRunner(segment.lhs),
segment.rhs
);
// 1) Only use caching if data is immutable
// 2) Hydrants are not the same between replicas, make sure cache is local
if (hydrantDefinitelySwapped && cache.isLocal()) {
return new CachingQueryRunner<>(
makeHydrantCacheIdentifier(hydrant),
descriptor,
objectMapper,
cache,
toolChest,
baseRunner,
MoreExecutors.sameThreadExecutor(),
cacheConfig
);
} else {
return baseRunner;
}
}
catch (RuntimeException e) {
CloseQuietly.close(segment.rhs);
throw e;
}
}
}
)
)
),
builderFn,
sinkSegmentIdentifier,
cpuTimeAccumulator
),
new SpecificSegmentSpec(descriptor)
);
}
}
)
)
),
builderFn,
emitter,
cpuTimeAccumulator,
true
);
}
/**
* Decorates a Sink's query runner to emit query/segmentAndCache/time, query/segment/time, query/wait/time once
* each for the whole Sink. Also adds CPU time to cpuTimeAccumulator.
*/
private <T> QueryRunner<T> withPerSinkMetrics(
final QueryRunner<T> sinkRunner,
final Function<Query<T>, ServiceMetricEvent.Builder> builderFn,
final String sinkSegmentIdentifier,
final AtomicLong cpuTimeAccumulator
)
{
final ImmutableMap<String, String> dims = ImmutableMap.of("segment", sinkSegmentIdentifier);
// Note: query/segmentAndCache/time and query/segment/time are effectively the same here. They don't split apart
// cache vs. non-cache due to the fact that Sinks may be partially cached and partially uncached. Making this
// better would need to involve another accumulator like the cpuTimeAccumulator that we could share with the
// sinkRunner.
return CPUTimeMetricQueryRunner.safeBuild(
new MetricsEmittingQueryRunner<>(
emitter,
builderFn,
new MetricsEmittingQueryRunner<>(
emitter,
builderFn,
sinkRunner,
"query/segment/time",
dims
),
"query/segmentAndCache/time",
dims
).withWaitMeasuredFromNow(),
builderFn,
emitter,
cpuTimeAccumulator,
false
);
}
public static String makeHydrantCacheIdentifier(FireHydrant input)
{
return input.getSegment().getIdentifier() + "_" + input.getCount();
}
}


@@ -31,17 +31,12 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.MoreExecutors;
import com.metamx.common.Granularity;
import com.metamx.common.ISE;
import com.metamx.common.Pair;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.guava.CloseQuietly;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.client.CachingQueryRunner;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
import io.druid.common.guava.ThreadRenamingCallable;
@@ -51,17 +46,11 @@ import io.druid.concurrent.Execs;
import io.druid.concurrent.TaskThreadPriority;
import io.druid.data.input.Committer;
import io.druid.data.input.InputRow;
import io.druid.query.MetricsEmittingQueryRunner;
import io.druid.query.NoopQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.query.QueryRunnerHelper;
import io.druid.query.QueryToolChest;
import io.druid.query.QuerySegmentWalker;
import io.druid.query.SegmentDescriptor;
import io.druid.query.spec.SpecificSegmentQueryRunner;
import io.druid.query.spec.SpecificSegmentSpec;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMerger;
import io.druid.segment.IndexSpec;
@@ -76,9 +65,9 @@ import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.segment.realtime.FireHydrant;
import io.druid.segment.realtime.SegmentPublisher;
import io.druid.segment.realtime.appenderator.SinkQuerySegmentWalker;
import io.druid.server.coordination.DataSegmentAnnouncer;
import io.druid.timeline.DataSegment;
import io.druid.timeline.TimelineObjectHolder;
import io.druid.timeline.VersionedIntervalTimeline;
import io.druid.timeline.partition.SingleElementPartitionChunk;
import org.apache.commons.io.FileUtils;
@@ -87,8 +76,6 @@ import org.joda.time.Duration;
import org.joda.time.Interval;
import org.joda.time.Period;
import javax.annotation.Nullable;
import java.io.Closeable;
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
@@ -111,10 +98,7 @@ public class RealtimePlumber implements Plumber
private final RealtimeTuningConfig config;
private final RejectionPolicy rejectionPolicy;
private final FireDepartmentMetrics metrics;
private final ServiceEmitter emitter;
private final QueryRunnerFactoryConglomerate conglomerate;
private final DataSegmentAnnouncer segmentAnnouncer;
private final ExecutorService queryExecutorService;
private final DataSegmentPusher dataSegmentPusher;
private final SegmentPublisher segmentPublisher;
private final SegmentHandoffNotifier handoffNotifier;
@@ -123,10 +107,9 @@ public class RealtimePlumber implements Plumber
private final VersionedIntervalTimeline<String, Sink> sinkTimeline = new VersionedIntervalTimeline<String, Sink>(
String.CASE_INSENSITIVE_ORDER
);
private final QuerySegmentWalker texasRanger;
private final Cache cache;
private final CacheConfig cacheConfig;
private final ObjectMapper objectMapper;
private volatile long nextFlush = 0;
private volatile boolean shuttingDown = false;
@@ -140,8 +123,6 @@ public class RealtimePlumber implements Plumber
private static final String COMMIT_METADATA_KEY = "%commitMetadata%";
private static final String COMMIT_METADATA_TIMESTAMP_KEY = "%commitMetadataTimestamp%";
private static final String SKIP_INCREMENTAL_SEGMENT = "skipIncrementalSegment";
public RealtimePlumber(
DataSchema schema,
@@ -165,22 +146,23 @@ public class RealtimePlumber implements Plumber
this.config = config;
this.rejectionPolicy = config.getRejectionPolicyFactory().create(config.getWindowPeriod());
this.metrics = metrics;
this.emitter = emitter;
this.conglomerate = conglomerate;
this.segmentAnnouncer = segmentAnnouncer;
this.queryExecutorService = queryExecutorService;
this.dataSegmentPusher = dataSegmentPusher;
this.segmentPublisher = segmentPublisher;
this.handoffNotifier = handoffNotifier;
this.indexMerger = Preconditions.checkNotNull(indexMerger, "Null IndexMerger");
this.indexIO = Preconditions.checkNotNull(indexIO, "Null IndexIO");
this.cache = cache;
this.cacheConfig = cacheConfig;
this.objectMapper = objectMapper;
if (!cache.isLocal()) {
log.error("Configured cache is not local, caching will not be enabled");
}
this.texasRanger = new SinkQuerySegmentWalker(
schema.getDataSource(),
sinkTimeline,
objectMapper,
emitter,
conglomerate,
queryExecutorService,
cache,
cacheConfig
);
log.info("Creating plumber using rejectionPolicy[%s]", getRejectionPolicy());
}
@@ -273,125 +255,8 @@ public class RealtimePlumber implements Plumber
@Override
public <T> QueryRunner<T> getQueryRunner(final Query<T> query)
{
final boolean skipIncrementalSegment = query.getContextBoolean(SKIP_INCREMENTAL_SEGMENT, false);
final QueryRunnerFactory<T, Query<T>> factory = conglomerate.findFactory(query);
final QueryToolChest<T, Query<T>> toolchest = factory.getToolchest();
final Function<Query<T>, ServiceMetricEvent.Builder> builderFn =
new Function<Query<T>, ServiceMetricEvent.Builder>()
{
@Override
public ServiceMetricEvent.Builder apply(@Nullable Query<T> input)
{
return toolchest.makeMetricBuilder(query);
}
};
List<TimelineObjectHolder<String, Sink>> querySinks = Lists.newArrayList();
for (Interval interval : query.getIntervals()) {
querySinks.addAll(sinkTimeline.lookup(interval));
}
return toolchest.mergeResults(
factory.mergeRunners(
queryExecutorService,
FunctionalIterable
.create(querySinks)
.transform(
new Function<TimelineObjectHolder<String, Sink>, QueryRunner<T>>()
{
@Override
public QueryRunner<T> apply(TimelineObjectHolder<String, Sink> holder)
{
if (holder == null) {
throw new ISE("No timeline entry at all!");
}
// The realtime plumber always uses SingleElementPartitionChunk
final Sink theSink = holder.getObject().getChunk(0).getObject();
if (theSink == null) {
throw new ISE("Missing sink for timeline entry[%s]!", holder);
}
final SegmentDescriptor descriptor = new SegmentDescriptor(
holder.getInterval(),
theSink.getSegment().getVersion(),
theSink.getSegment().getShardSpec().getPartitionNum()
);
return new SpecificSegmentQueryRunner<T>(
new MetricsEmittingQueryRunner<T>(
emitter,
builderFn,
factory.mergeRunners(
MoreExecutors.sameThreadExecutor(),
Iterables.transform(
theSink,
new Function<FireHydrant, QueryRunner<T>>()
{
@Override
public QueryRunner<T> apply(FireHydrant input)
{
// Hydrant might swap at any point, but if it's swapped at the start
// then we know it's *definitely* swapped.
final boolean hydrantDefinitelySwapped = input.hasSwapped();
if (skipIncrementalSegment && !hydrantDefinitelySwapped) {
return new NoopQueryRunner<T>();
}
// Prevent the underlying segment from swapping when it's being iterated
final Pair<Segment, Closeable> segment = input.getAndIncrementSegment();
try {
QueryRunner<T> baseRunner = QueryRunnerHelper.makeClosingQueryRunner(
factory.createRunner(segment.lhs),
segment.rhs
);
if (hydrantDefinitelySwapped // only use caching if data is immutable
&& cache.isLocal() // hydrants may not be in sync between replicas, make sure cache is local
) {
return new CachingQueryRunner<>(
makeHydrantIdentifier(input, segment.lhs),
descriptor,
objectMapper,
cache,
toolchest,
baseRunner,
MoreExecutors.sameThreadExecutor(),
cacheConfig
);
} else {
return baseRunner;
}
}
catch (RuntimeException e) {
CloseQuietly.close(segment.rhs);
throw e;
}
}
}
)
),
"query/segmentAndCache/time",
ImmutableMap.of("segment", theSink.getSegment().getIdentifier())
).withWaitMeasuredFromNow(),
new SpecificSegmentSpec(
descriptor
)
);
}
}
)
)
);
}
protected static String makeHydrantIdentifier(FireHydrant input, Segment segment)
{
return segment.getIdentifier() + "_" + input.getCount();
// Calling getQueryRunnerForIntervals here works because there's only one segment per interval for RealtimePlumber.
return texasRanger.getQueryRunnerForIntervals(query, query.getIntervals());
}
@Override
@@ -990,7 +855,7 @@ public class RealtimePlumber implements Plumber
new SingleElementPartitionChunk<>(sink)
);
for (FireHydrant hydrant : sink) {
cache.close(makeHydrantIdentifier(hydrant, hydrant.getSegment()));
cache.close(SinkQuerySegmentWalker.makeHydrantCacheIdentifier(hydrant));
}
synchronized (handoffCondition) {
handoffCondition.notifyAll();