fix reference counting for segments

This commit is contained in:
Nishant 2016-01-20 17:24:21 +05:30
parent fc09929503
commit 59ea186af7
5 changed files with 77 additions and 41 deletions

View File

@ -22,6 +22,7 @@ package io.druid.query;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicates;
import com.metamx.common.guava.ResourceClosingSequence;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.common.logger.Logger;
@ -33,7 +34,9 @@ import io.druid.segment.Cursor;
import io.druid.segment.StorageAdapter;
import org.joda.time.Interval;
import java.io.Closeable;
import java.util.List;
import java.util.Map;
/**
*/
@ -81,4 +84,15 @@ public class QueryRunnerHelper
Predicates.<Result<T>>notNull()
);
}
public static <T> QueryRunner<T> makeClosingQueryRunner(final QueryRunner<T> runner, final Closeable closeable){
return new QueryRunner<T>()
{
@Override
public Sequence<T> run(Query<T> query, Map<String, Object> responseContext)
{
return new ResourceClosingSequence<>(runner.run(query, responseContext), closeable);
}
};
}
}

View File

@ -28,33 +28,41 @@ import java.io.Closeable;
import java.util.Map;
/**
*/
*/
public class ReferenceCountingSegmentQueryRunner<T> implements QueryRunner<T>
{
private final QueryRunnerFactory<T, Query<T>> factory;
private final ReferenceCountingSegment adapter;
private final SegmentDescriptor descriptor;
public ReferenceCountingSegmentQueryRunner(
QueryRunnerFactory<T, Query<T>> factory,
ReferenceCountingSegment adapter
ReferenceCountingSegment adapter,
SegmentDescriptor descriptor
)
{
this.factory = factory;
this.adapter = adapter;
this.descriptor = descriptor;
}
@Override
public Sequence<T> run(final Query<T> query, Map<String, Object> responseContext)
{
final Closeable closeable = adapter.increment();
try {
final Sequence<T> baseSequence = factory.createRunner(adapter).run(query, responseContext);
if (closeable != null) {
try {
final Sequence<T> baseSequence = factory.createRunner(adapter).run(query, responseContext);
return new ResourceClosingSequence<T>(baseSequence, closeable);
}
catch (RuntimeException e) {
CloseQuietly.close(closeable);
throw e;
return new ResourceClosingSequence<T>(baseSequence, closeable);
}
catch (RuntimeException e) {
CloseQuietly.close(closeable);
throw e;
}
} else {
// Segment was closed before we had a chance to increment the reference count
return new ReportTimelineMissingSegmentQueryRunner<T>(descriptor).run(query, responseContext);
}
}
}

View File

@ -20,11 +20,13 @@
package io.druid.segment.realtime;
import com.google.common.base.Throwables;
import com.metamx.common.Pair;
import io.druid.segment.IncrementalIndexSegment;
import io.druid.segment.ReferenceCountingSegment;
import io.druid.segment.Segment;
import io.druid.segment.incremental.IncrementalIndex;
import java.io.Closeable;
import java.io.IOException;
/**
@ -34,6 +36,7 @@ public class FireHydrant
private final int count;
private volatile IncrementalIndex index;
private volatile ReferenceCountingSegment adapter;
private Object swapLock = new Object();
public FireHydrant(
IncrementalIndex index,
@ -61,7 +64,7 @@ public class FireHydrant
return index;
}
public ReferenceCountingSegment getSegment()
public Segment getSegment()
{
return adapter;
}
@ -78,16 +81,27 @@ public class FireHydrant
public void swapSegment(Segment adapter)
{
if (this.adapter != null) {
try {
this.adapter.close();
}
catch (IOException e) {
throw Throwables.propagate(e);
synchronized (swapLock) {
if (this.adapter != null) {
try {
this.adapter.close();
}
catch (IOException e) {
throw Throwables.propagate(e);
}
}
this.adapter = new ReferenceCountingSegment(adapter);
this.index = null;
}
}
public Pair<Segment, Closeable> getAndIncrementSegment()
{
// Prevent swapping of index before increment is called
synchronized (swapLock) {
Closeable closeable = adapter.increment();
return new Pair<Segment, Closeable>(adapter, closeable);
}
this.adapter = new ReferenceCountingSegment(adapter);
this.index = null;
}
@Override

View File

@ -36,6 +36,7 @@ import com.metamx.common.Granularity;
import com.metamx.common.ISE;
import com.metamx.common.Pair;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.guava.CloseQuietly;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
@ -55,6 +56,7 @@ import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.query.QueryRunnerHelper;
import io.druid.query.QueryToolChest;
import io.druid.query.ReportTimelineMissingSegmentQueryRunner;
import io.druid.query.SegmentDescriptor;
@ -338,36 +340,34 @@ public class RealtimePlumber implements Plumber
return new NoopQueryRunner<T>();
}
// Prevent the underlying segment from closing when its being iterated
final ReferenceCountingSegment segment = input.getSegment();
final Closeable closeable = segment.increment();
// Prevent the underlying segment from swapping when its being iterated
final Pair<Segment, Closeable> segment = input.getAndIncrementSegment();
try {
QueryRunner<T> baseRunner = QueryRunnerHelper.makeClosingQueryRunner(
factory.createRunner(segment.lhs),
segment.rhs
);
if (input.hasSwapped() // only use caching if data is immutable
&& cache.isLocal() // hydrants may not be in sync between replicas, make sure cache is local
) {
return new CachingQueryRunner<>(
makeHydrantIdentifier(input, segment),
makeHydrantIdentifier(input, segment.lhs),
descriptor,
objectMapper,
cache,
toolchest,
factory.createRunner(segment),
baseRunner,
MoreExecutors.sameThreadExecutor(),
cacheConfig
);
} else {
return factory.createRunner(input.getSegment());
return baseRunner;
}
}
finally {
try {
if (closeable != null) {
closeable.close();
}
}
catch (IOException e) {
throw Throwables.propagate(e);
}
catch (RuntimeException e) {
CloseQuietly.close(segment.rhs);
throw e;
}
}
}
@ -385,7 +385,7 @@ public class RealtimePlumber implements Plumber
);
}
protected static String makeHydrantIdentifier(FireHydrant input, ReferenceCountingSegment segment)
protected static String makeHydrantIdentifier(FireHydrant input, Segment segment)
{
return segment.getIdentifier() + "_" + input.getCount();
}
@ -406,12 +406,12 @@ public class RealtimePlumber implements Plumber
final Stopwatch persistStopwatch = Stopwatch.createStarted();
final Map<String, Object> metadataElems = committer.getMetadata() == null ? null :
ImmutableMap.of(
COMMIT_METADATA_KEY,
committer.getMetadata(),
COMMIT_METADATA_TIMESTAMP_KEY,
System.currentTimeMillis()
);
ImmutableMap.of(
COMMIT_METADATA_KEY,
committer.getMetadata(),
COMMIT_METADATA_TIMESTAMP_KEY,
System.currentTimeMillis()
);
persistExecutor.execute(
new ThreadRenamingRunnable(String.format("%s-incremental-persist", schema.getDataSource()))

View File

@ -442,7 +442,7 @@ public class ServerManager implements QuerySegmentWalker
return toolChest.makeMetricBuilder(input);
}
},
new ReferenceCountingSegmentQueryRunner<T>(factory, adapter),
new ReferenceCountingSegmentQueryRunner<T>(factory, adapter, segmentDescriptor),
"query/segment/time",
ImmutableMap.of("segment", adapter.getIdentifier())
),