From 832cc293efe50ca11897c97c9ef84f7d263df3c9 Mon Sep 17 00:00:00 2001 From: Roman Leventov Date: Tue, 12 Sep 2017 14:28:35 -0500 Subject: [PATCH] Refactoring of ReferenceCountingSegment and FireHydrant (#4154) * Refactoring of ReferenceCountingSegment and FireHydrant * Address comment * Fix FireHydrant.closeSegment() * Address comment * Added comments to ReferenceCountingSegment --- .../ReferenceCountingSegmentQueryRunner.java | 18 +- .../segment/ReferenceCountingSegment.java | 164 +++++++----------- .../segment/ReferenceCountingSegmentTest.java | 51 +++--- .../druid/segment/realtime/FireHydrant.java | 101 ++++++----- .../appenderator/AppenderatorImpl.java | 53 +++--- .../appenderator/SinkQuerySegmentWalker.java | 2 +- .../realtime/plumber/RealtimePlumber.java | 44 +++-- .../druid/segment/realtime/plumber/Sink.java | 23 ++- .../java/io/druid/server/SegmentManager.java | 13 +- .../plumber/RealtimePlumberSchoolTest.java | 19 +- 10 files changed, 248 insertions(+), 240 deletions(-) diff --git a/processing/src/main/java/io/druid/query/ReferenceCountingSegmentQueryRunner.java b/processing/src/main/java/io/druid/query/ReferenceCountingSegmentQueryRunner.java index d513e079409..855c6422970 100644 --- a/processing/src/main/java/io/druid/query/ReferenceCountingSegmentQueryRunner.java +++ b/processing/src/main/java/io/druid/query/ReferenceCountingSegmentQueryRunner.java @@ -19,12 +19,10 @@ package io.druid.query; -import io.druid.java.util.common.guava.CloseQuietly; import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequences; import io.druid.segment.ReferenceCountingSegment; -import java.io.Closeable; import java.util.Map; /** @@ -49,16 +47,20 @@ public class ReferenceCountingSegmentQueryRunner implements QueryRunner @Override public Sequence run(final QueryPlus queryPlus, Map responseContext) { - final Closeable closeable = adapter.increment(); - if (closeable != null) { + if (adapter.increment()) { try { final Sequence baseSequence = factory.createRunner(adapter).run(queryPlus, responseContext); - return Sequences.withBaggage(baseSequence, closeable); + return Sequences.withBaggage(baseSequence, adapter.decrementOnceCloseable()); } - catch (RuntimeException e) { - CloseQuietly.close(closeable); - throw e; + catch (Throwable t) { + try { + adapter.decrement(); + } + catch (Exception e) { + t.addSuppressed(e); + } + throw t; } } else { // Segment was closed before we had a chance to increment the reference count diff --git a/processing/src/main/java/io/druid/segment/ReferenceCountingSegment.java b/processing/src/main/java/io/druid/segment/ReferenceCountingSegment.java index 64c133370b5..f8ec72b2be8 100644 --- a/processing/src/main/java/io/druid/segment/ReferenceCountingSegment.java +++ b/processing/src/main/java/io/druid/segment/ReferenceCountingSegment.java @@ -19,23 +19,48 @@ package io.druid.segment; +import com.google.common.base.Preconditions; import com.metamx.emitter.EmittingLogger; import org.joda.time.Interval; import java.io.Closeable; -import java.io.IOException; +import java.util.concurrent.Phaser; import java.util.concurrent.atomic.AtomicBoolean; +/** + * ReferenceCountingSegment allows to call {@link #close()} before some other "users", which called {@link + * #increment()}, has not called {@link #decrement()} yet, and the wrapped {@link Segment} won't be actually closed + * until that. So ReferenceCountingSegment implements something like automatic reference count-based resource + * management. + */ public class ReferenceCountingSegment extends AbstractSegment { private static final EmittingLogger log = new EmittingLogger(ReferenceCountingSegment.class); private final Segment baseSegment; - - private final Object lock = new Object(); - - private volatile int numReferences = 0; - private volatile boolean isClosed = false; + private final AtomicBoolean closed = new AtomicBoolean(false); + private final Phaser referents = new Phaser(1) + { + @Override + protected boolean onAdvance(int phase, int registeredParties) + { + Preconditions.checkState(registeredParties == 0); + // Ensure that onAdvance() doesn't throw exception, otherwise termination won't happen + try { + baseSegment.close(); + } + catch (Exception e) { + try { + log.error(e, "Exception while closing segment[%s]", baseSegment.getIdentifier()); + } + catch (Exception e2) { + // ignore + } + } + // Always terminate. + return true; + } + }; public ReferenceCountingSegment(Segment baseSegment) { @@ -44,141 +69,78 @@ public class ReferenceCountingSegment extends AbstractSegment public Segment getBaseSegment() { - synchronized (lock) { - if (isClosed) { - return null; - } - - return baseSegment; - } + return !isClosed() ? baseSegment : null; } public int getNumReferences() { - return numReferences; + return Math.max(referents.getRegisteredParties() - 1, 0); } public boolean isClosed() { - return isClosed; + return referents.isTerminated(); } @Override public String getIdentifier() { - synchronized (lock) { - if (isClosed) { - return null; - } - - return baseSegment.getIdentifier(); - } + return !isClosed() ? baseSegment.getIdentifier() : null; } @Override public Interval getDataInterval() { - synchronized (lock) { - if (isClosed) { - return null; - } - - return baseSegment.getDataInterval(); - } + return !isClosed() ? baseSegment.getDataInterval() : null; } @Override public QueryableIndex asQueryableIndex() { - synchronized (lock) { - if (isClosed) { - return null; - } - - return baseSegment.asQueryableIndex(); - } + return !isClosed() ? baseSegment.asQueryableIndex() : null; } @Override public StorageAdapter asStorageAdapter() { - synchronized (lock) { - if (isClosed) { - return null; - } - - return baseSegment.asStorageAdapter(); - } + return !isClosed() ? baseSegment.asStorageAdapter() : null; } @Override - public void close() throws IOException + public void close() { - synchronized (lock) { - if (isClosed) { - log.info("Failed to close, %s is closed already", baseSegment.getIdentifier()); - return; - } + if (closed.compareAndSet(false, true)) { + referents.arriveAndDeregister(); + } else { + log.warn("close() is called more than once on ReferenceCountingSegment"); + } + } - if (numReferences > 0) { - log.info("%d references to %s still exist. Decrementing.", numReferences, baseSegment.getIdentifier()); + public boolean increment() + { + // Negative return from referents.register() means the Phaser is terminated. + return referents.register() >= 0; + } + /** + * Returns a {@link Closeable} which action is to call {@link #decrement()} only once. If close() is called on the + * returned Closeable object for the second time, it won't call {@link #decrement()} again. + */ + public Closeable decrementOnceCloseable() + { + AtomicBoolean decremented = new AtomicBoolean(false); + return () -> { + if (decremented.compareAndSet(false, true)) { decrement(); } else { - log.info("Closing %s", baseSegment.getIdentifier()); - innerClose(); + log.warn("close() is called more than once on ReferenceCountingSegment.decrementOnceCloseable()"); } - } + }; } - public Closeable increment() + public void decrement() { - synchronized (lock) { - if (isClosed) { - return null; - } - - numReferences++; - final AtomicBoolean decrementOnce = new AtomicBoolean(false); - return new Closeable() - { - @Override - public void close() throws IOException - { - if (decrementOnce.compareAndSet(false, true)) { - decrement(); - } - } - }; - } - } - - private void decrement() - { - synchronized (lock) { - if (isClosed) { - return; - } - - if (--numReferences < 0) { - try { - innerClose(); - } - catch (Exception e) { - log.error("Unable to close queryable index %s", getIdentifier()); - } - } - } - } - - private void innerClose() throws IOException - { - synchronized (lock) { - log.info("Closing %s, numReferences: %d", baseSegment.getIdentifier(), numReferences); - - isClosed = true; - baseSegment.close(); - } + referents.arriveAndDeregister(); } @Override diff --git a/processing/src/test/java/io/druid/segment/ReferenceCountingSegmentTest.java b/processing/src/test/java/io/druid/segment/ReferenceCountingSegmentTest.java index 724bcb41686..ac2102a0814 100644 --- a/processing/src/test/java/io/druid/segment/ReferenceCountingSegmentTest.java +++ b/processing/src/test/java/io/druid/segment/ReferenceCountingSegmentTest.java @@ -83,56 +83,47 @@ public class ReferenceCountingSegmentTest public void testMultipleClose() throws Exception { Assert.assertFalse(segment.isClosed()); - final Closeable closeable = segment.increment(); - Assert.assertTrue(segment.getNumReferences() == 1); + Assert.assertTrue(segment.increment()); + Assert.assertEquals(1, segment.getNumReferences()); + Closeable closeable = segment.decrementOnceCloseable(); closeable.close(); closeable.close(); exec.submit( - new Runnable() - { - @Override - public void run() - { - try { - closeable.close(); - } - catch (Exception e) { - throw Throwables.propagate(e); - } + () -> { + try { + closeable.close(); + } + catch (Exception e) { + throw Throwables.propagate(e); } } - ); - Assert.assertTrue(segment.getNumReferences() == 0); + ).get(); + Assert.assertEquals(0, segment.getNumReferences()); Assert.assertFalse(segment.isClosed()); segment.close(); segment.close(); exec.submit( - new Runnable() - { - @Override - public void run() - { - try { - segment.close(); - } - catch (Exception e) { - throw Throwables.propagate(e); - } + () -> { + try { + segment.close(); + } + catch (Exception e) { + throw Throwables.propagate(e); } } - ); + ).get(); - Assert.assertTrue(segment.getNumReferences() == 0); + Assert.assertEquals(0, segment.getNumReferences()); Assert.assertTrue(segment.isClosed()); segment.increment(); segment.increment(); segment.increment(); - Assert.assertTrue(segment.getNumReferences() == 0); + Assert.assertEquals(0, segment.getNumReferences()); segment.close(); - Assert.assertTrue(segment.getNumReferences() == 0); + Assert.assertEquals(0, segment.getNumReferences()); } } diff --git a/server/src/main/java/io/druid/segment/realtime/FireHydrant.java b/server/src/main/java/io/druid/segment/realtime/FireHydrant.java index afcd10d1842..260742a1229 100644 --- a/server/src/main/java/io/druid/segment/realtime/FireHydrant.java +++ b/server/src/main/java/io/druid/segment/realtime/FireHydrant.java @@ -19,46 +19,39 @@ package io.druid.segment.realtime; -import com.google.common.base.Throwables; - import io.druid.java.util.common.ISE; import io.druid.java.util.common.Pair; import io.druid.segment.IncrementalIndexSegment; import io.druid.segment.ReferenceCountingSegment; import io.druid.segment.Segment; import io.druid.segment.incremental.IncrementalIndex; +import org.joda.time.Interval; +import javax.annotation.Nullable; import java.io.Closeable; -import java.io.IOException; +import java.util.concurrent.atomic.AtomicReference; /** */ public class FireHydrant { private final int count; - private final Object swapLock = new Object(); - + private final AtomicReference adapter; private volatile IncrementalIndex index; - private volatile ReferenceCountingSegment adapter; - public FireHydrant( - IncrementalIndex index, - int count, - String segmentIdentifier - ) + public FireHydrant(IncrementalIndex index, int count, String segmentIdentifier) { this.index = index; - this.adapter = new ReferenceCountingSegment(new IncrementalIndexSegment(index, segmentIdentifier)); + this.adapter = new AtomicReference<>( + new ReferenceCountingSegment(new IncrementalIndexSegment(index, segmentIdentifier)) + ); this.count = count; } - public FireHydrant( - Segment adapter, - int count - ) + public FireHydrant(Segment adapter, int count) { this.index = null; - this.adapter = new ReferenceCountingSegment(adapter); + this.adapter = new AtomicReference<>(new ReferenceCountingSegment(adapter)); this.count = count; } @@ -67,9 +60,35 @@ public class FireHydrant return index; } - public Segment getSegment() + public String getSegmentIdentifier() { - return adapter; + return adapter.get().getIdentifier(); + } + + public Interval getSegmentDataInterval() + { + return adapter.get().getDataInterval(); + } + + public ReferenceCountingSegment getIncrementedSegment() + { + ReferenceCountingSegment segment = adapter.get(); + while (true) { + if (segment.increment()) { + return segment; + } + // segment.increment() returned false, means it is closed. Since close() in swapSegment() happens after segment + // swap, the new segment should already be visible. + ReferenceCountingSegment newSegment = adapter.get(); + if (segment == newSegment) { + throw new ISE("segment.close() is called somewhere outside FireHydrant.swapSegment()"); + } + if (newSegment == null) { + throw new ISE("FireHydrant was 'closed' by swapping segment to null while acquiring a segment"); + } + segment = newSegment; + // Spin loop. + } } public int getCount() @@ -82,37 +101,41 @@ public class FireHydrant return index == null; } - public void swapSegment(Segment newAdapter) + public void swapSegment(@Nullable Segment newSegment) { - synchronized (swapLock) { - if (adapter != null && newAdapter != null && !newAdapter.getIdentifier().equals(adapter.getIdentifier())) { + while (true) { + ReferenceCountingSegment currentSegment = adapter.get(); + if (currentSegment == null && newSegment == null) { + return; + } + if (currentSegment != null && newSegment != null && + !newSegment.getIdentifier().equals(currentSegment.getIdentifier())) { // Sanity check: identifier should not change throw new ISE( "WTF?! Cannot swap identifier[%s] -> [%s]!", - adapter.getIdentifier(), - newAdapter.getIdentifier() + currentSegment.getIdentifier(), + newSegment.getIdentifier() ); } - if (this.adapter != null) { - try { - this.adapter.close(); - } - catch (IOException e) { - throw Throwables.propagate(e); - } + if (currentSegment == newSegment) { + throw new ISE("Cannot swap to the same segment"); + } + ReferenceCountingSegment newReferenceCountingSegment = + newSegment != null ? new ReferenceCountingSegment(newSegment) : null; + if (adapter.compareAndSet(currentSegment, newReferenceCountingSegment)) { + if (currentSegment != null) { + currentSegment.close(); + } + index = null; + return; } - this.adapter = new ReferenceCountingSegment(newAdapter); - this.index = null; } } public Pair getAndIncrementSegment() { - // Prevent swapping of index before increment is called - synchronized (swapLock) { - Closeable closeable = adapter.increment(); - return new Pair(adapter, closeable); - } + ReferenceCountingSegment segment = getIncrementedSegment(); + return new Pair<>(segment, segment.decrementOnceCloseable()); } @Override @@ -120,7 +143,7 @@ public class FireHydrant { return "FireHydrant{" + "index=" + index + - ", queryable=" + adapter.getIdentifier() + + ", queryable=" + adapter.get().getIdentifier() + ", count=" + count + '}'; } diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java index fe3c4886f5f..a6d9be50121 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java @@ -51,6 +51,7 @@ import io.druid.java.util.common.ISE; import io.druid.java.util.common.Pair; import io.druid.java.util.common.RetryUtils; import io.druid.java.util.common.StringUtils; +import io.druid.java.util.common.io.Closer; import io.druid.query.Query; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactoryConglomerate; @@ -75,6 +76,7 @@ import org.apache.commons.io.FileUtils; import org.joda.time.Interval; import javax.annotation.Nullable; +import java.io.Closeable; import java.io.File; import java.io.FilenameFilter; import java.io.IOException; @@ -576,22 +578,32 @@ public class AppenderatorImpl implements Appenderator throw new ISE("Merged target[%s] exists after removing?!", mergedTarget); } - List indexes = Lists.newArrayList(); - for (FireHydrant fireHydrant : sink) { - Segment segment = fireHydrant.getSegment(); - final QueryableIndex queryableIndex = segment.asQueryableIndex(); - log.info("Adding hydrant[%s]", fireHydrant); - indexes.add(queryableIndex); - } - final File mergedFile; - mergedFile = indexMerger.mergeQueryableIndex( - indexes, - schema.getGranularitySpec().isRollup(), - schema.getAggregators(), - mergedTarget, - tuningConfig.getIndexSpec() - ); + List indexes = Lists.newArrayList(); + Closer closer = Closer.create(); + try { + for (FireHydrant fireHydrant : sink) { + Pair segmentAndCloseable = fireHydrant.getAndIncrementSegment(); + final QueryableIndex queryableIndex = segmentAndCloseable.lhs.asQueryableIndex(); + log.info("Adding hydrant[%s]", fireHydrant); + indexes.add(queryableIndex); + closer.register(segmentAndCloseable.rhs); + } + + mergedFile = indexMerger.mergeQueryableIndex( + indexes, + schema.getGranularitySpec().isRollup(), + schema.getAggregators(), + mergedTarget, + tuningConfig.getIndexSpec() + ); + } + catch (Throwable t) { + throw closer.rethrow(t); + } + finally { + closer.close(); + } // Retry pushing segments because uploading to deep storage might fail especially for cloud storage types final DataSegment segment = RetryUtils.retry( @@ -947,14 +959,7 @@ public class AppenderatorImpl implements Appenderator if (cache != null) { cache.close(SinkQuerySegmentWalker.makeHydrantCacheIdentifier(hydrant)); } - try { - hydrant.getSegment().close(); - } - catch (IOException e) { - log.makeAlert(e, "Failed to explicitly close segment[%s]", schema.getDataSource()) - .addData("identifier", hydrant.getSegment().getIdentifier()) - .emit(); - } + hydrant.swapSegment(null); } if (removeOnDiskData) { @@ -1040,7 +1045,7 @@ public class AppenderatorImpl implements Appenderator indexToPersist.swapSegment( new QueryableIndexSegment( - indexToPersist.getSegment().getIdentifier(), + indexToPersist.getSegmentIdentifier(), indexIO.loadIndex(persistedFile) ) ); diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java b/server/src/main/java/io/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java index 0caa5631619..3905adbbcba 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java @@ -309,6 +309,6 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker public static String makeHydrantCacheIdentifier(FireHydrant input) { - return input.getSegment().getIdentifier() + "_" + input.getCount(); + return input.getSegmentIdentifier() + "_" + input.getCount(); } } diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java index 3fe234db39a..40fe89057ed 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java @@ -49,6 +49,7 @@ import io.druid.java.util.common.Pair; import io.druid.java.util.common.StringUtils; import io.druid.java.util.common.concurrent.ScheduledExecutors; import io.druid.java.util.common.granularity.Granularity; +import io.druid.java.util.common.io.Closer; import io.druid.query.Query; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactoryConglomerate; @@ -79,6 +80,7 @@ import org.joda.time.Duration; import org.joda.time.Interval; import org.joda.time.Period; +import java.io.Closeable; import java.io.File; import java.io.FilenameFilter; import java.io.IOException; @@ -408,21 +410,33 @@ public class RealtimePlumber implements Plumber final long mergeThreadCpuTime = VMUtils.safeGetThreadCpuTime(); mergeStopwatch = Stopwatch.createStarted(); + final File mergedFile; List indexes = Lists.newArrayList(); - for (FireHydrant fireHydrant : sink) { - Segment segment = fireHydrant.getSegment(); - final QueryableIndex queryableIndex = segment.asQueryableIndex(); - log.info("Adding hydrant[%s]", fireHydrant); - indexes.add(queryableIndex); - } + Closer closer = Closer.create(); + try { + for (FireHydrant fireHydrant : sink) { + Pair segmentAndCloseable = fireHydrant.getAndIncrementSegment(); + final QueryableIndex queryableIndex = segmentAndCloseable.lhs.asQueryableIndex(); + log.info("Adding hydrant[%s]", fireHydrant); + indexes.add(queryableIndex); + closer.register(segmentAndCloseable.rhs); + } - final File mergedFile = indexMerger.mergeQueryableIndex( - indexes, - schema.getGranularitySpec().isRollup(), - schema.getAggregators(), - mergedTarget, - config.getIndexSpec() - ); + + mergedFile = indexMerger.mergeQueryableIndex( + indexes, + schema.getGranularitySpec().isRollup(), + schema.getAggregators(), + mergedTarget, + config.getIndexSpec() + ); + } + catch (Throwable t) { + throw closer.rethrow(t); + } + finally { + closer.close(); + } // emit merge metrics before publishing segment metrics.incrementMergeCpuTime(VMUtils.safeGetThreadCpuTime() - mergeThreadCpuTime); @@ -857,7 +871,7 @@ public class RealtimePlumber implements Plumber ); for (FireHydrant hydrant : sink) { cache.close(SinkQuerySegmentWalker.makeHydrantCacheIdentifier(hydrant)); - hydrant.getSegment().close(); + hydrant.swapSegment(null); } synchronized (handoffCondition) { handoffCondition.notifyAll(); @@ -936,7 +950,7 @@ public class RealtimePlumber implements Plumber indexToPersist.swapSegment( new QueryableIndexSegment( - indexToPersist.getSegment().getIdentifier(), + indexToPersist.getSegmentIdentifier(), indexIO.loadIndex(persistedFile) ) ); diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java b/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java index 3341e23bc4e..5ac9b05cd6d 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java @@ -32,6 +32,7 @@ import io.druid.java.util.common.IAE; import io.druid.java.util.common.ISE; import io.druid.query.aggregation.AggregatorFactory; import io.druid.segment.QueryableIndex; +import io.druid.segment.ReferenceCountingSegment; import io.druid.segment.column.ColumnCapabilitiesImpl; import io.druid.segment.incremental.IncrementalIndex; import io.druid.segment.incremental.IncrementalIndexSchema; @@ -111,7 +112,13 @@ public class Sink implements Iterable throw new ISE("hydrant[%s] not the right count[%s]", hydrant, i); } maxCount = hydrant.getCount(); - numRowsExcludingCurrIndex.addAndGet(hydrant.getSegment().asQueryableIndex().getNumRows()); + ReferenceCountingSegment segment = hydrant.getIncrementedSegment(); + try { + numRowsExcludingCurrIndex.addAndGet(segment.asQueryableIndex().getNumRows()); + } + finally { + segment.decrement(); + } } this.hydrants.addAll(hydrants); @@ -272,10 +279,16 @@ public class Sink implements Iterable Map oldCapabilities; if (lastHydrant.hasSwapped()) { oldCapabilities = Maps.newHashMap(); - QueryableIndex oldIndex = lastHydrant.getSegment().asQueryableIndex(); - for (String dim : oldIndex.getAvailableDimensions()) { - dimOrder.add(dim); - oldCapabilities.put(dim, (ColumnCapabilitiesImpl) oldIndex.getColumn(dim).getCapabilities()); + ReferenceCountingSegment segment = lastHydrant.getIncrementedSegment(); + try { + QueryableIndex oldIndex = segment.asQueryableIndex(); + for (String dim : oldIndex.getAvailableDimensions()) { + dimOrder.add(dim); + oldCapabilities.put(dim, (ColumnCapabilitiesImpl) oldIndex.getColumn(dim).getCapabilities()); + } + } + finally { + segment.decrement(); } } else { IncrementalIndex oldIndex = lastHydrant.getIndex(); diff --git a/server/src/main/java/io/druid/server/SegmentManager.java b/server/src/main/java/io/druid/server/SegmentManager.java index b27bbe8752f..3a7ba3cdbfd 100644 --- a/server/src/main/java/io/druid/server/SegmentManager.java +++ b/server/src/main/java/io/druid/server/SegmentManager.java @@ -34,7 +34,6 @@ import io.druid.timeline.partition.PartitionChunk; import io.druid.timeline.partition.PartitionHolder; import javax.annotation.Nullable; -import java.io.IOException; import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; @@ -236,16 +235,8 @@ public class SegmentManager if (oldQueryable != null) { dataSourceState.removeSegment(segment); - try { - log.info("Attempting to close segment %s", segment.getIdentifier()); - oldQueryable.close(); - } - catch (IOException e) { - log.makeAlert(e, "Exception closing segment") - .addData("dataSource", dataSourceName) - .addData("segmentId", segment.getIdentifier()) - .emit(); - } + log.info("Attempting to close segment %s", segment.getIdentifier()); + oldQueryable.close(); } else { log.info( "Told to delete a queryable on dataSource[%s] for interval[%s] and version [%s] that I don't have.", diff --git a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java index 414dd897738..a9a9c1ab48e 100644 --- a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java +++ b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java @@ -46,6 +46,7 @@ import io.druid.query.SegmentDescriptor; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; import io.druid.segment.QueryableIndex; +import io.druid.segment.ReferenceCountingSegment; import io.druid.segment.TestHelper; import io.druid.segment.indexing.DataSchema; import io.druid.segment.indexing.RealtimeTuningConfig; @@ -425,17 +426,17 @@ public class RealtimePlumberSchoolTest Assert.assertEquals(0, hydrants.get(0).getCount()); Assert.assertEquals( expectedInterval, - hydrants.get(0).getSegment().getDataInterval() + hydrants.get(0).getSegmentDataInterval() ); Assert.assertEquals(2, hydrants.get(1).getCount()); Assert.assertEquals( expectedInterval, - hydrants.get(1).getSegment().getDataInterval() + hydrants.get(1).getSegmentDataInterval() ); Assert.assertEquals(4, hydrants.get(2).getCount()); Assert.assertEquals( expectedInterval, - hydrants.get(2).getSegment().getDataInterval() + hydrants.get(2).getSegmentDataInterval() ); /* Delete all the hydrants and reload, no sink should be created */ @@ -563,9 +564,15 @@ public class RealtimePlumberSchoolTest for (int i = 0; i < hydrants.size(); i++) { hydrant = hydrants.get(i); - qindex = hydrant.getSegment().asQueryableIndex(); - Assert.assertEquals(i, hydrant.getCount()); - Assert.assertEquals(expectedDims.get(i), ImmutableList.copyOf(qindex.getAvailableDimensions())); + ReferenceCountingSegment segment = hydrant.getIncrementedSegment(); + try { + qindex = segment.asQueryableIndex(); + Assert.assertEquals(i, hydrant.getCount()); + Assert.assertEquals(expectedDims.get(i), ImmutableList.copyOf(qindex.getAvailableDimensions())); + } + finally { + segment.decrement(); + } } }