diff --git a/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java b/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java index 9ed0fc1e062..a34a24b4550 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java @@ -223,7 +223,6 @@ public class YeOldePlumberSchool implements PlumberSchool ); indexToPersist.swapSegment(null); - index.close(); metrics.incrementRowOutputCount(rowsToPersist); spilled.add(dirToPersist); diff --git a/processing/src/main/java/io/druid/query/GroupByParallelQueryRunner.java b/processing/src/main/java/io/druid/query/GroupByParallelQueryRunner.java index 070f54dace5..705e8eabc4f 100644 --- a/processing/src/main/java/io/druid/query/GroupByParallelQueryRunner.java +++ b/processing/src/main/java/io/druid/query/GroupByParallelQueryRunner.java @@ -63,7 +63,6 @@ public class GroupByParallelQueryRunner implements QueryRunner private final QueryWatcher queryWatcher; private final StupidPool bufferPool; - public GroupByParallelQueryRunner( ExecutorService exec, Ordering ordering, @@ -155,17 +154,21 @@ public class GroupByParallelQueryRunner implements QueryRunner catch (InterruptedException e) { log.warn(e, "Query interrupted, cancelling pending results, query id [%s]", query.getId()); futures.cancel(true); + indexAccumulatorPair.lhs.close(); throw new QueryInterruptedException("Query interrupted"); } catch (CancellationException e) { + indexAccumulatorPair.lhs.close(); throw new QueryInterruptedException("Query cancelled"); } catch (TimeoutException e) { + indexAccumulatorPair.lhs.close(); log.info("Query timeout, cancelling pending results for query id [%s]", query.getId()); futures.cancel(true); throw new QueryInterruptedException("Query timeout"); } catch (ExecutionException e) { + indexAccumulatorPair.lhs.close(); throw Throwables.propagate(e.getCause()); } @@ -173,7 +176,8 @@ public class GroupByParallelQueryRunner implements QueryRunner Sequences.simple( indexAccumulatorPair.lhs .iterableWithPostAggregations(null) - ), indexAccumulatorPair.lhs + ), + indexAccumulatorPair.lhs ); } diff --git a/processing/src/main/java/io/druid/segment/IncrementalIndexSegment.java b/processing/src/main/java/io/druid/segment/IncrementalIndexSegment.java index f21f7f1fa09..641ad41ba61 100644 --- a/processing/src/main/java/io/druid/segment/IncrementalIndexSegment.java +++ b/processing/src/main/java/io/druid/segment/IncrementalIndexSegment.java @@ -68,6 +68,6 @@ public class IncrementalIndexSegment implements Segment @Override public void close() throws IOException { - // do nothing + index.close(); } } diff --git a/server/src/main/java/io/druid/segment/realtime/FireHydrant.java b/server/src/main/java/io/druid/segment/realtime/FireHydrant.java index db99ff45719..282ef63b624 100644 --- a/server/src/main/java/io/druid/segment/realtime/FireHydrant.java +++ b/server/src/main/java/io/druid/segment/realtime/FireHydrant.java @@ -19,17 +19,21 @@ package io.druid.segment.realtime; +import com.google.api.client.repackaged.com.google.common.base.Throwables; import io.druid.segment.IncrementalIndexSegment; +import io.druid.segment.ReferenceCountingSegment; import io.druid.segment.Segment; import io.druid.segment.incremental.IncrementalIndex; +import java.io.IOException; + /** -*/ + */ public class FireHydrant { - private volatile IncrementalIndex index; - private volatile Segment adapter; private final int count; + private volatile IncrementalIndex index; + private volatile ReferenceCountingSegment adapter; public FireHydrant( IncrementalIndex index, @@ -38,7 +42,7 @@ public class FireHydrant ) { this.index = index; - this.adapter = new IncrementalIndexSegment(index, segmentIdentifier); + this.adapter = new ReferenceCountingSegment(new IncrementalIndexSegment(index, segmentIdentifier)); this.count = count; } @@ -48,7 +52,7 @@ public class FireHydrant ) { this.index = null; - this.adapter = adapter; + this.adapter = new ReferenceCountingSegment(adapter); this.count = count; } @@ -57,7 +61,7 @@ public class FireHydrant return index; } - public Segment getSegment() + public ReferenceCountingSegment getSegment() { return adapter; } @@ -74,7 +78,15 @@ public class FireHydrant public void swapSegment(Segment adapter) { - this.adapter = adapter; + if (this.adapter != null) { + try { + this.adapter.close(); + } + catch (IOException e) { + throw Throwables.propagate(e); + } + } + this.adapter = new ReferenceCountingSegment(adapter); this.index = null; } diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java index 337149aadac..0b46de25de2 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java @@ -19,12 +19,10 @@ import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceMetricEvent; import io.druid.client.FilteredServerView; import io.druid.client.ServerView; -import io.druid.collections.StupidPool; import io.druid.common.guava.ThreadRenamingCallable; import io.druid.common.guava.ThreadRenamingRunnable; import io.druid.concurrent.Execs; import io.druid.data.input.InputRow; -import io.druid.guice.annotations.Global; import io.druid.query.MetricsEmittingQueryRunner; import io.druid.query.Query; import io.druid.query.QueryRunner; @@ -59,10 +57,10 @@ import org.joda.time.Interval; import org.joda.time.Period; import javax.annotation.Nullable; +import java.io.Closeable; import java.io.File; import java.io.FilenameFilter; import java.io.IOException; -import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Comparator; import java.util.List; @@ -76,10 +74,8 @@ import java.util.concurrent.ScheduledExecutorService; public class RealtimePlumber implements Plumber { private static final EmittingLogger log = new EmittingLogger(RealtimePlumber.class); - private final DataSchema schema; private final RealtimeTuningConfig config; - private final RejectionPolicy rejectionPolicy; private final FireDepartmentMetrics metrics; private final ServiceEmitter emitter; @@ -253,7 +249,19 @@ public class RealtimePlumber implements Plumber @Override public QueryRunner apply(FireHydrant input) { - return factory.createRunner(input.getSegment()); + // Prevent the underlying segment from closing when its being iterated + final Closeable closeable = input.getSegment().increment(); + try { + return factory.createRunner(input.getSegment()); + } + finally { + try { + closeable.close(); + } + catch (IOException e) { + throw Throwables.propagate(e); + } + } } } ) @@ -717,8 +725,6 @@ public class RealtimePlumber implements Plumber IndexIO.loadIndex(persistedFile) ) ); - //TODO: can there be some races here ? - index.close(); return numRows; } catch (IOException e) { @@ -785,13 +791,13 @@ public class RealtimePlumber implements Plumber && config.getShardSpec().getPartitionNum() == segment.getShardSpec().getPartitionNum() && Iterables.any( sinks.keySet(), new Predicate() - { - @Override - public boolean apply(Long sinkKey) - { - return segment.getInterval().contains(sinkKey); - } - } + { + @Override + public boolean apply(Long sinkKey) + { + return segment.getInterval().contains(sinkKey); + } + } ); } }