fix race while index closing and querying

This commit is contained in:
nishantmonu51 2014-06-13 15:36:57 +05:30
parent 025814cfff
commit 0ddaf4c307
5 changed files with 47 additions and 26 deletions

View File

@ -223,7 +223,6 @@ public class YeOldePlumberSchool implements PlumberSchool
); );
indexToPersist.swapSegment(null); indexToPersist.swapSegment(null);
index.close();
metrics.incrementRowOutputCount(rowsToPersist); metrics.incrementRowOutputCount(rowsToPersist);
spilled.add(dirToPersist); spilled.add(dirToPersist);

View File

@ -63,7 +63,6 @@ public class GroupByParallelQueryRunner implements QueryRunner<Row>
private final QueryWatcher queryWatcher; private final QueryWatcher queryWatcher;
private final StupidPool<ByteBuffer> bufferPool; private final StupidPool<ByteBuffer> bufferPool;
public GroupByParallelQueryRunner( public GroupByParallelQueryRunner(
ExecutorService exec, ExecutorService exec,
Ordering<Row> ordering, Ordering<Row> ordering,
@ -155,17 +154,21 @@ public class GroupByParallelQueryRunner implements QueryRunner<Row>
catch (InterruptedException e) { catch (InterruptedException e) {
log.warn(e, "Query interrupted, cancelling pending results, query id [%s]", query.getId()); log.warn(e, "Query interrupted, cancelling pending results, query id [%s]", query.getId());
futures.cancel(true); futures.cancel(true);
indexAccumulatorPair.lhs.close();
throw new QueryInterruptedException("Query interrupted"); throw new QueryInterruptedException("Query interrupted");
} }
catch (CancellationException e) { catch (CancellationException e) {
indexAccumulatorPair.lhs.close();
throw new QueryInterruptedException("Query cancelled"); throw new QueryInterruptedException("Query cancelled");
} }
catch (TimeoutException e) { catch (TimeoutException e) {
indexAccumulatorPair.lhs.close();
log.info("Query timeout, cancelling pending results for query id [%s]", query.getId()); log.info("Query timeout, cancelling pending results for query id [%s]", query.getId());
futures.cancel(true); futures.cancel(true);
throw new QueryInterruptedException("Query timeout"); throw new QueryInterruptedException("Query timeout");
} }
catch (ExecutionException e) { catch (ExecutionException e) {
indexAccumulatorPair.lhs.close();
throw Throwables.propagate(e.getCause()); throw Throwables.propagate(e.getCause());
} }
@ -173,7 +176,8 @@ public class GroupByParallelQueryRunner implements QueryRunner<Row>
Sequences.simple( Sequences.simple(
indexAccumulatorPair.lhs indexAccumulatorPair.lhs
.iterableWithPostAggregations(null) .iterableWithPostAggregations(null)
), indexAccumulatorPair.lhs ),
indexAccumulatorPair.lhs
); );
} }

View File

@ -68,6 +68,6 @@ public class IncrementalIndexSegment implements Segment
@Override @Override
public void close() throws IOException public void close() throws IOException
{ {
// do nothing index.close();
} }
} }

View File

@ -19,17 +19,21 @@
package io.druid.segment.realtime; package io.druid.segment.realtime;
import com.google.api.client.repackaged.com.google.common.base.Throwables;
import io.druid.segment.IncrementalIndexSegment; import io.druid.segment.IncrementalIndexSegment;
import io.druid.segment.ReferenceCountingSegment;
import io.druid.segment.Segment; import io.druid.segment.Segment;
import io.druid.segment.incremental.IncrementalIndex; import io.druid.segment.incremental.IncrementalIndex;
import java.io.IOException;
/** /**
*/ */
public class FireHydrant public class FireHydrant
{ {
private volatile IncrementalIndex index;
private volatile Segment adapter;
private final int count; private final int count;
private volatile IncrementalIndex index;
private volatile ReferenceCountingSegment adapter;
public FireHydrant( public FireHydrant(
IncrementalIndex index, IncrementalIndex index,
@ -38,7 +42,7 @@ public class FireHydrant
) )
{ {
this.index = index; this.index = index;
this.adapter = new IncrementalIndexSegment(index, segmentIdentifier); this.adapter = new ReferenceCountingSegment(new IncrementalIndexSegment(index, segmentIdentifier));
this.count = count; this.count = count;
} }
@ -48,7 +52,7 @@ public class FireHydrant
) )
{ {
this.index = null; this.index = null;
this.adapter = adapter; this.adapter = new ReferenceCountingSegment(adapter);
this.count = count; this.count = count;
} }
@ -57,7 +61,7 @@ public class FireHydrant
return index; return index;
} }
public Segment getSegment() public ReferenceCountingSegment getSegment()
{ {
return adapter; return adapter;
} }
@ -74,7 +78,15 @@ public class FireHydrant
public void swapSegment(Segment adapter) public void swapSegment(Segment adapter)
{ {
this.adapter = adapter; if (this.adapter != null) {
try {
this.adapter.close();
}
catch (IOException e) {
throw Throwables.propagate(e);
}
}
this.adapter = new ReferenceCountingSegment(adapter);
this.index = null; this.index = null;
} }

View File

@ -19,12 +19,10 @@ import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent; import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.client.FilteredServerView; import io.druid.client.FilteredServerView;
import io.druid.client.ServerView; import io.druid.client.ServerView;
import io.druid.collections.StupidPool;
import io.druid.common.guava.ThreadRenamingCallable; import io.druid.common.guava.ThreadRenamingCallable;
import io.druid.common.guava.ThreadRenamingRunnable; import io.druid.common.guava.ThreadRenamingRunnable;
import io.druid.concurrent.Execs; import io.druid.concurrent.Execs;
import io.druid.data.input.InputRow; import io.druid.data.input.InputRow;
import io.druid.guice.annotations.Global;
import io.druid.query.MetricsEmittingQueryRunner; import io.druid.query.MetricsEmittingQueryRunner;
import io.druid.query.Query; import io.druid.query.Query;
import io.druid.query.QueryRunner; import io.druid.query.QueryRunner;
@ -59,10 +57,10 @@ import org.joda.time.Interval;
import org.joda.time.Period; import org.joda.time.Period;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.io.Closeable;
import java.io.File; import java.io.File;
import java.io.FilenameFilter; import java.io.FilenameFilter;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays; import java.util.Arrays;
import java.util.Comparator; import java.util.Comparator;
import java.util.List; import java.util.List;
@ -76,10 +74,8 @@ import java.util.concurrent.ScheduledExecutorService;
public class RealtimePlumber implements Plumber public class RealtimePlumber implements Plumber
{ {
private static final EmittingLogger log = new EmittingLogger(RealtimePlumber.class); private static final EmittingLogger log = new EmittingLogger(RealtimePlumber.class);
private final DataSchema schema; private final DataSchema schema;
private final RealtimeTuningConfig config; private final RealtimeTuningConfig config;
private final RejectionPolicy rejectionPolicy; private final RejectionPolicy rejectionPolicy;
private final FireDepartmentMetrics metrics; private final FireDepartmentMetrics metrics;
private final ServiceEmitter emitter; private final ServiceEmitter emitter;
@ -253,7 +249,19 @@ public class RealtimePlumber implements Plumber
@Override @Override
public QueryRunner<T> apply(FireHydrant input) public QueryRunner<T> apply(FireHydrant input)
{ {
return factory.createRunner(input.getSegment()); // Prevent the underlying segment from closing when its being iterated
final Closeable closeable = input.getSegment().increment();
try {
return factory.createRunner(input.getSegment());
}
finally {
try {
closeable.close();
}
catch (IOException e) {
throw Throwables.propagate(e);
}
}
} }
} }
) )
@ -717,8 +725,6 @@ public class RealtimePlumber implements Plumber
IndexIO.loadIndex(persistedFile) IndexIO.loadIndex(persistedFile)
) )
); );
//TODO: can there be some races here ?
index.close();
return numRows; return numRows;
} }
catch (IOException e) { catch (IOException e) {
@ -785,13 +791,13 @@ public class RealtimePlumber implements Plumber
&& config.getShardSpec().getPartitionNum() == segment.getShardSpec().getPartitionNum() && config.getShardSpec().getPartitionNum() == segment.getShardSpec().getPartitionNum()
&& Iterables.any( && Iterables.any(
sinks.keySet(), new Predicate<Long>() sinks.keySet(), new Predicate<Long>()
{ {
@Override @Override
public boolean apply(Long sinkKey) public boolean apply(Long sinkKey)
{ {
return segment.getInterval().contains(sinkKey); return segment.getInterval().contains(sinkKey);
} }
} }
); );
} }
} }