mirror of https://github.com/apache/druid.git
Merge pull request #2301 from metamx/fix-2299_2
fix reference counting for segments
This commit is contained in:
commit
0c5f4b947c
|
@ -22,6 +22,7 @@ package io.druid.query;
|
|||
import com.google.common.base.Function;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.base.Predicates;
|
||||
import com.metamx.common.guava.ResourceClosingSequence;
|
||||
import com.metamx.common.guava.Sequence;
|
||||
import com.metamx.common.guava.Sequences;
|
||||
import com.metamx.common.logger.Logger;
|
||||
|
@ -33,7 +34,9 @@ import io.druid.segment.Cursor;
|
|||
import io.druid.segment.StorageAdapter;
|
||||
import org.joda.time.Interval;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
*/
|
||||
|
@ -81,4 +84,15 @@ public class QueryRunnerHelper
|
|||
Predicates.<Result<T>>notNull()
|
||||
);
|
||||
}
|
||||
|
||||
public static <T> QueryRunner<T> makeClosingQueryRunner(final QueryRunner<T> runner, final Closeable closeable){
|
||||
return new QueryRunner<T>()
|
||||
{
|
||||
@Override
|
||||
public Sequence<T> run(Query<T> query, Map<String, Object> responseContext)
|
||||
{
|
||||
return new ResourceClosingSequence<>(runner.run(query, responseContext), closeable);
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
|
|
@ -28,33 +28,41 @@ import java.io.Closeable;
|
|||
import java.util.Map;
|
||||
|
||||
/**
|
||||
*/
|
||||
*/
|
||||
public class ReferenceCountingSegmentQueryRunner<T> implements QueryRunner<T>
|
||||
{
|
||||
private final QueryRunnerFactory<T, Query<T>> factory;
|
||||
private final ReferenceCountingSegment adapter;
|
||||
private final SegmentDescriptor descriptor;
|
||||
|
||||
public ReferenceCountingSegmentQueryRunner(
|
||||
QueryRunnerFactory<T, Query<T>> factory,
|
||||
ReferenceCountingSegment adapter
|
||||
ReferenceCountingSegment adapter,
|
||||
SegmentDescriptor descriptor
|
||||
)
|
||||
{
|
||||
this.factory = factory;
|
||||
this.adapter = adapter;
|
||||
this.descriptor = descriptor;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Sequence<T> run(final Query<T> query, Map<String, Object> responseContext)
|
||||
{
|
||||
final Closeable closeable = adapter.increment();
|
||||
try {
|
||||
final Sequence<T> baseSequence = factory.createRunner(adapter).run(query, responseContext);
|
||||
if (closeable != null) {
|
||||
try {
|
||||
final Sequence<T> baseSequence = factory.createRunner(adapter).run(query, responseContext);
|
||||
|
||||
return new ResourceClosingSequence<T>(baseSequence, closeable);
|
||||
}
|
||||
catch (RuntimeException e) {
|
||||
CloseQuietly.close(closeable);
|
||||
throw e;
|
||||
return new ResourceClosingSequence<T>(baseSequence, closeable);
|
||||
}
|
||||
catch (RuntimeException e) {
|
||||
CloseQuietly.close(closeable);
|
||||
throw e;
|
||||
}
|
||||
} else {
|
||||
// Segment was closed before we had a chance to increment the reference count
|
||||
return new ReportTimelineMissingSegmentQueryRunner<T>(descriptor).run(query, responseContext);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,11 +20,13 @@
|
|||
package io.druid.segment.realtime;
|
||||
|
||||
import com.google.common.base.Throwables;
|
||||
import com.metamx.common.Pair;
|
||||
import io.druid.segment.IncrementalIndexSegment;
|
||||
import io.druid.segment.ReferenceCountingSegment;
|
||||
import io.druid.segment.Segment;
|
||||
import io.druid.segment.incremental.IncrementalIndex;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
|
@ -34,6 +36,7 @@ public class FireHydrant
|
|||
private final int count;
|
||||
private volatile IncrementalIndex index;
|
||||
private volatile ReferenceCountingSegment adapter;
|
||||
private Object swapLock = new Object();
|
||||
|
||||
public FireHydrant(
|
||||
IncrementalIndex index,
|
||||
|
@ -61,7 +64,7 @@ public class FireHydrant
|
|||
return index;
|
||||
}
|
||||
|
||||
public ReferenceCountingSegment getSegment()
|
||||
public Segment getSegment()
|
||||
{
|
||||
return adapter;
|
||||
}
|
||||
|
@ -78,16 +81,27 @@ public class FireHydrant
|
|||
|
||||
public void swapSegment(Segment adapter)
|
||||
{
|
||||
if (this.adapter != null) {
|
||||
try {
|
||||
this.adapter.close();
|
||||
}
|
||||
catch (IOException e) {
|
||||
throw Throwables.propagate(e);
|
||||
synchronized (swapLock) {
|
||||
if (this.adapter != null) {
|
||||
try {
|
||||
this.adapter.close();
|
||||
}
|
||||
catch (IOException e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
}
|
||||
this.adapter = new ReferenceCountingSegment(adapter);
|
||||
this.index = null;
|
||||
}
|
||||
}
|
||||
|
||||
public Pair<Segment, Closeable> getAndIncrementSegment()
|
||||
{
|
||||
// Prevent swapping of index before increment is called
|
||||
synchronized (swapLock) {
|
||||
Closeable closeable = adapter.increment();
|
||||
return new Pair<Segment, Closeable>(adapter, closeable);
|
||||
}
|
||||
this.adapter = new ReferenceCountingSegment(adapter);
|
||||
this.index = null;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -36,6 +36,7 @@ import com.metamx.common.Granularity;
|
|||
import com.metamx.common.ISE;
|
||||
import com.metamx.common.Pair;
|
||||
import com.metamx.common.concurrent.ScheduledExecutors;
|
||||
import com.metamx.common.guava.CloseQuietly;
|
||||
import com.metamx.common.guava.FunctionalIterable;
|
||||
import com.metamx.emitter.EmittingLogger;
|
||||
import com.metamx.emitter.service.ServiceEmitter;
|
||||
|
@ -55,6 +56,7 @@ import io.druid.query.Query;
|
|||
import io.druid.query.QueryRunner;
|
||||
import io.druid.query.QueryRunnerFactory;
|
||||
import io.druid.query.QueryRunnerFactoryConglomerate;
|
||||
import io.druid.query.QueryRunnerHelper;
|
||||
import io.druid.query.QueryToolChest;
|
||||
import io.druid.query.ReportTimelineMissingSegmentQueryRunner;
|
||||
import io.druid.query.SegmentDescriptor;
|
||||
|
@ -327,47 +329,38 @@ public class RealtimePlumber implements Plumber
|
|||
@Override
|
||||
public QueryRunner<T> apply(FireHydrant input)
|
||||
{
|
||||
// It is possible that we got a query for a segment, and while that query
|
||||
// is in the jetty queue, the segment is abandoned. Here, we need to retry
|
||||
// the query for the segment.
|
||||
if (input == null || input.getSegment() == null) {
|
||||
return new ReportTimelineMissingSegmentQueryRunner<T>(descriptor);
|
||||
}
|
||||
|
||||
if (skipIncrementalSegment && !input.hasSwapped()) {
|
||||
return new NoopQueryRunner<T>();
|
||||
}
|
||||
|
||||
// Prevent the underlying segment from closing when its being iterated
|
||||
final ReferenceCountingSegment segment = input.getSegment();
|
||||
final Closeable closeable = segment.increment();
|
||||
// Prevent the underlying segment from swapping when its being iterated
|
||||
final Pair<Segment, Closeable> segment = input.getAndIncrementSegment();
|
||||
try {
|
||||
QueryRunner<T> baseRunner = QueryRunnerHelper.makeClosingQueryRunner(
|
||||
factory.createRunner(segment.lhs),
|
||||
segment.rhs
|
||||
);
|
||||
|
||||
if (input.hasSwapped() // only use caching if data is immutable
|
||||
&& cache.isLocal() // hydrants may not be in sync between replicas, make sure cache is local
|
||||
) {
|
||||
return new CachingQueryRunner<>(
|
||||
makeHydrantIdentifier(input, segment),
|
||||
makeHydrantIdentifier(input, segment.lhs),
|
||||
descriptor,
|
||||
objectMapper,
|
||||
cache,
|
||||
toolchest,
|
||||
factory.createRunner(segment),
|
||||
baseRunner,
|
||||
MoreExecutors.sameThreadExecutor(),
|
||||
cacheConfig
|
||||
);
|
||||
} else {
|
||||
return factory.createRunner(input.getSegment());
|
||||
return baseRunner;
|
||||
}
|
||||
}
|
||||
finally {
|
||||
try {
|
||||
if (closeable != null) {
|
||||
closeable.close();
|
||||
}
|
||||
}
|
||||
catch (IOException e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
catch (RuntimeException e) {
|
||||
CloseQuietly.close(segment.rhs);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -385,7 +378,7 @@ public class RealtimePlumber implements Plumber
|
|||
);
|
||||
}
|
||||
|
||||
protected static String makeHydrantIdentifier(FireHydrant input, ReferenceCountingSegment segment)
|
||||
protected static String makeHydrantIdentifier(FireHydrant input, Segment segment)
|
||||
{
|
||||
return segment.getIdentifier() + "_" + input.getCount();
|
||||
}
|
||||
|
@ -406,12 +399,12 @@ public class RealtimePlumber implements Plumber
|
|||
final Stopwatch persistStopwatch = Stopwatch.createStarted();
|
||||
|
||||
final Map<String, Object> metadataElems = committer.getMetadata() == null ? null :
|
||||
ImmutableMap.of(
|
||||
COMMIT_METADATA_KEY,
|
||||
committer.getMetadata(),
|
||||
COMMIT_METADATA_TIMESTAMP_KEY,
|
||||
System.currentTimeMillis()
|
||||
);
|
||||
ImmutableMap.of(
|
||||
COMMIT_METADATA_KEY,
|
||||
committer.getMetadata(),
|
||||
COMMIT_METADATA_TIMESTAMP_KEY,
|
||||
System.currentTimeMillis()
|
||||
);
|
||||
|
||||
persistExecutor.execute(
|
||||
new ThreadRenamingRunnable(String.format("%s-incremental-persist", schema.getDataSource()))
|
||||
|
|
|
@ -442,7 +442,7 @@ public class ServerManager implements QuerySegmentWalker
|
|||
return toolChest.makeMetricBuilder(input);
|
||||
}
|
||||
},
|
||||
new ReferenceCountingSegmentQueryRunner<T>(factory, adapter),
|
||||
new ReferenceCountingSegmentQueryRunner<T>(factory, adapter, segmentDescriptor),
|
||||
"query/segment/time",
|
||||
ImmutableMap.of("segment", adapter.getIdentifier())
|
||||
),
|
||||
|
|
Loading…
Reference in New Issue