diff --git a/processing/src/main/java/io/druid/query/QueryRunnerHelper.java b/processing/src/main/java/io/druid/query/QueryRunnerHelper.java index ba773defd85..860a4e0b406 100644 --- a/processing/src/main/java/io/druid/query/QueryRunnerHelper.java +++ b/processing/src/main/java/io/druid/query/QueryRunnerHelper.java @@ -22,6 +22,7 @@ package io.druid.query; import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.base.Predicates; +import com.metamx.common.guava.ResourceClosingSequence; import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; import com.metamx.common.logger.Logger; @@ -33,7 +34,9 @@ import io.druid.segment.Cursor; import io.druid.segment.StorageAdapter; import org.joda.time.Interval; +import java.io.Closeable; import java.util.List; +import java.util.Map; /** */ @@ -81,4 +84,15 @@ public class QueryRunnerHelper Predicates.>notNull() ); } + + public static QueryRunner makeClosingQueryRunner(final QueryRunner runner, final Closeable closeable){ + return new QueryRunner() + { + @Override + public Sequence run(Query query, Map responseContext) + { + return new ResourceClosingSequence<>(runner.run(query, responseContext), closeable); + } + }; + } } diff --git a/processing/src/main/java/io/druid/query/ReferenceCountingSegmentQueryRunner.java b/processing/src/main/java/io/druid/query/ReferenceCountingSegmentQueryRunner.java index cc75843e8e0..e3545d6ca63 100644 --- a/processing/src/main/java/io/druid/query/ReferenceCountingSegmentQueryRunner.java +++ b/processing/src/main/java/io/druid/query/ReferenceCountingSegmentQueryRunner.java @@ -28,33 +28,41 @@ import java.io.Closeable; import java.util.Map; /** -*/ + */ public class ReferenceCountingSegmentQueryRunner implements QueryRunner { private final QueryRunnerFactory> factory; private final ReferenceCountingSegment adapter; + private final SegmentDescriptor descriptor; public ReferenceCountingSegmentQueryRunner( QueryRunnerFactory> factory, - ReferenceCountingSegment adapter + ReferenceCountingSegment adapter, + SegmentDescriptor descriptor ) { this.factory = factory; this.adapter = adapter; + this.descriptor = descriptor; } @Override public Sequence run(final Query query, Map responseContext) { final Closeable closeable = adapter.increment(); - try { - final Sequence baseSequence = factory.createRunner(adapter).run(query, responseContext); + if (closeable != null) { + try { + final Sequence baseSequence = factory.createRunner(adapter).run(query, responseContext); - return new ResourceClosingSequence(baseSequence, closeable); - } - catch (RuntimeException e) { - CloseQuietly.close(closeable); - throw e; + return new ResourceClosingSequence(baseSequence, closeable); + } + catch (RuntimeException e) { + CloseQuietly.close(closeable); + throw e; + } + } else { + // Segment was closed before we had a chance to increment the reference count + return new ReportTimelineMissingSegmentQueryRunner(descriptor).run(query, responseContext); } } } diff --git a/server/src/main/java/io/druid/segment/realtime/FireHydrant.java b/server/src/main/java/io/druid/segment/realtime/FireHydrant.java index f69e1f158f1..4781520968c 100644 --- a/server/src/main/java/io/druid/segment/realtime/FireHydrant.java +++ b/server/src/main/java/io/druid/segment/realtime/FireHydrant.java @@ -20,11 +20,13 @@ package io.druid.segment.realtime; import com.google.common.base.Throwables; +import com.metamx.common.Pair; import io.druid.segment.IncrementalIndexSegment; import io.druid.segment.ReferenceCountingSegment; import io.druid.segment.Segment; import io.druid.segment.incremental.IncrementalIndex; +import java.io.Closeable; import java.io.IOException; /** @@ -34,6 +36,7 @@ public class FireHydrant private final int count; private volatile IncrementalIndex index; private volatile ReferenceCountingSegment adapter; + private Object swapLock = new Object(); public FireHydrant( IncrementalIndex index, @@ -61,7 +64,7 @@ public class FireHydrant return index; } - public ReferenceCountingSegment getSegment() + public Segment getSegment() { return adapter; } @@ -78,16 +81,27 @@ public class FireHydrant public void swapSegment(Segment adapter) { - if (this.adapter != null) { - try { - this.adapter.close(); - } - catch (IOException e) { - throw Throwables.propagate(e); + synchronized (swapLock) { + if (this.adapter != null) { + try { + this.adapter.close(); + } + catch (IOException e) { + throw Throwables.propagate(e); + } } + this.adapter = new ReferenceCountingSegment(adapter); + this.index = null; + } + } + + public Pair getAndIncrementSegment() + { + // Prevent swapping of index before increment is called + synchronized (swapLock) { + Closeable closeable = adapter.increment(); + return new Pair(adapter, closeable); } - this.adapter = new ReferenceCountingSegment(adapter); - this.index = null; } @Override diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java index 59cebb931de..f722549d6ff 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java @@ -36,6 +36,7 @@ import com.metamx.common.Granularity; import com.metamx.common.ISE; import com.metamx.common.Pair; import com.metamx.common.concurrent.ScheduledExecutors; +import com.metamx.common.guava.CloseQuietly; import com.metamx.common.guava.FunctionalIterable; import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.service.ServiceEmitter; @@ -55,6 +56,7 @@ import io.druid.query.Query; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactory; import io.druid.query.QueryRunnerFactoryConglomerate; +import io.druid.query.QueryRunnerHelper; import io.druid.query.QueryToolChest; import io.druid.query.ReportTimelineMissingSegmentQueryRunner; import io.druid.query.SegmentDescriptor; @@ -327,47 +329,38 @@ public class RealtimePlumber implements Plumber @Override public QueryRunner apply(FireHydrant input) { - // It is possible that we got a query for a segment, and while that query - // is in the jetty queue, the segment is abandoned. Here, we need to retry - // the query for the segment. - if (input == null || input.getSegment() == null) { - return new ReportTimelineMissingSegmentQueryRunner(descriptor); - } - if (skipIncrementalSegment && !input.hasSwapped()) { return new NoopQueryRunner(); } - // Prevent the underlying segment from closing when its being iterated - final ReferenceCountingSegment segment = input.getSegment(); - final Closeable closeable = segment.increment(); + // Prevent the underlying segment from swapping when its being iterated + final Pair segment = input.getAndIncrementSegment(); try { + QueryRunner baseRunner = QueryRunnerHelper.makeClosingQueryRunner( + factory.createRunner(segment.lhs), + segment.rhs + ); + if (input.hasSwapped() // only use caching if data is immutable && cache.isLocal() // hydrants may not be in sync between replicas, make sure cache is local ) { return new CachingQueryRunner<>( - makeHydrantIdentifier(input, segment), + makeHydrantIdentifier(input, segment.lhs), descriptor, objectMapper, cache, toolchest, - factory.createRunner(segment), + baseRunner, MoreExecutors.sameThreadExecutor(), cacheConfig ); } else { - return factory.createRunner(input.getSegment()); + return baseRunner; } } - finally { - try { - if (closeable != null) { - closeable.close(); - } - } - catch (IOException e) { - throw Throwables.propagate(e); - } + catch (RuntimeException e) { + CloseQuietly.close(segment.rhs); + throw e; } } } @@ -385,7 +378,7 @@ public class RealtimePlumber implements Plumber ); } - protected static String makeHydrantIdentifier(FireHydrant input, ReferenceCountingSegment segment) + protected static String makeHydrantIdentifier(FireHydrant input, Segment segment) { return segment.getIdentifier() + "_" + input.getCount(); } @@ -406,12 +399,12 @@ public class RealtimePlumber implements Plumber final Stopwatch persistStopwatch = Stopwatch.createStarted(); final Map metadataElems = committer.getMetadata() == null ? null : - ImmutableMap.of( - COMMIT_METADATA_KEY, - committer.getMetadata(), - COMMIT_METADATA_TIMESTAMP_KEY, - System.currentTimeMillis() - ); + ImmutableMap.of( + COMMIT_METADATA_KEY, + committer.getMetadata(), + COMMIT_METADATA_TIMESTAMP_KEY, + System.currentTimeMillis() + ); persistExecutor.execute( new ThreadRenamingRunnable(String.format("%s-incremental-persist", schema.getDataSource())) diff --git a/server/src/main/java/io/druid/server/coordination/ServerManager.java b/server/src/main/java/io/druid/server/coordination/ServerManager.java index 94958848d1c..4b7c218aa16 100644 --- a/server/src/main/java/io/druid/server/coordination/ServerManager.java +++ b/server/src/main/java/io/druid/server/coordination/ServerManager.java @@ -442,7 +442,7 @@ public class ServerManager implements QuerySegmentWalker return toolChest.makeMetricBuilder(input); } }, - new ReferenceCountingSegmentQueryRunner(factory, adapter), + new ReferenceCountingSegmentQueryRunner(factory, adapter, segmentDescriptor), "query/segment/time", ImmutableMap.of("segment", adapter.getIdentifier()) ),