diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java
index d17d471edeb..d85e586d8af 100644
--- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java
@@ -34,6 +34,7 @@ import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Ordering;
+import com.google.common.util.concurrent.MoreExecutors;
 import com.metamx.common.Granularity;
 import com.metamx.common.ISE;
 import com.metamx.common.Pair;
@@ -572,7 +573,7 @@ public class TaskLifecycleTest
        }, // segment announcer
        handoffNotifierFactory,
        queryRunnerFactoryConglomerate, // query runner factory conglomerate corporation unionized collective
-       null, // query executor service
+       MoreExecutors.sameThreadExecutor(), // query executor service
        monitorScheduler, // monitor scheduler
        new SegmentLoaderFactory(
            new SegmentLoaderLocalCacheManager(
diff --git a/processing/src/main/java/io/druid/query/MetricsEmittingQueryRunner.java b/processing/src/main/java/io/druid/query/MetricsEmittingQueryRunner.java
index b1211e0d7e5..a7633705d36 100644
--- a/processing/src/main/java/io/druid/query/MetricsEmittingQueryRunner.java
+++ b/processing/src/main/java/io/druid/query/MetricsEmittingQueryRunner.java
@@ -36,8 +36,6 @@ import java.util.Map;
  */
 public class MetricsEmittingQueryRunner<T> implements QueryRunner<T>
 {
-  private static final String DEFAULT_METRIC_NAME = "query/partial/time";
-
   private final ServiceEmitter emitter;
   private final Function<Query<T>, ServiceMetricEvent.Builder> builderFn;
   private final QueryRunner<T> queryRunner;
@@ -45,16 +43,7 @@ public class MetricsEmittingQueryRunner<T> implements QueryRunner<T>
   private final String metricName;
   private final Map<String, String> userDimensions;
 
-  public MetricsEmittingQueryRunner(
-      ServiceEmitter emitter,
-      Function<Query<T>, ServiceMetricEvent.Builder> builderFn,
-      QueryRunner<T> queryRunner
-  )
-  {
-    this(emitter, builderFn, queryRunner, DEFAULT_METRIC_NAME, Maps.newHashMap());
-  }
-
-  public MetricsEmittingQueryRunner(
+  private MetricsEmittingQueryRunner(
       ServiceEmitter emitter,
       Function<Query<T>, ServiceMetricEvent.Builder> builderFn,
       QueryRunner<T> queryRunner,
@@ -82,7 +71,6 @@ public class MetricsEmittingQueryRunner<T> implements QueryRunner<T>
     this(emitter, builderFn, queryRunner, -1, metricName, userDimensions);
   }
 
-
   public MetricsEmittingQueryRunner<T> withWaitMeasuredFromNow()
   {
     return new MetricsEmittingQueryRunner<T>(
diff --git a/server/src/main/java/io/druid/segment/realtime/FireHydrant.java b/server/src/main/java/io/druid/segment/realtime/FireHydrant.java
index 4781520968c..ebc9d0e68fa 100644
--- a/server/src/main/java/io/druid/segment/realtime/FireHydrant.java
+++ b/server/src/main/java/io/druid/segment/realtime/FireHydrant.java
@@ -20,6 +20,7 @@
 package io.druid.segment.realtime;
 
 import com.google.common.base.Throwables;
+import com.metamx.common.ISE;
 import com.metamx.common.Pair;
 import io.druid.segment.IncrementalIndexSegment;
 import io.druid.segment.ReferenceCountingSegment;
@@ -34,9 +35,10 @@ import java.io.IOException;
 public class FireHydrant
 {
   private final int count;
+  private final Object swapLock = new Object();
+
   private volatile IncrementalIndex index;
   private volatile ReferenceCountingSegment adapter;
-  private Object swapLock = new Object();
 
   public FireHydrant(
       IncrementalIndex index,
@@ -79,9 +81,17 @@ public class FireHydrant
     return index == null;
   }
 
-  public void swapSegment(Segment adapter)
+  public void swapSegment(Segment newAdapter)
   {
     synchronized (swapLock) {
+      if (adapter != null && newAdapter != null && !newAdapter.getIdentifier().equals(adapter.getIdentifier())) {
+        // Sanity check: identifier should not change
+        throw new ISE(
+            "WTF?! Cannot swap identifier[%s] -> [%s]!",
+            adapter.getIdentifier(),
+            newAdapter.getIdentifier()
+        );
+      }
       if (this.adapter != null) {
         try {
           this.adapter.close();
@@ -90,7 +100,7 @@ public class FireHydrant
           throw Throwables.propagate(e);
         }
       }
-      this.adapter = new ReferenceCountingSegment(adapter);
+      this.adapter = new ReferenceCountingSegment(newAdapter);
       this.index = null;
     }
   }
diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java
index 51c14a7d45f..9d013ea7c50 100644
--- a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java
+++ b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java
@@ -37,31 +37,19 @@ import com.google.common.util.concurrent.MoreExecutors;
 import com.metamx.common.IAE;
 import com.metamx.common.ISE;
 import com.metamx.common.Pair;
-import com.metamx.common.guava.CloseQuietly;
-import com.metamx.common.guava.FunctionalIterable;
 import com.metamx.emitter.EmittingLogger;
 import com.metamx.emitter.service.ServiceEmitter;
-import com.metamx.emitter.service.ServiceMetricEvent;
-import io.druid.client.CachingQueryRunner;
 import io.druid.client.cache.Cache;
 import io.druid.client.cache.CacheConfig;
 import io.druid.common.guava.ThreadRenamingCallable;
 import io.druid.concurrent.Execs;
 import io.druid.data.input.Committer;
 import io.druid.data.input.InputRow;
-import io.druid.query.BySegmentQueryRunner;
-import io.druid.query.MetricsEmittingQueryRunner;
-import io.druid.query.NoopQueryRunner;
 import io.druid.query.Query;
 import io.druid.query.QueryRunner;
-import io.druid.query.QueryRunnerFactory;
 import io.druid.query.QueryRunnerFactoryConglomerate;
-import io.druid.query.QueryRunnerHelper;
-import io.druid.query.QueryToolChest;
-import io.druid.query.ReportTimelineMissingSegmentQueryRunner;
+import io.druid.query.QuerySegmentWalker;
 import io.druid.query.SegmentDescriptor;
-import io.druid.query.spec.SpecificSegmentQueryRunner;
-import io.druid.query.spec.SpecificSegmentSpec;
 import io.druid.segment.IndexIO;
 import io.druid.segment.IndexMerger;
 import io.druid.segment.IndexSpec;
@@ -76,16 +64,12 @@ import io.druid.segment.realtime.FireHydrant;
 import io.druid.segment.realtime.plumber.Sink;
 import io.druid.server.coordination.DataSegmentAnnouncer;
 import io.druid.timeline.DataSegment;
-import io.druid.timeline.TimelineObjectHolder;
 import io.druid.timeline.VersionedIntervalTimeline;
-import io.druid.timeline.partition.PartitionChunk;
-import io.druid.timeline.partition.PartitionHolder;
 import org.apache.commons.io.FileUtils;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
 
 import javax.annotation.Nullable;
-import java.io.Closeable;
 import java.io.File;
 import java.io.FilenameFilter;
 import java.io.IOException;
@@ -109,26 +93,22 @@ public class AppenderatorImpl implements Appenderator
   private static final EmittingLogger log = new EmittingLogger(AppenderatorImpl.class);
   private static final int WARN_DELAY = 1000;
   private static final String IDENTIFIER_FILE_NAME = "identifier.json";
-  private static final String CONTEXT_SKIP_INCREMENTAL_SEGMENT = "skipIncrementalSegment";
 
   private final DataSchema schema;
   private final AppenderatorConfig tuningConfig;
   private final FireDepartmentMetrics metrics;
   private final DataSegmentPusher dataSegmentPusher;
   private final ObjectMapper objectMapper;
-  private final QueryRunnerFactoryConglomerate conglomerate;
   private final DataSegmentAnnouncer segmentAnnouncer;
-  private final ServiceEmitter emitter;
-  private final ExecutorService queryExecutorService;
   private final IndexIO indexIO;
   private final IndexMerger indexMerger;
   private final Cache cache;
-  private final CacheConfig cacheConfig;
   private final Map<SegmentIdentifier, Sink> sinks = Maps.newConcurrentMap();
   private final Set<SegmentIdentifier> droppingSinks = Sets.newConcurrentHashSet();
   private final VersionedIntervalTimeline<String, Sink> sinkTimeline = new VersionedIntervalTimeline<>(
       String.CASE_INSENSITIVE_ORDER
   );
+  private final QuerySegmentWalker texasRanger;
 
   private volatile ListeningExecutorService persistExecutor = null;
   private volatile ListeningExecutorService mergeExecutor = null;
@@ -157,30 +137,22 @@ public class AppenderatorImpl implements Appenderator
     this.metrics = Preconditions.checkNotNull(metrics, "metrics");
     this.dataSegmentPusher = Preconditions.checkNotNull(dataSegmentPusher, "dataSegmentPusher");
     this.objectMapper = Preconditions.checkNotNull(objectMapper, "objectMapper");
-    this.conglomerate = conglomerate;
-    this.segmentAnnouncer = segmentAnnouncer;
-    this.emitter = emitter;
-    this.queryExecutorService = queryExecutorService;
-    this.indexIO = indexIO;
-    this.indexMerger = indexMerger;
-    this.cache = cache;
-    this.cacheConfig = cacheConfig;
+    this.segmentAnnouncer = Preconditions.checkNotNull(segmentAnnouncer, "segmentAnnouncer");
+    this.indexIO = Preconditions.checkNotNull(indexIO, "indexIO");
+    this.indexMerger = Preconditions.checkNotNull(indexMerger, "indexMerger");
+    this.cache = Preconditions.checkNotNull(cache, "cache");
+    this.texasRanger = conglomerate == null ? null : new SinkQuerySegmentWalker(
+        schema.getDataSource(),
+        sinkTimeline,
+        objectMapper,
+        emitter,
+        conglomerate,
+        queryExecutorService,
+        cache,
+        cacheConfig
+    );
 
-    if (conglomerate != null) {
-      // If we're not querying (no conglomerate) then it's ok for the other query stuff to be null.
-      // But otherwise, we need them all.
-      Preconditions.checkNotNull(segmentAnnouncer, "segmentAnnouncer");
-      Preconditions.checkNotNull(emitter, "emitter");
-      Preconditions.checkNotNull(queryExecutorService, "queryExecutorService");
-      Preconditions.checkNotNull(cache, "cache");
-      Preconditions.checkNotNull(cacheConfig, "cacheConfig");
-
-      if (!cache.isLocal()) {
-        log.error("Configured cache is not local, caching will not be enabled");
-      }
-    }
-
-    log.info("Creating appenderator for dataSource[%s]", schema.getDataSource());
+    log.info("Created Appenderator for dataSource[%s].", schema.getDataSource());
   }
 
   @Override
@@ -289,176 +261,21 @@ public class AppenderatorImpl implements Appenderator
   @Override
   public <T> QueryRunner<T> getQueryRunnerForIntervals(final Query<T> query, final Iterable<Interval> intervals)
   {
-    if (conglomerate == null) {
+    if (texasRanger == null) {
       throw new IllegalStateException("Don't query me, bro.");
     }
 
-    final Iterable<SegmentDescriptor> specs = FunctionalIterable
-        .create(intervals)
-        .transformCat(
-            new Function<Interval, Iterable<TimelineObjectHolder<String, Sink>>>()
-            {
-              @Override
-              public Iterable<TimelineObjectHolder<String, Sink>> apply(final Interval interval)
-              {
-                return sinkTimeline.lookup(interval);
-              }
-            }
-        )
-        .transformCat(
-            new Function<TimelineObjectHolder<String, Sink>, Iterable<SegmentDescriptor>>()
-            {
-              @Override
-              public Iterable<SegmentDescriptor> apply(final TimelineObjectHolder<String, Sink> holder)
-              {
-                return FunctionalIterable
-                    .create(holder.getObject())
-                    .transform(
-                        new Function<PartitionChunk<Sink>, SegmentDescriptor>()
-                        {
-                          @Override
-                          public SegmentDescriptor apply(final PartitionChunk<Sink> chunk)
-                          {
-                            return new SegmentDescriptor(
-                                holder.getInterval(),
-                                holder.getVersion(),
-                                chunk.getChunkNumber()
-                            );
-                          }
-                        }
-                    );
-              }
-            }
-        );
-
-    return getQueryRunnerForSegments(query, specs);
+    return texasRanger.getQueryRunnerForIntervals(query, intervals);
   }
 
   @Override
   public <T> QueryRunner<T> getQueryRunnerForSegments(final Query<T> query, final Iterable<SegmentDescriptor> specs)
   {
-    if (conglomerate == null) {
+    if (texasRanger == null) {
       throw new IllegalStateException("Don't query me, bro.");
     }
 
-    // We only handle one dataSource. Make sure it's in the list of names, then ignore from here on out.
-    if (!query.getDataSource().getNames().contains(getDataSource())) {
-      log.makeAlert("Received query for unknown dataSource")
-         .addData("dataSource", query.getDataSource())
-         .emit();
-      return new NoopQueryRunner<>();
-    }
-
-    final QueryRunnerFactory<T, Query<T>> factory = conglomerate.findFactory(query);
-    if (factory == null) {
-      throw new ISE("Unknown query type[%s].", query.getClass());
-    }
-
-    final QueryToolChest<T, Query<T>> toolchest = factory.getToolchest();
-    final Function<Query<T>, ServiceMetricEvent.Builder> builderFn =
-        new Function<Query<T>, ServiceMetricEvent.Builder>()
-        {
-
-          @Override
-          public ServiceMetricEvent.Builder apply(@Nullable Query<T> input)
-          {
-            return toolchest.makeMetricBuilder(query);
-          }
-        };
-    final boolean skipIncrementalSegment = query.getContextValue(CONTEXT_SKIP_INCREMENTAL_SEGMENT, false);
-
-    return toolchest.mergeResults(
-        factory.mergeRunners(
-            queryExecutorService,
-            FunctionalIterable
-                .create(specs)
-                .transform(
-                    new Function<SegmentDescriptor, QueryRunner<T>>()
-                    {
-                      @Override
-                      public QueryRunner<T> apply(final SegmentDescriptor descriptor)
-                      {
-                        final PartitionHolder<Sink> holder = sinkTimeline.findEntry(
-                            descriptor.getInterval(),
-                            descriptor.getVersion()
-                        );
-                        if (holder == null) {
-                          return new ReportTimelineMissingSegmentQueryRunner<>(descriptor);
-                        }
-
-                        final PartitionChunk<Sink> chunk = holder.getChunk(descriptor.getPartitionNumber());
-                        if (chunk == null) {
-                          return new ReportTimelineMissingSegmentQueryRunner<>(descriptor);
-                        }
-
-                        final Sink theSink = chunk.getObject();
-
-                        return new SpecificSegmentQueryRunner<>(
-                            new MetricsEmittingQueryRunner<>(
-                                emitter,
-                                builderFn,
-                                new BySegmentQueryRunner<T>(
-                                    theSink.getSegment().getIdentifier(),
-                                    descriptor.getInterval().getStart(),
-                                    factory.mergeRunners(
-                                        MoreExecutors.sameThreadExecutor(),
-                                        Iterables.transform(
-                                            theSink,
-                                            new Function<FireHydrant, QueryRunner<T>>()
-                                            {
-                                              @Override
-                                              public QueryRunner<T> apply(final FireHydrant hydrant)
-                                              {
-                                                // Hydrant might swap at any point, but if it's swapped at the start
-                                                // then we know it's *definitely* swapped.
-                                                final boolean hydrantDefinitelySwapped = hydrant.hasSwapped();
-
-                                                if (skipIncrementalSegment && !hydrantDefinitelySwapped) {
-                                                  return new NoopQueryRunner<>();
-                                                }
-
-                                                // Prevent the underlying segment from swapping when its being iterated
-                                                final Pair<Segment, Closeable> segment = hydrant.getAndIncrementSegment();
-                                                try {
-                                                  QueryRunner<T> baseRunner = QueryRunnerHelper.makeClosingQueryRunner(
-                                                      factory.createRunner(segment.lhs),
-                                                      segment.rhs
-                                                  );
-
-                                                  if (hydrantDefinitelySwapped // only use caching if data is immutable
-                                                      && cache.isLocal() // hydrants may not be in sync between replicas, make sure cache is local
-                                                      ) {
-                                                    return new CachingQueryRunner<>(
-                                                        makeHydrantCacheIdentifier(hydrant, segment.lhs),
-                                                        descriptor,
-                                                        objectMapper,
-                                                        cache,
-                                                        toolchest,
-                                                        baseRunner,
-                                                        MoreExecutors.sameThreadExecutor(),
-                                                        cacheConfig
-                                                    );
-                                                  } else {
-                                                    return baseRunner;
-                                                  }
-                                                }
-                                                catch (RuntimeException e) {
-                                                  CloseQuietly.close(segment.rhs);
-                                                  throw e;
-                                                }
-                                              }
-                                            }
-                                        )
-                                    )
-                                )
-                            ).withWaitMeasuredFromNow(),
-                            new SpecificSegmentSpec(descriptor)
-                        );
-                      }
-                    }
-                )
-        )
-    );
+    return texasRanger.getQueryRunnerForSegments(query, specs);
   }
 
   @Override
@@ -1067,6 +884,9 @@ public class AppenderatorImpl implements Appenderator
         sink.getVersion(),
         identifier.getShardSpec().createChunk(sink)
     );
+    for (FireHydrant hydrant : sink) {
+      cache.close(SinkQuerySegmentWalker.makeHydrantCacheIdentifier(hydrant));
+    }
 
     if (removeOnDiskData) {
       removeDirectory(computePersistDir(identifier));
@@ -1184,9 +1004,4 @@ public class AppenderatorImpl implements Appenderator
       }
     }
   }
-
-  private static String makeHydrantCacheIdentifier(FireHydrant input, Segment segment)
-  {
-    return segment.getIdentifier() + "_" + input.getCount();
-  }
 }
diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java b/server/src/main/java/io/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
new file mode 100644
index 00000000000..e5f3aac4f41
--- /dev/null
+++ b/server/src/main/java/io/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
@@ -0,0 +1,326 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.segment.realtime.appenderator;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.metamx.common.ISE;
+import com.metamx.common.Pair;
+import com.metamx.common.guava.CloseQuietly;
+import com.metamx.common.guava.FunctionalIterable;
+import com.metamx.emitter.EmittingLogger;
+import com.metamx.emitter.service.ServiceEmitter;
+import com.metamx.emitter.service.ServiceMetricEvent;
+import io.druid.client.CachingQueryRunner;
+import io.druid.client.cache.Cache;
+import io.druid.client.cache.CacheConfig;
+import io.druid.query.BySegmentQueryRunner;
+import io.druid.query.CPUTimeMetricQueryRunner;
+import io.druid.query.MetricsEmittingQueryRunner;
+import io.druid.query.NoopQueryRunner;
+import io.druid.query.Query;
+import io.druid.query.QueryRunner;
+import io.druid.query.QueryRunnerFactory;
+import io.druid.query.QueryRunnerFactoryConglomerate;
+import io.druid.query.QueryRunnerHelper;
+import io.druid.query.QuerySegmentWalker;
+import io.druid.query.QueryToolChest;
+import io.druid.query.ReportTimelineMissingSegmentQueryRunner;
+import io.druid.query.SegmentDescriptor;
+import io.druid.query.TableDataSource;
+import io.druid.query.spec.SpecificSegmentQueryRunner;
+import io.druid.query.spec.SpecificSegmentSpec;
+import io.druid.segment.Segment;
+import io.druid.segment.realtime.FireHydrant;
+import io.druid.segment.realtime.plumber.Sink;
+import io.druid.timeline.TimelineObjectHolder;
+import io.druid.timeline.VersionedIntervalTimeline;
+import io.druid.timeline.partition.PartitionChunk;
+import io.druid.timeline.partition.PartitionHolder;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.io.Closeable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class SinkQuerySegmentWalker implements QuerySegmentWalker
+{
+  private static final EmittingLogger log = new EmittingLogger(SinkQuerySegmentWalker.class);
+  private static final String CONTEXT_SKIP_INCREMENTAL_SEGMENT = "skipIncrementalSegment";
+
+  private final String dataSource;
+  private final VersionedIntervalTimeline<String, Sink> sinkTimeline;
+  private final ObjectMapper objectMapper;
+  private final ServiceEmitter emitter;
+  private final QueryRunnerFactoryConglomerate conglomerate;
+  private final ExecutorService queryExecutorService;
+  private final Cache cache;
+  private final CacheConfig cacheConfig;
+
+  public SinkQuerySegmentWalker(
+      String dataSource,
+      VersionedIntervalTimeline<String, Sink> sinkTimeline,
+      ObjectMapper objectMapper,
+      ServiceEmitter emitter,
+      QueryRunnerFactoryConglomerate conglomerate,
+      ExecutorService queryExecutorService,
+      Cache cache,
+      CacheConfig cacheConfig
+  )
+  {
+    this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource");
+    this.sinkTimeline = Preconditions.checkNotNull(sinkTimeline, "sinkTimeline");
+    this.objectMapper = Preconditions.checkNotNull(objectMapper, "objectMapper");
+    this.emitter = Preconditions.checkNotNull(emitter, "emitter");
+    this.conglomerate = Preconditions.checkNotNull(conglomerate, "conglomerate");
+    this.queryExecutorService = Preconditions.checkNotNull(queryExecutorService, "queryExecutorService");
+    this.cache = Preconditions.checkNotNull(cache, "cache");
+    this.cacheConfig = Preconditions.checkNotNull(cacheConfig, "cacheConfig");
+
+    if (!cache.isLocal()) {
+      log.warn("Configured cache[%s] is not local, caching will not be enabled.", cache.getClass().getName());
+    }
+  }
+
+  @Override
+  public <T> QueryRunner<T> getQueryRunnerForIntervals(final Query<T> query, final Iterable<Interval> intervals)
+  {
+    final Iterable<SegmentDescriptor> specs = FunctionalIterable
+        .create(intervals)
+        .transformCat(
+            new Function<Interval, Iterable<TimelineObjectHolder<String, Sink>>>()
+            {
+              @Override
+              public Iterable<TimelineObjectHolder<String, Sink>> apply(final Interval interval)
+              {
+                return sinkTimeline.lookup(interval);
+              }
+            }
+        )
+        .transformCat(
+            new Function<TimelineObjectHolder<String, Sink>, Iterable<SegmentDescriptor>>()
+            {
+              @Override
+              public Iterable<SegmentDescriptor> apply(final TimelineObjectHolder<String, Sink> holder)
+              {
+                return FunctionalIterable
+                    .create(holder.getObject())
+                    .transform(
+                        new Function<PartitionChunk<Sink>, SegmentDescriptor>()
+                        {
+                          @Override
+                          public SegmentDescriptor apply(final PartitionChunk<Sink> chunk)
+                          {
+                            return new SegmentDescriptor(
+                                holder.getInterval(),
+                                holder.getVersion(),
+                                chunk.getChunkNumber()
+                            );
+                          }
+                        }
+                    );
+              }
+            }
+        );
+
+    return getQueryRunnerForSegments(query, specs);
+  }
+
+  @Override
+  public <T> QueryRunner<T> getQueryRunnerForSegments(final Query<T> query, final Iterable<SegmentDescriptor> specs)
+  {
+    // We only handle one particular dataSource. Make sure that's what we have, then ignore from here on out.
+    if (!(query.getDataSource() instanceof TableDataSource)
+        || !dataSource.equals(((TableDataSource) query.getDataSource()).getName())) {
+      log.makeAlert("Received query for unknown dataSource")
+         .addData("dataSource", query.getDataSource())
+         .emit();
+      return new NoopQueryRunner<>();
+    }
+
+    final QueryRunnerFactory<T, Query<T>> factory = conglomerate.findFactory(query);
+    if (factory == null) {
+      throw new ISE("Unknown query type[%s].", query.getClass());
+    }
+
+    final QueryToolChest<T, Query<T>> toolChest = factory.getToolchest();
+    final Function<Query<T>, ServiceMetricEvent.Builder> builderFn =
+        new Function<Query<T>, ServiceMetricEvent.Builder>()
+        {
+          @Override
+          public ServiceMetricEvent.Builder apply(@Nullable Query<T> input)
+          {
+            return toolChest.makeMetricBuilder(query);
+          }
+        };
+    final boolean skipIncrementalSegment = query.getContextValue(CONTEXT_SKIP_INCREMENTAL_SEGMENT, false);
+    final AtomicLong cpuTimeAccumulator = new AtomicLong(0L);
+
+    return CPUTimeMetricQueryRunner.safeBuild(
+        toolChest.mergeResults(
+            factory.mergeRunners(
+                queryExecutorService,
+                FunctionalIterable
+                    .create(specs)
+                    .transform(
+                        new Function<SegmentDescriptor, QueryRunner<T>>()
+                        {
+                          @Override
+                          public QueryRunner<T> apply(final SegmentDescriptor descriptor)
+                          {
+                            final PartitionHolder<Sink> holder = sinkTimeline.findEntry(
+                                descriptor.getInterval(),
+                                descriptor.getVersion()
+                            );
+                            if (holder == null) {
+                              return new ReportTimelineMissingSegmentQueryRunner<>(descriptor);
+                            }
+
+                            final PartitionChunk<Sink> chunk = holder.getChunk(descriptor.getPartitionNumber());
+                            if (chunk == null) {
+                              return new ReportTimelineMissingSegmentQueryRunner<>(descriptor);
+                            }
+
+                            final Sink theSink = chunk.getObject();
+                            final String sinkSegmentIdentifier = theSink.getSegment().getIdentifier();
+
+                            return new SpecificSegmentQueryRunner<>(
+                                withPerSinkMetrics(
+                                    new BySegmentQueryRunner<>(
+                                        sinkSegmentIdentifier,
+                                        descriptor.getInterval().getStart(),
+                                        factory.mergeRunners(
+                                            MoreExecutors.sameThreadExecutor(),
+                                            Iterables.transform(
+                                                theSink,
+                                                new Function<FireHydrant, QueryRunner<T>>()
+                                                {
+                                                  @Override
+                                                  public QueryRunner<T> apply(final FireHydrant hydrant)
+                                                  {
+                                                    // Hydrant might swap at any point, but if it's swapped at the start
+                                                    // then we know it's *definitely* swapped.
+                                                    final boolean hydrantDefinitelySwapped = hydrant.hasSwapped();
+
+                                                    if (skipIncrementalSegment && !hydrantDefinitelySwapped) {
+                                                      return new NoopQueryRunner<>();
+                                                    }
+
+                                                    // Prevent the underlying segment from swapping when its being iterated
+                                                    final Pair<Segment, Closeable> segment = hydrant.getAndIncrementSegment();
+                                                    try {
+                                                      QueryRunner<T> baseRunner = QueryRunnerHelper.makeClosingQueryRunner(
+                                                          factory.createRunner(segment.lhs),
+                                                          segment.rhs
+                                                      );
+
+                                                      // 1) Only use caching if data is immutable
+                                                      // 2) Hydrants are not the same between replicas, make sure cache is local
+                                                      if (hydrantDefinitelySwapped && cache.isLocal()) {
+                                                        return new CachingQueryRunner<>(
+                                                            makeHydrantCacheIdentifier(hydrant),
+                                                            descriptor,
+                                                            objectMapper,
+                                                            cache,
+                                                            toolChest,
+                                                            baseRunner,
+                                                            MoreExecutors.sameThreadExecutor(),
+                                                            cacheConfig
+                                                        );
+                                                      } else {
+                                                        return baseRunner;
+                                                      }
+                                                    }
+                                                    catch (RuntimeException e) {
+                                                      CloseQuietly.close(segment.rhs);
+                                                      throw e;
+                                                    }
+                                                  }
+                                                }
+                                            )
+                                        )
+                                    ),
+                                    builderFn,
+                                    sinkSegmentIdentifier,
+                                    cpuTimeAccumulator
+                                ),
+                                new SpecificSegmentSpec(descriptor)
+                            );
+                          }
+                        }
+                    )
+            )
+        ),
+        builderFn,
+        emitter,
+        cpuTimeAccumulator,
+        true
+    );
+  }
+
+  /**
+   * Decorates a Sink's query runner to emit query/segmentAndCache/time, query/segment/time, query/wait/time once
+   * each for the whole Sink. Also adds CPU time to cpuTimeAccumulator.
+   */
+  private <T> QueryRunner<T> withPerSinkMetrics(
+      final QueryRunner<T> sinkRunner,
+      final Function<Query<T>, ServiceMetricEvent.Builder> builderFn,
+      final String sinkSegmentIdentifier,
+      final AtomicLong cpuTimeAccumulator
+  )
+  {
+    final ImmutableMap<String, String> dims = ImmutableMap.of("segment", sinkSegmentIdentifier);
+
+    // Note: query/segmentAndCache/time and query/segment/time are effectively the same here. They don't split apart
+    // cache vs. non-cache due to the fact that Sinks may be partially cached and partially uncached. Making this
+    // better would need to involve another accumulator like the cpuTimeAccumulator that we could share with the
+    // sinkRunner.
+
+    return CPUTimeMetricQueryRunner.safeBuild(
+        new MetricsEmittingQueryRunner<>(
+            emitter,
+            builderFn,
+            new MetricsEmittingQueryRunner<>(
+                emitter,
+                builderFn,
+                sinkRunner,
+                "query/segment/time",
+                dims
+            ),
+            "query/segmentAndCache/time",
+            dims
+        ).withWaitMeasuredFromNow(),
+        builderFn,
+        emitter,
+        cpuTimeAccumulator,
+        false
+    );
+  }
+
+  public static String makeHydrantCacheIdentifier(FireHydrant input)
+  {
+    return input.getSegment().getIdentifier() + "_" + input.getCount();
+  }
+}
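
Reviewer note, not part of the patch: per SegmentDescriptor the walker stacks SpecificSegmentQueryRunner → CPU-time accounting → query/segmentAndCache/time → query/segment/time (with wait measured from creation) → BySegmentQueryRunner → one runner per FireHydrant (optionally wrapped in CachingQueryRunner), and then wraps the merged result in one more CPU-time runner that reports for the whole query. A minimal usage sketch under assumptions: the walker is built with the constructor above, the query's dataSource matches, and the result type (Row here) depends on the query type:

```java
// Sketch only; "walker" and "query" are assumed to exist.
QueryRunner<Row> runner = walker.getQueryRunnerForIntervals(query, query.getIntervals());
Sequence<Row> results = runner.run(query, Maps.<String, Object>newHashMap());
```
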
diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java
index 2ef8813ce34..81b13145b2d 100644
--- a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java
+++ b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java
@@ -31,17 +31,12 @@ import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.primitives.Ints;
-import com.google.common.util.concurrent.MoreExecutors;
 import com.metamx.common.Granularity;
 import com.metamx.common.ISE;
 import com.metamx.common.Pair;
 import com.metamx.common.concurrent.ScheduledExecutors;
-import com.metamx.common.guava.CloseQuietly;
-import com.metamx.common.guava.FunctionalIterable;
 import com.metamx.emitter.EmittingLogger;
 import com.metamx.emitter.service.ServiceEmitter;
-import com.metamx.emitter.service.ServiceMetricEvent;
-import io.druid.client.CachingQueryRunner;
 import io.druid.client.cache.Cache;
 import io.druid.client.cache.CacheConfig;
 import io.druid.common.guava.ThreadRenamingCallable;
@@ -51,17 +46,11 @@ import io.druid.concurrent.Execs;
 import io.druid.concurrent.TaskThreadPriority;
 import io.druid.data.input.Committer;
 import io.druid.data.input.InputRow;
-import io.druid.query.MetricsEmittingQueryRunner;
-import io.druid.query.NoopQueryRunner;
 import io.druid.query.Query;
 import io.druid.query.QueryRunner;
-import io.druid.query.QueryRunnerFactory;
 import io.druid.query.QueryRunnerFactoryConglomerate;
-import io.druid.query.QueryRunnerHelper;
-import io.druid.query.QueryToolChest;
+import io.druid.query.QuerySegmentWalker;
 import io.druid.query.SegmentDescriptor;
-import io.druid.query.spec.SpecificSegmentQueryRunner;
-import io.druid.query.spec.SpecificSegmentSpec;
 import io.druid.segment.IndexIO;
 import io.druid.segment.IndexMerger;
 import io.druid.segment.IndexSpec;
@@ -76,9 +65,9 @@ import io.druid.segment.loading.DataSegmentPusher;
 import io.druid.segment.realtime.FireDepartmentMetrics;
 import io.druid.segment.realtime.FireHydrant;
 import io.druid.segment.realtime.SegmentPublisher;
+import io.druid.segment.realtime.appenderator.SinkQuerySegmentWalker;
 import io.druid.server.coordination.DataSegmentAnnouncer;
 import io.druid.timeline.DataSegment;
-import io.druid.timeline.TimelineObjectHolder;
 import io.druid.timeline.VersionedIntervalTimeline;
 import io.druid.timeline.partition.SingleElementPartitionChunk;
 import org.apache.commons.io.FileUtils;
@@ -87,8 +76,6 @@ import org.joda.time.Duration;
 import org.joda.time.Interval;
 import org.joda.time.Period;
 
-import javax.annotation.Nullable;
-import java.io.Closeable;
 import java.io.File;
 import java.io.FilenameFilter;
 import java.io.IOException;
@@ -111,10 +98,7 @@ public class RealtimePlumber implements Plumber
   private final RealtimeTuningConfig config;
   private final RejectionPolicy rejectionPolicy;
   private final FireDepartmentMetrics metrics;
-  private final ServiceEmitter emitter;
-  private final QueryRunnerFactoryConglomerate conglomerate;
   private final DataSegmentAnnouncer segmentAnnouncer;
-  private final ExecutorService queryExecutorService;
   private final DataSegmentPusher dataSegmentPusher;
   private final SegmentPublisher segmentPublisher;
   private final SegmentHandoffNotifier handoffNotifier;
@@ -123,10 +107,9 @@ public class RealtimePlumber implements Plumber
   private final VersionedIntervalTimeline<String, Sink> sinkTimeline = new VersionedIntervalTimeline<String, Sink>(
       String.CASE_INSENSITIVE_ORDER
   );
+  private final QuerySegmentWalker texasRanger;
 
   private final Cache cache;
-  private final CacheConfig cacheConfig;
-  private final ObjectMapper objectMapper;
 
   private volatile long nextFlush = 0;
   private volatile boolean shuttingDown = false;
@@ -140,8 +123,6 @@ public class RealtimePlumber implements Plumber
 
   private static final String COMMIT_METADATA_KEY = "%commitMetadata%";
   private static final String COMMIT_METADATA_TIMESTAMP_KEY = "%commitMetadataTimestamp%";
-  private static final String SKIP_INCREMENTAL_SEGMENT = "skipIncrementalSegment";
-
   public RealtimePlumber(
       DataSchema schema,
@@ -165,22 +146,23 @@ public class RealtimePlumber implements Plumber
     this.config = config;
     this.rejectionPolicy = config.getRejectionPolicyFactory().create(config.getWindowPeriod());
     this.metrics = metrics;
-    this.emitter = emitter;
-    this.conglomerate = conglomerate;
     this.segmentAnnouncer = segmentAnnouncer;
-    this.queryExecutorService = queryExecutorService;
     this.dataSegmentPusher = dataSegmentPusher;
     this.segmentPublisher = segmentPublisher;
     this.handoffNotifier = handoffNotifier;
     this.indexMerger = Preconditions.checkNotNull(indexMerger, "Null IndexMerger");
     this.indexIO = Preconditions.checkNotNull(indexIO, "Null IndexIO");
     this.cache = cache;
-    this.cacheConfig = cacheConfig;
-    this.objectMapper = objectMapper;
-
-    if (!cache.isLocal()) {
-      log.error("Configured cache is not local, caching will not be enabled");
-    }
+    this.texasRanger = new SinkQuerySegmentWalker(
+        schema.getDataSource(),
+        sinkTimeline,
+        objectMapper,
+        emitter,
+        conglomerate,
+        queryExecutorService,
+        cache,
+        cacheConfig
+    );
 
     log.info("Creating plumber using rejectionPolicy[%s]", getRejectionPolicy());
   }
@@ -273,125 +255,8 @@ public class RealtimePlumber implements Plumber
   @Override
   public <T> QueryRunner<T> getQueryRunner(final Query<T> query)
   {
-    final boolean skipIncrementalSegment = query.getContextBoolean(SKIP_INCREMENTAL_SEGMENT, false);
-    final QueryRunnerFactory<T, Query<T>> factory = conglomerate.findFactory(query);
-    final QueryToolChest<T, Query<T>> toolchest = factory.getToolchest();
-
-    final Function<Query<T>, ServiceMetricEvent.Builder> builderFn =
-        new Function<Query<T>, ServiceMetricEvent.Builder>()
-        {
-
-          @Override
-          public ServiceMetricEvent.Builder apply(@Nullable Query<T> input)
-          {
-            return toolchest.makeMetricBuilder(query);
-          }
-        };
-
-    List<TimelineObjectHolder<String, Sink>> querySinks = Lists.newArrayList();
-    for (Interval interval : query.getIntervals()) {
-      querySinks.addAll(sinkTimeline.lookup(interval));
-    }
-
-    return toolchest.mergeResults(
-        factory.mergeRunners(
-            queryExecutorService,
-            FunctionalIterable
-                .create(querySinks)
-                .transform(
-                    new Function<TimelineObjectHolder<String, Sink>, QueryRunner<T>>()
-                    {
-                      @Override
-                      public QueryRunner<T> apply(TimelineObjectHolder<String, Sink> holder)
-                      {
-                        if (holder == null) {
-                          throw new ISE("No timeline entry at all!");
-                        }
-
-                        // The realtime plumber always uses SingleElementPartitionChunk
-                        final Sink theSink = holder.getObject().getChunk(0).getObject();
-
-                        if (theSink == null) {
-                          throw new ISE("Missing sink for timeline entry[%s]!", holder);
-                        }
-
-                        final SegmentDescriptor descriptor = new SegmentDescriptor(
-                            holder.getInterval(),
-                            theSink.getSegment().getVersion(),
-                            theSink.getSegment().getShardSpec().getPartitionNum()
-                        );
-
-                        return new SpecificSegmentQueryRunner<T>(
-                            new MetricsEmittingQueryRunner<T>(
-                                emitter,
-                                builderFn,
-                                factory.mergeRunners(
-                                    MoreExecutors.sameThreadExecutor(),
-                                    Iterables.transform(
-                                        theSink,
-                                        new Function<FireHydrant, QueryRunner<T>>()
-                                        {
-                                          @Override
-                                          public QueryRunner<T> apply(FireHydrant input)
-                                          {
-                                            // Hydrant might swap at any point, but if it's swapped at the start
-                                            // then we know it's *definitely* swapped.
-                                            final boolean hydrantDefinitelySwapped = input.hasSwapped();
-
-                                            if (skipIncrementalSegment && !hydrantDefinitelySwapped) {
-                                              return new NoopQueryRunner<T>();
-                                            }
-
-                                            // Prevent the underlying segment from swapping when its being iterated
-                                            final Pair<Segment, Closeable> segment = input.getAndIncrementSegment();
-                                            try {
-                                              QueryRunner<T> baseRunner = QueryRunnerHelper.makeClosingQueryRunner(
-                                                  factory.createRunner(segment.lhs),
-                                                  segment.rhs
-                                              );
-
-                                              if (hydrantDefinitelySwapped // only use caching if data is immutable
-                                                  && cache.isLocal() // hydrants may not be in sync between replicas, make sure cache is local
-                                                  ) {
-                                                return new CachingQueryRunner<>(
-                                                    makeHydrantIdentifier(input, segment.lhs),
-                                                    descriptor,
-                                                    objectMapper,
-                                                    cache,
-                                                    toolchest,
-                                                    baseRunner,
-                                                    MoreExecutors.sameThreadExecutor(),
-                                                    cacheConfig
-                                                );
-                                              } else {
-                                                return baseRunner;
-                                              }
-                                            }
-                                            catch (RuntimeException e) {
-                                              CloseQuietly.close(segment.rhs);
-                                              throw e;
-                                            }
-                                          }
-                                        }
-                                    )
-                                ),
-                                "query/segmentAndCache/time",
-                                ImmutableMap.of("segment", theSink.getSegment().getIdentifier())
-                            ).withWaitMeasuredFromNow(),
-                            new SpecificSegmentSpec(
-                                descriptor
-                            )
-                        );
-                      }
-                    }
-                )
-        )
-    );
-  }
-
-  protected static String makeHydrantIdentifier(FireHydrant input, Segment segment)
-  {
-    return segment.getIdentifier() + "_" + input.getCount();
+    // Calling getQueryRunnerForIntervals here works because there's only one segment per interval for RealtimePlumber.
+    return texasRanger.getQueryRunnerForIntervals(query, query.getIntervals());
   }
 
   @Override
@@ -990,7 +855,7 @@ public class RealtimePlumber implements Plumber
         new SingleElementPartitionChunk<>(sink)
     );
     for (FireHydrant hydrant : sink) {
-      cache.close(makeHydrantIdentifier(hydrant, hydrant.getSegment()));
+      cache.close(SinkQuerySegmentWalker.makeHydrantCacheIdentifier(hydrant));
    }
     synchronized (handoffCondition) {
       handoffCondition.notifyAll();
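
Reviewer note, not part of the patch: both AppenderatorImpl and RealtimePlumber now evict per-hydrant cache entries through the same static helper, so callers no longer pass the Segment separately. A sketch of the key shape produced by makeHydrantCacheIdentifier; the identifier value shown is made up:

```java
// Key format per the patch: hydrant.getSegment().getIdentifier() + "_" + hydrant.getCount()
// e.g. "wikipedia_2016-06-27T00:00:00.000Z_2016-06-28T00:00:00.000Z_2016-06-27T01:02:03.456Z_3"
cache.close(SinkQuerySegmentWalker.makeHydrantCacheIdentifier(hydrant));  // drops cached results for that hydrant
```
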