Refactoring of ReferenceCountingSegment and FireHydrant (#4154)

* Refactoring of ReferenceCountingSegment and FireHydrant

* Address comment

* Fix FireHydrant.closeSegment()

* Address comment

* Added comments to ReferenceCountingSegment
This commit is contained in:
Roman Leventov 2017-09-12 14:28:35 -05:00 committed by Himanshu
parent c0be050242
commit 832cc293ef
10 changed files with 248 additions and 240 deletions

View File

@ -19,12 +19,10 @@
package io.druid.query;
import io.druid.java.util.common.guava.CloseQuietly;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences;
import io.druid.segment.ReferenceCountingSegment;
import java.io.Closeable;
import java.util.Map;
/**
@ -49,16 +47,20 @@ public class ReferenceCountingSegmentQueryRunner<T> implements QueryRunner<T>
@Override
public Sequence<T> run(final QueryPlus<T> queryPlus, Map<String, Object> responseContext)
{
final Closeable closeable = adapter.increment();
if (closeable != null) {
if (adapter.increment()) {
try {
final Sequence<T> baseSequence = factory.createRunner(adapter).run(queryPlus, responseContext);
return Sequences.withBaggage(baseSequence, closeable);
return Sequences.withBaggage(baseSequence, adapter.decrementOnceCloseable());
}
catch (RuntimeException e) {
CloseQuietly.close(closeable);
throw e;
catch (Throwable t) {
try {
adapter.decrement();
}
catch (Exception e) {
t.addSuppressed(e);
}
throw t;
}
} else {
// Segment was closed before we had a chance to increment the reference count

View File

@ -19,23 +19,48 @@
package io.druid.segment;
import com.google.common.base.Preconditions;
import com.metamx.emitter.EmittingLogger;
import org.joda.time.Interval;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* ReferenceCountingSegment allows {@link #close()} to be called while some other "users", which have called {@link
* #increment()}, have not yet called {@link #decrement()}; the wrapped {@link Segment} won't actually be closed
* until they do. So ReferenceCountingSegment implements something like automatic reference count-based resource
* management.
*/
public class ReferenceCountingSegment extends AbstractSegment
{
private static final EmittingLogger log = new EmittingLogger(ReferenceCountingSegment.class);
private final Segment baseSegment;
private final Object lock = new Object();
private volatile int numReferences = 0;
private volatile boolean isClosed = false;
private final AtomicBoolean closed = new AtomicBoolean(false);
/**
* Tracks the outstanding references to {@link #baseSegment}. The Phaser is created with a single registered party;
* {@link #close()} deregisters a party once, {@link #increment()} registers an additional party and
* {@link #decrement()} deregisters one. The overridden onAdvance() closes the base segment and terminates the
* Phaser, which is what {@link #isClosed()} reports.
*/
private final Phaser referents = new Phaser(1)
{
@Override
protected boolean onAdvance(int phase, int registeredParties)
{
// The advance is expected to happen only when no party remains registered.
Preconditions.checkState(registeredParties == 0);
// Ensure that onAdvance() doesn't throw exception, otherwise termination won't happen
try {
baseSegment.close();
}
catch (Exception e) {
// Logging is itself guarded, so that a failure while reporting the error cannot escape onAdvance() either.
try {
log.error(e, "Exception while closing segment[%s]", baseSegment.getIdentifier());
}
catch (Exception e2) {
// ignore: nothing more can be done here, and throwing would prevent termination
}
}
// Always terminate: returning true from onAdvance() puts the Phaser into the terminated state.
return true;
}
};
public ReferenceCountingSegment(Segment baseSegment)
{
@ -44,141 +69,78 @@ public class ReferenceCountingSegment extends AbstractSegment
public Segment getBaseSegment()
{
synchronized (lock) {
if (isClosed) {
return null;
}
return baseSegment;
}
return !isClosed() ? baseSegment : null;
}
public int getNumReferences()
{
return numReferences;
return Math.max(referents.getRegisteredParties() - 1, 0);
}
public boolean isClosed()
{
return isClosed;
return referents.isTerminated();
}
@Override
public String getIdentifier()
{
synchronized (lock) {
if (isClosed) {
return null;
}
return baseSegment.getIdentifier();
}
return !isClosed() ? baseSegment.getIdentifier() : null;
}
@Override
public Interval getDataInterval()
{
synchronized (lock) {
if (isClosed) {
return null;
}
return baseSegment.getDataInterval();
}
return !isClosed() ? baseSegment.getDataInterval() : null;
}
@Override
public QueryableIndex asQueryableIndex()
{
synchronized (lock) {
if (isClosed) {
return null;
}
return baseSegment.asQueryableIndex();
}
return !isClosed() ? baseSegment.asQueryableIndex() : null;
}
@Override
public StorageAdapter asStorageAdapter()
{
synchronized (lock) {
if (isClosed) {
return null;
}
return baseSegment.asStorageAdapter();
}
return !isClosed() ? baseSegment.asStorageAdapter() : null;
}
@Override
public void close() throws IOException
public void close()
{
synchronized (lock) {
if (isClosed) {
log.info("Failed to close, %s is closed already", baseSegment.getIdentifier());
return;
}
if (closed.compareAndSet(false, true)) {
referents.arriveAndDeregister();
} else {
log.warn("close() is called more than once on ReferenceCountingSegment");
}
}
if (numReferences > 0) {
log.info("%d references to %s still exist. Decrementing.", numReferences, baseSegment.getIdentifier());
/**
 * Tries to register one more reference to this segment.
 *
 * @return true if the reference was registered, false if the underlying Phaser is already terminated (in which
 *         case {@link #isClosed()} returns true)
 */
public boolean increment()
{
  // Phaser.register() reports a negative phase number once the Phaser is terminated.
  final int arrivalPhase = referents.register();
  return arrivalPhase >= 0;
}
/**
* Returns a {@link Closeable} whose action is to call {@link #decrement()} only once. If close() is called on the
* returned Closeable object a second time, it won't call {@link #decrement()} again.
*/
public Closeable decrementOnceCloseable()
{
AtomicBoolean decremented = new AtomicBoolean(false);
return () -> {
if (decremented.compareAndSet(false, true)) {
decrement();
} else {
log.info("Closing %s", baseSegment.getIdentifier());
innerClose();
log.warn("close() is called more than once on ReferenceCountingSegment.decrementOnceCloseable()");
}
}
};
}
public Closeable increment()
public void decrement()
{
synchronized (lock) {
if (isClosed) {
return null;
}
numReferences++;
final AtomicBoolean decrementOnce = new AtomicBoolean(false);
return new Closeable()
{
@Override
public void close() throws IOException
{
if (decrementOnce.compareAndSet(false, true)) {
decrement();
}
}
};
}
}
private void decrement()
{
synchronized (lock) {
if (isClosed) {
return;
}
if (--numReferences < 0) {
try {
innerClose();
}
catch (Exception e) {
log.error("Unable to close queryable index %s", getIdentifier());
}
}
}
}
private void innerClose() throws IOException
{
synchronized (lock) {
log.info("Closing %s, numReferences: %d", baseSegment.getIdentifier(), numReferences);
isClosed = true;
baseSegment.close();
}
referents.arriveAndDeregister();
}
@Override

View File

@ -83,56 +83,47 @@ public class ReferenceCountingSegmentTest
public void testMultipleClose() throws Exception
{
Assert.assertFalse(segment.isClosed());
final Closeable closeable = segment.increment();
Assert.assertTrue(segment.getNumReferences() == 1);
Assert.assertTrue(segment.increment());
Assert.assertEquals(1, segment.getNumReferences());
Closeable closeable = segment.decrementOnceCloseable();
closeable.close();
closeable.close();
exec.submit(
new Runnable()
{
@Override
public void run()
{
try {
closeable.close();
}
catch (Exception e) {
throw Throwables.propagate(e);
}
() -> {
try {
closeable.close();
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
);
Assert.assertTrue(segment.getNumReferences() == 0);
).get();
Assert.assertEquals(0, segment.getNumReferences());
Assert.assertFalse(segment.isClosed());
segment.close();
segment.close();
exec.submit(
new Runnable()
{
@Override
public void run()
{
try {
segment.close();
}
catch (Exception e) {
throw Throwables.propagate(e);
}
() -> {
try {
segment.close();
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
);
).get();
Assert.assertTrue(segment.getNumReferences() == 0);
Assert.assertEquals(0, segment.getNumReferences());
Assert.assertTrue(segment.isClosed());
segment.increment();
segment.increment();
segment.increment();
Assert.assertTrue(segment.getNumReferences() == 0);
Assert.assertEquals(0, segment.getNumReferences());
segment.close();
Assert.assertTrue(segment.getNumReferences() == 0);
Assert.assertEquals(0, segment.getNumReferences());
}
}

View File

@ -19,46 +19,39 @@
package io.druid.segment.realtime;
import com.google.common.base.Throwables;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.Pair;
import io.druid.segment.IncrementalIndexSegment;
import io.druid.segment.ReferenceCountingSegment;
import io.druid.segment.Segment;
import io.druid.segment.incremental.IncrementalIndex;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;
/**
*/
public class FireHydrant
{
private final int count;
private final Object swapLock = new Object();
private final AtomicReference<ReferenceCountingSegment> adapter;
private volatile IncrementalIndex index;
private volatile ReferenceCountingSegment adapter;
public FireHydrant(
IncrementalIndex index,
int count,
String segmentIdentifier
)
public FireHydrant(IncrementalIndex index, int count, String segmentIdentifier)
{
this.index = index;
this.adapter = new ReferenceCountingSegment(new IncrementalIndexSegment(index, segmentIdentifier));
this.adapter = new AtomicReference<>(
new ReferenceCountingSegment(new IncrementalIndexSegment(index, segmentIdentifier))
);
this.count = count;
}
public FireHydrant(
Segment adapter,
int count
)
public FireHydrant(Segment adapter, int count)
{
this.index = null;
this.adapter = new ReferenceCountingSegment(adapter);
this.adapter = new AtomicReference<>(new ReferenceCountingSegment(adapter));
this.count = count;
}
@ -67,9 +60,35 @@ public class FireHydrant
return index;
}
public Segment getSegment()
public String getSegmentIdentifier()
{
return adapter;
return adapter.get().getIdentifier();
}
/**
 * Returns the data interval reported by the segment this hydrant currently holds.
 */
public Interval getSegmentDataInterval()
{
  final ReferenceCountingSegment currentSegment = adapter.get();
  return currentSegment.getDataInterval();
}
/**
 * Returns the segment currently held by this hydrant after successfully incrementing its reference count. The
 * caller must release the reference with {@link ReferenceCountingSegment#decrement()} (or the Closeable from
 * {@link ReferenceCountingSegment#decrementOnceCloseable()}) when done with the segment.
 *
 * If the observed segment turns out to be closed, the current segment is re-read and the attempt is repeated, so
 * that a concurrent swapSegment() is tolerated.
 *
 * @throws ISE if the hydrant holds no segment because it was swapped to null, or if the segment was closed without
 *             being swapped out of this hydrant
 */
public ReferenceCountingSegment getIncrementedSegment()
{
  ReferenceCountingSegment segment = adapter.get();
  while (true) {
    // Checked on every iteration, including the first one: after swapSegment(null) the very first read already
    // yields null, which previously surfaced as a bare NullPointerException.
    if (segment == null) {
      throw new ISE("FireHydrant was 'closed' by swapping segment to null while acquiring a segment");
    }
    if (segment.increment()) {
      return segment;
    }
    // segment.increment() returned false, which means the segment is closed. Since close() in swapSegment()
    // happens after the segment swap, the new segment should already be visible.
    ReferenceCountingSegment newSegment = adapter.get();
    if (segment == newSegment) {
      throw new ISE("segment.close() is called somewhere outside FireHydrant.swapSegment()");
    }
    segment = newSegment;
    // Spin loop.
  }
}
public int getCount()
@ -82,37 +101,41 @@ public class FireHydrant
return index == null;
}
public void swapSegment(Segment newAdapter)
public void swapSegment(@Nullable Segment newSegment)
{
synchronized (swapLock) {
if (adapter != null && newAdapter != null && !newAdapter.getIdentifier().equals(adapter.getIdentifier())) {
while (true) {
ReferenceCountingSegment currentSegment = adapter.get();
if (currentSegment == null && newSegment == null) {
return;
}
if (currentSegment != null && newSegment != null &&
!newSegment.getIdentifier().equals(currentSegment.getIdentifier())) {
// Sanity check: identifier should not change
throw new ISE(
"WTF?! Cannot swap identifier[%s] -> [%s]!",
adapter.getIdentifier(),
newAdapter.getIdentifier()
currentSegment.getIdentifier(),
newSegment.getIdentifier()
);
}
if (this.adapter != null) {
try {
this.adapter.close();
}
catch (IOException e) {
throw Throwables.propagate(e);
}
if (currentSegment == newSegment) {
throw new ISE("Cannot swap to the same segment");
}
ReferenceCountingSegment newReferenceCountingSegment =
newSegment != null ? new ReferenceCountingSegment(newSegment) : null;
if (adapter.compareAndSet(currentSegment, newReferenceCountingSegment)) {
if (currentSegment != null) {
currentSegment.close();
}
index = null;
return;
}
this.adapter = new ReferenceCountingSegment(newAdapter);
this.index = null;
}
}
public Pair<Segment, Closeable> getAndIncrementSegment()
{
// Prevent swapping of index before increment is called
synchronized (swapLock) {
Closeable closeable = adapter.increment();
return new Pair<Segment, Closeable>(adapter, closeable);
}
ReferenceCountingSegment segment = getIncrementedSegment();
return new Pair<>(segment, segment.decrementOnceCloseable());
}
@Override
@ -120,7 +143,7 @@ public class FireHydrant
{
return "FireHydrant{" +
"index=" + index +
", queryable=" + adapter.getIdentifier() +
", queryable=" + adapter.get().getIdentifier() +
", count=" + count +
'}';
}

View File

@ -51,6 +51,7 @@ import io.druid.java.util.common.ISE;
import io.druid.java.util.common.Pair;
import io.druid.java.util.common.RetryUtils;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.io.Closer;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactoryConglomerate;
@ -75,6 +76,7 @@ import org.apache.commons.io.FileUtils;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.io.Closeable;
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
@ -576,22 +578,32 @@ public class AppenderatorImpl implements Appenderator
throw new ISE("Merged target[%s] exists after removing?!", mergedTarget);
}
List<QueryableIndex> indexes = Lists.newArrayList();
for (FireHydrant fireHydrant : sink) {
Segment segment = fireHydrant.getSegment();
final QueryableIndex queryableIndex = segment.asQueryableIndex();
log.info("Adding hydrant[%s]", fireHydrant);
indexes.add(queryableIndex);
}
final File mergedFile;
mergedFile = indexMerger.mergeQueryableIndex(
indexes,
schema.getGranularitySpec().isRollup(),
schema.getAggregators(),
mergedTarget,
tuningConfig.getIndexSpec()
);
List<QueryableIndex> indexes = Lists.newArrayList();
Closer closer = Closer.create();
try {
for (FireHydrant fireHydrant : sink) {
Pair<Segment, Closeable> segmentAndCloseable = fireHydrant.getAndIncrementSegment();
final QueryableIndex queryableIndex = segmentAndCloseable.lhs.asQueryableIndex();
log.info("Adding hydrant[%s]", fireHydrant);
indexes.add(queryableIndex);
closer.register(segmentAndCloseable.rhs);
}
mergedFile = indexMerger.mergeQueryableIndex(
indexes,
schema.getGranularitySpec().isRollup(),
schema.getAggregators(),
mergedTarget,
tuningConfig.getIndexSpec()
);
}
catch (Throwable t) {
throw closer.rethrow(t);
}
finally {
closer.close();
}
// Retry pushing segments because uploading to deep storage might fail especially for cloud storage types
final DataSegment segment = RetryUtils.retry(
@ -947,14 +959,7 @@ public class AppenderatorImpl implements Appenderator
if (cache != null) {
cache.close(SinkQuerySegmentWalker.makeHydrantCacheIdentifier(hydrant));
}
try {
hydrant.getSegment().close();
}
catch (IOException e) {
log.makeAlert(e, "Failed to explicitly close segment[%s]", schema.getDataSource())
.addData("identifier", hydrant.getSegment().getIdentifier())
.emit();
}
hydrant.swapSegment(null);
}
if (removeOnDiskData) {
@ -1040,7 +1045,7 @@ public class AppenderatorImpl implements Appenderator
indexToPersist.swapSegment(
new QueryableIndexSegment(
indexToPersist.getSegment().getIdentifier(),
indexToPersist.getSegmentIdentifier(),
indexIO.loadIndex(persistedFile)
)
);

View File

@ -309,6 +309,6 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
public static String makeHydrantCacheIdentifier(FireHydrant input)
{
return input.getSegment().getIdentifier() + "_" + input.getCount();
return input.getSegmentIdentifier() + "_" + input.getCount();
}
}

View File

@ -49,6 +49,7 @@ import io.druid.java.util.common.Pair;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.concurrent.ScheduledExecutors;
import io.druid.java.util.common.granularity.Granularity;
import io.druid.java.util.common.io.Closer;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactoryConglomerate;
@ -79,6 +80,7 @@ import org.joda.time.Duration;
import org.joda.time.Interval;
import org.joda.time.Period;
import java.io.Closeable;
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
@ -408,21 +410,33 @@ public class RealtimePlumber implements Plumber
final long mergeThreadCpuTime = VMUtils.safeGetThreadCpuTime();
mergeStopwatch = Stopwatch.createStarted();
final File mergedFile;
List<QueryableIndex> indexes = Lists.newArrayList();
for (FireHydrant fireHydrant : sink) {
Segment segment = fireHydrant.getSegment();
final QueryableIndex queryableIndex = segment.asQueryableIndex();
log.info("Adding hydrant[%s]", fireHydrant);
indexes.add(queryableIndex);
}
Closer closer = Closer.create();
try {
for (FireHydrant fireHydrant : sink) {
Pair<Segment, Closeable> segmentAndCloseable = fireHydrant.getAndIncrementSegment();
final QueryableIndex queryableIndex = segmentAndCloseable.lhs.asQueryableIndex();
log.info("Adding hydrant[%s]", fireHydrant);
indexes.add(queryableIndex);
closer.register(segmentAndCloseable.rhs);
}
final File mergedFile = indexMerger.mergeQueryableIndex(
indexes,
schema.getGranularitySpec().isRollup(),
schema.getAggregators(),
mergedTarget,
config.getIndexSpec()
);
mergedFile = indexMerger.mergeQueryableIndex(
indexes,
schema.getGranularitySpec().isRollup(),
schema.getAggregators(),
mergedTarget,
config.getIndexSpec()
);
}
catch (Throwable t) {
throw closer.rethrow(t);
}
finally {
closer.close();
}
// emit merge metrics before publishing segment
metrics.incrementMergeCpuTime(VMUtils.safeGetThreadCpuTime() - mergeThreadCpuTime);
@ -857,7 +871,7 @@ public class RealtimePlumber implements Plumber
);
for (FireHydrant hydrant : sink) {
cache.close(SinkQuerySegmentWalker.makeHydrantCacheIdentifier(hydrant));
hydrant.getSegment().close();
hydrant.swapSegment(null);
}
synchronized (handoffCondition) {
handoffCondition.notifyAll();
@ -936,7 +950,7 @@ public class RealtimePlumber implements Plumber
indexToPersist.swapSegment(
new QueryableIndexSegment(
indexToPersist.getSegment().getIdentifier(),
indexToPersist.getSegmentIdentifier(),
indexIO.loadIndex(persistedFile)
)
);

View File

@ -32,6 +32,7 @@ import io.druid.java.util.common.IAE;
import io.druid.java.util.common.ISE;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.QueryableIndex;
import io.druid.segment.ReferenceCountingSegment;
import io.druid.segment.column.ColumnCapabilitiesImpl;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
@ -111,7 +112,13 @@ public class Sink implements Iterable<FireHydrant>
throw new ISE("hydrant[%s] not the right count[%s]", hydrant, i);
}
maxCount = hydrant.getCount();
numRowsExcludingCurrIndex.addAndGet(hydrant.getSegment().asQueryableIndex().getNumRows());
ReferenceCountingSegment segment = hydrant.getIncrementedSegment();
try {
numRowsExcludingCurrIndex.addAndGet(segment.asQueryableIndex().getNumRows());
}
finally {
segment.decrement();
}
}
this.hydrants.addAll(hydrants);
@ -272,10 +279,16 @@ public class Sink implements Iterable<FireHydrant>
Map<String, ColumnCapabilitiesImpl> oldCapabilities;
if (lastHydrant.hasSwapped()) {
oldCapabilities = Maps.newHashMap();
QueryableIndex oldIndex = lastHydrant.getSegment().asQueryableIndex();
for (String dim : oldIndex.getAvailableDimensions()) {
dimOrder.add(dim);
oldCapabilities.put(dim, (ColumnCapabilitiesImpl) oldIndex.getColumn(dim).getCapabilities());
ReferenceCountingSegment segment = lastHydrant.getIncrementedSegment();
try {
QueryableIndex oldIndex = segment.asQueryableIndex();
for (String dim : oldIndex.getAvailableDimensions()) {
dimOrder.add(dim);
oldCapabilities.put(dim, (ColumnCapabilitiesImpl) oldIndex.getColumn(dim).getCapabilities());
}
}
finally {
segment.decrement();
}
} else {
IncrementalIndex oldIndex = lastHydrant.getIndex();

View File

@ -34,7 +34,6 @@ import io.druid.timeline.partition.PartitionChunk;
import io.druid.timeline.partition.PartitionHolder;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
@ -236,16 +235,8 @@ public class SegmentManager
if (oldQueryable != null) {
dataSourceState.removeSegment(segment);
try {
log.info("Attempting to close segment %s", segment.getIdentifier());
oldQueryable.close();
}
catch (IOException e) {
log.makeAlert(e, "Exception closing segment")
.addData("dataSource", dataSourceName)
.addData("segmentId", segment.getIdentifier())
.emit();
}
log.info("Attempting to close segment %s", segment.getIdentifier());
oldQueryable.close();
} else {
log.info(
"Told to delete a queryable on dataSource[%s] for interval[%s] and version [%s] that I don't have.",

View File

@ -46,6 +46,7 @@ import io.druid.query.SegmentDescriptor;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.segment.QueryableIndex;
import io.druid.segment.ReferenceCountingSegment;
import io.druid.segment.TestHelper;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeTuningConfig;
@ -425,17 +426,17 @@ public class RealtimePlumberSchoolTest
Assert.assertEquals(0, hydrants.get(0).getCount());
Assert.assertEquals(
expectedInterval,
hydrants.get(0).getSegment().getDataInterval()
hydrants.get(0).getSegmentDataInterval()
);
Assert.assertEquals(2, hydrants.get(1).getCount());
Assert.assertEquals(
expectedInterval,
hydrants.get(1).getSegment().getDataInterval()
hydrants.get(1).getSegmentDataInterval()
);
Assert.assertEquals(4, hydrants.get(2).getCount());
Assert.assertEquals(
expectedInterval,
hydrants.get(2).getSegment().getDataInterval()
hydrants.get(2).getSegmentDataInterval()
);
/* Delete all the hydrants and reload, no sink should be created */
@ -563,9 +564,15 @@ public class RealtimePlumberSchoolTest
for (int i = 0; i < hydrants.size(); i++) {
hydrant = hydrants.get(i);
qindex = hydrant.getSegment().asQueryableIndex();
Assert.assertEquals(i, hydrant.getCount());
Assert.assertEquals(expectedDims.get(i), ImmutableList.copyOf(qindex.getAvailableDimensions()));
ReferenceCountingSegment segment = hydrant.getIncrementedSegment();
try {
qindex = segment.asQueryableIndex();
Assert.assertEquals(i, hydrant.getCount());
Assert.assertEquals(expectedDims.get(i), ImmutableList.copyOf(qindex.getAvailableDimensions()));
}
finally {
segment.decrement();
}
}
}