Fix two concurrency issues with segment fetching. (#14042)

* Fix two concurrency issues with segment fetching.

1) SegmentLocalCacheManager: Fix a concurrency issue where certain directory
   cleanup happened outside of directoryWriteRemoveLock. This created the
   possibility that segments would be deleted by one thread while being
   actively downloaded by another thread.

2) TaskDataSegmentProvider (MSQ): Fix a concurrency issue when two stages
   in the same process both use the same segment. For example: a self-join
   using distributed sort-merge. Prior to this change, the two stages could
   delete each other's segments.

3) ReferenceCountingResourceHolder: increment() returns a new ResourceHolder,
   rather than a Releaser. This allows it to be passed to callers without them
   having to hold on to both the original ResourceHolder *and* a Releaser.
   (See the sketch after this list.)

4) Simplify various interfaces and implementations by using ResourceHolder
   instead of Pair and instead of split-up fields.
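
(For item 3, an illustrative sketch of the new increment() contract. It relies on the post-patch API from this commit; the class name, resource string, and print statement are placeholders, not Druid code.)

import org.apache.druid.collections.ReferenceCountingResourceHolder;
import org.apache.druid.collections.ResourceHolder;

public class SharedHolderSketch
{
  public static void main(String[] args)
  {
    // One underlying resource; the closer runs only when the last reference closes.
    final ReferenceCountingResourceHolder<String> original =
        new ReferenceCountingResourceHolder<>("segment-files", () -> System.out.println("cleanup"));

    // Each stage holds a single ResourceHolder now, rather than a holder plus a Releaser.
    final ResourceHolder<String> stage1 = original.increment();
    final ResourceHolder<String> stage2 = original.increment();

    original.close(); // drops the initial reference; the resource stays alive
    stage1.close();   // stage 1 finished; stage 2 still holds a reference
    stage2.close();   // last reference released: "cleanup" prints here
  }
}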

* Add test.

* Fix style.

* Remove Releaser.

* Updates from master.

* Add some GuardedBys.

* Use the correct GuardedBy.

* Adjustments.
Gian Merlino 2023-04-25 20:49:27 -07:00 committed by GitHub
parent 2dfb693d4c
commit 752475b799
26 changed files with 730 additions and 206 deletions

View File

@ -19,14 +19,15 @@
package org.apache.druid.msq.exec;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import org.apache.druid.collections.ReferenceCountingResourceHolder;
import org.apache.druid.collections.ResourceHolder;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.msq.counters.ChannelCounters;
import org.apache.druid.msq.querykit.DataSegmentProvider;
import org.apache.druid.msq.querykit.LazyResourceHolder;
import org.apache.druid.msq.rpc.CoordinatorServiceClient;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.QueryableIndex;
@ -38,9 +39,13 @@ import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.utils.CloseableUtils;
import javax.annotation.Nullable;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;
/**
* Production implementation of {@link DataSegmentProvider} using Coordinator APIs.
@ -50,6 +55,7 @@ public class TaskDataSegmentProvider implements DataSegmentProvider
private final CoordinatorServiceClient coordinatorClient;
private final SegmentCacheManager segmentCacheManager;
private final IndexIO indexIO;
private final ConcurrentHashMap<SegmentId, SegmentHolder> holders;
public TaskDataSegmentProvider(
CoordinatorServiceClient coordinatorClient,
@ -60,56 +66,162 @@ public class TaskDataSegmentProvider implements DataSegmentProvider
this.coordinatorClient = coordinatorClient;
this.segmentCacheManager = segmentCacheManager;
this.indexIO = indexIO;
this.holders = new ConcurrentHashMap<>();
}
@Override
public LazyResourceHolder<Segment> fetchSegment(
public Supplier<ResourceHolder<Segment>> fetchSegment(
final SegmentId segmentId,
final ChannelCounters channelCounters
)
{
// Returns Supplier<ResourceHolder> instead of ResourceHolder, so the Coordinator calls and segment downloads happen
// in processing threads, rather than the main thread. (They happen when fetchSegmentInternal is called.)
return () -> {
ResourceHolder<Segment> holder = null;
while (holder == null) {
holder = holders.computeIfAbsent(
segmentId,
k -> new SegmentHolder(
() -> fetchSegmentInternal(segmentId, channelCounters),
() -> holders.remove(segmentId)
)
).get();
}
return holder;
};
}
/**
* Helper used by {@link #fetchSegment(SegmentId, ChannelCounters)}. Does the actual fetching of a segment, once it
* is determined that we definitely need to go out and get one.
*/
private ReferenceCountingResourceHolder<Segment> fetchSegmentInternal(
final SegmentId segmentId,
final ChannelCounters channelCounters
)
{
final DataSegment dataSegment;
try {
// Use LazyResourceHolder so Coordinator call and segment downloads happen in processing threads,
// rather than the main thread.
return new LazyResourceHolder<>(
() -> {
final DataSegment dataSegment;
try {
dataSegment = FutureUtils.get(
coordinatorClient.fetchUsedSegment(
segmentId.getDataSource(),
segmentId.toString()
),
true
);
}
catch (InterruptedException | ExecutionException e) {
throw new RE(e, "Failed to fetch segment details from Coordinator for [%s]", segmentId);
}
final Closer closer = Closer.create();
try {
final File segmentDir = segmentCacheManager.getSegmentFiles(dataSegment);
closer.register(() -> FileUtils.deleteDirectory(segmentDir));
final QueryableIndex index = indexIO.loadIndex(segmentDir);
final int numRows = index.getNumRows();
final long size = dataSegment.getSize();
closer.register(() -> channelCounters.addFile(numRows, size));
closer.register(index);
return Pair.of(new QueryableIndexSegment(index, dataSegment.getId()), closer);
}
catch (IOException | SegmentLoadingException e) {
throw CloseableUtils.closeInCatch(
new RE(e, "Failed to download segment [%s]", segmentId),
closer
);
}
}
dataSegment = FutureUtils.get(
coordinatorClient.fetchUsedSegment(
segmentId.getDataSource(),
segmentId.toString()
),
true
);
}
catch (Exception e) {
throw new RuntimeException(e);
catch (InterruptedException | ExecutionException e) {
throw new RE(e, "Failed to fetch segment details from Coordinator for [%s]", segmentId);
}
final Closer closer = Closer.create();
try {
if (!segmentCacheManager.reserve(dataSegment)) {
throw new ISE("Could not reserve location for segment [%s]", segmentId);
}
closer.register(() -> segmentCacheManager.cleanup(dataSegment));
final File segmentDir = segmentCacheManager.getSegmentFiles(dataSegment);
final QueryableIndex index = closer.register(indexIO.loadIndex(segmentDir));
final QueryableIndexSegment segment = new QueryableIndexSegment(index, dataSegment.getId());
final int numRows = index.getNumRows();
final long size = dataSegment.getSize();
closer.register(() -> channelCounters.addFile(numRows, size));
return new ReferenceCountingResourceHolder<>(segment, closer);
}
catch (IOException | SegmentLoadingException e) {
throw CloseableUtils.closeInCatch(
new RE(e, "Failed to download segment [%s]", segmentId),
closer
);
}
}
private static class SegmentHolder implements Supplier<ResourceHolder<Segment>>
{
private final Supplier<ResourceHolder<Segment>> holderSupplier;
private final Closeable cleanupFn;
@GuardedBy("this")
private ReferenceCountingResourceHolder<Segment> holder;
@GuardedBy("this")
private boolean closing;
@GuardedBy("this")
private boolean closed;
public SegmentHolder(Supplier<ResourceHolder<Segment>> holderSupplier, Closeable cleanupFn)
{
this.holderSupplier = holderSupplier;
this.cleanupFn = cleanupFn;
}
@Override
@Nullable
public ResourceHolder<Segment> get()
{
synchronized (this) {
if (closing) {
// Wait until the holder is closed.
while (!closed) {
try {
wait();
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}
// Then, return null so "fetchSegment" will try again.
return null;
} else if (holder == null) {
final ResourceHolder<Segment> segmentHolder = holderSupplier.get();
holder = new ReferenceCountingResourceHolder<>(
segmentHolder.get(),
() -> {
synchronized (this) {
CloseableUtils.closeAll(
() -> {
// synchronized block not strictly needed here, but errorprone needs it since it doesn't
// understand the lambda is immediately called. See https://errorprone.info/bugpattern/GuardedBy
synchronized (this) {
closing = true;
}
},
segmentHolder,
cleanupFn, // removes this holder from the "holders" map
() -> {
// synchronized block not strictly needed here, but errorprone needs it since it doesn't
// understand the lambda is immediately called. See https://errorprone.info/bugpattern/GuardedBy
synchronized (this) {
closed = true;
SegmentHolder.this.notifyAll();
}
}
);
}
}
);
final ResourceHolder<Segment> retVal = holder.increment();
// Store already-closed holder, so it disappears when the last reference is closed.
holder.close();
return retVal;
} else {
try {
return holder.increment();
}
catch (IllegalStateException e) {
// Possible race: holder is in the process of closing. (This is the only reason "increment" can throw ISE.)
// Return null so "fetchSegment" will try again.
return null;
}
}
}
}
}
}
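
(The retry loop in fetchSegment plus the self-removing SegmentHolder form a reusable shape. A minimal, self-contained sketch of just that pattern, with hypothetical names and a simplified value type, not Druid code:)

import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.Supplier;

// Sketch: entries unregister themselves from the map as part of closing, and get()
// returns null while an entry is closing, so a looping caller never resurrects a
// half-closed entry; it simply installs a fresh one on the next computeIfAbsent.
public class SelfCleaningCache<K, V>
{
  private final ConcurrentHashMap<K, Supplier<V>> entries = new ConcurrentHashMap<>();

  public V acquire(final K key, final Function<Runnable, Supplier<V>> entryFactory)
  {
    V value = null;
    while (value == null) {
      value = entries.computeIfAbsent(
          key,
          k -> entryFactory.apply(() -> entries.remove(k)) // entry removes itself when closed
      ).get(); // null means "that entry is closing"; loop and try again
    }
    return value;
  }
}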

View File

@ -20,6 +20,7 @@
package org.apache.druid.msq.input.external;
import com.google.common.collect.Iterators;
import org.apache.druid.collections.ResourceHolder;
import org.apache.druid.data.input.ColumnsFilter;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputRow;
@ -32,7 +33,6 @@ import org.apache.druid.data.input.impl.InlineInputSource;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.guava.BaseSequence;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.java.util.common.parsers.ParseException;
@ -48,7 +48,6 @@ import org.apache.druid.msq.input.NilInputSource;
import org.apache.druid.msq.input.ReadableInput;
import org.apache.druid.msq.input.ReadableInputs;
import org.apache.druid.msq.input.table.SegmentWithDescriptor;
import org.apache.druid.msq.querykit.LazyResourceHolder;
import org.apache.druid.msq.util.DimensionSchemaUtils;
import org.apache.druid.segment.RowAdapters;
import org.apache.druid.segment.RowBasedSegment;
@ -234,7 +233,7 @@ public class ExternalInputSliceReader implements InputSliceReader
);
return new SegmentWithDescriptor(
new LazyResourceHolder<>(() -> Pair.of(segment, () -> {})),
() -> ResourceHolder.fromCloseable(segment),
segmentId.toDescriptor()
);
}

View File

@ -20,15 +20,14 @@
package org.apache.druid.msq.input.inline;
import com.google.common.collect.Iterables;
import org.apache.druid.collections.ResourceHolder;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.msq.counters.CounterTracker;
import org.apache.druid.msq.input.InputSlice;
import org.apache.druid.msq.input.InputSliceReader;
import org.apache.druid.msq.input.ReadableInput;
import org.apache.druid.msq.input.ReadableInputs;
import org.apache.druid.msq.input.table.SegmentWithDescriptor;
import org.apache.druid.msq.querykit.LazyResourceHolder;
import org.apache.druid.query.InlineDataSource;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.segment.SegmentWrangler;
@ -72,7 +71,7 @@ public class InlineInputSliceReader implements InputSliceReader
segmentWrangler.getSegmentsForIntervals(dataSource, Intervals.ONLY_ETERNITY),
segment -> ReadableInput.segment(
new SegmentWithDescriptor(
new LazyResourceHolder<>(() -> Pair.of(segment, segment)),
() -> ResourceHolder.fromCloseable(segment),
DUMMY_SEGMENT_DESCRIPTOR
)
)

View File

@ -20,9 +20,9 @@
package org.apache.druid.msq.input.lookup;
import com.google.common.collect.Iterators;
import org.apache.druid.collections.ResourceHolder;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.msq.counters.CounterTracker;
import org.apache.druid.msq.input.InputSlice;
@ -30,7 +30,6 @@ import org.apache.druid.msq.input.InputSliceReader;
import org.apache.druid.msq.input.ReadableInput;
import org.apache.druid.msq.input.ReadableInputs;
import org.apache.druid.msq.input.table.SegmentWithDescriptor;
import org.apache.druid.msq.querykit.LazyResourceHolder;
import org.apache.druid.query.LookupDataSource;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.SegmentWrangler;
@ -75,33 +74,31 @@ public class LookupInputSliceReader implements InputSliceReader
() -> Iterators.singletonIterator(
ReadableInput.segment(
new SegmentWithDescriptor(
new LazyResourceHolder<>(
() -> {
final Iterable<Segment> segments =
segmentWrangler.getSegmentsForIntervals(
new LookupDataSource(lookupName),
Intervals.ONLY_ETERNITY
);
() -> {
final Iterable<Segment> segments =
segmentWrangler.getSegmentsForIntervals(
new LookupDataSource(lookupName),
Intervals.ONLY_ETERNITY
);
final Iterator<Segment> segmentIterator = segments.iterator();
if (!segmentIterator.hasNext()) {
throw new ISE("Lookup[%s] is not loaded", lookupName);
}
final Iterator<Segment> segmentIterator = segments.iterator();
if (!segmentIterator.hasNext()) {
throw new ISE("Lookup[%s] is not loaded", lookupName);
}
final Segment segment = segmentIterator.next();
if (segmentIterator.hasNext()) {
// LookupSegmentWrangler always returns zero or one segments, so this code block can't
// happen. That being said: we'll program defensively anyway.
CloseableUtils.closeAndSuppressExceptions(
segment,
e -> log.warn(e, "Failed to close segment for lookup[%s]", lookupName)
);
throw new ISE("Lookup[%s] has multiple segments; cannot read", lookupName);
}
final Segment segment = segmentIterator.next();
if (segmentIterator.hasNext()) {
// LookupSegmentWrangler always returns zero or one segments, so this code block can't
// happen. That being said: we'll program defensively anyway.
CloseableUtils.closeAndSuppressExceptions(
segment,
e -> log.warn(e, "Failed to close segment for lookup[%s]", lookupName)
);
throw new ISE("Lookup[%s] has multiple segments; cannot read", lookupName);
}
return Pair.of(segment, segment);
}
),
return ResourceHolder.fromCloseable(segment);
},
SegmentId.dummy(lookupName).toDescriptor()
)
)

View File

@ -24,36 +24,45 @@ import org.apache.druid.collections.ResourceHolder;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.segment.Segment;
import java.io.Closeable;
import java.util.Objects;
import java.util.function.Supplier;
/**
* A holder for a physical segment.
* A holder for a supplier of a physical segment.
*/
public class SegmentWithDescriptor implements Closeable
public class SegmentWithDescriptor
{
private final ResourceHolder<? extends Segment> segmentHolder;
private final Supplier<? extends ResourceHolder<Segment>> segmentSupplier;
private final SegmentDescriptor descriptor;
/**
* Create a new instance.
*
* @param segmentSupplier supplier of a {@link ResourceHolder} of a segment. The {@link ResourceHolder#close()} logic
* must include a delegated call to {@link Segment#close()}.
* @param descriptor segment descriptor
*/
public SegmentWithDescriptor(
final ResourceHolder<? extends Segment> segmentHolder,
final Supplier<? extends ResourceHolder<Segment>> segmentSupplier,
final SegmentDescriptor descriptor
)
{
this.segmentHolder = Preconditions.checkNotNull(segmentHolder, "segment");
this.segmentSupplier = Preconditions.checkNotNull(segmentSupplier, "segment");
this.descriptor = Preconditions.checkNotNull(descriptor, "descriptor");
}
/**
* The physical segment.
*
* Named "getOrLoad" because the segment may be held by an eager or lazy resource holder (i.e.
* {@link org.apache.druid.msq.querykit.LazyResourceHolder}). If the resource holder is lazy, the segment is acquired
* Named "getOrLoad" because the segment may be generated by a lazy supplier. In this case, the segment is acquired
* as part of the call to this method.
*
* It is not necessary to call {@link Segment#close()} on the returned segment. Calling {@link ResourceHolder#close()}
* is enough.
*/
public Segment getOrLoadSegment()
public ResourceHolder<Segment> getOrLoad()
{
return segmentHolder.get();
return segmentSupplier.get();
}
/**
@ -64,15 +73,6 @@ public class SegmentWithDescriptor implements Closeable
return descriptor;
}
/**
* Release resources used by the physical segment.
*/
@Override
public void close()
{
segmentHolder.close();
}
@Override
public boolean equals(Object o)
{
@ -83,12 +83,12 @@ public class SegmentWithDescriptor implements Closeable
return false;
}
SegmentWithDescriptor that = (SegmentWithDescriptor) o;
return Objects.equals(segmentHolder, that.segmentHolder) && Objects.equals(descriptor, that.descriptor);
return Objects.equals(segmentSupplier, that.segmentSupplier) && Objects.equals(descriptor, that.descriptor);
}
@Override
public int hashCode()
{
return Objects.hash(segmentHolder, descriptor);
return Objects.hash(segmentSupplier, descriptor);
}
}
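
(A hedged consumer-side sketch of the new contract; the segmentWithDescriptor parameter is a placeholder. The point is to close the returned holder rather than the segment itself:)

import org.apache.druid.collections.ResourceHolder;
import org.apache.druid.msq.input.table.SegmentWithDescriptor;
import org.apache.druid.segment.Segment;

class GetOrLoadSketch
{
  static void readSegment(SegmentWithDescriptor segmentWithDescriptor)
  {
    try (ResourceHolder<Segment> holder = segmentWithDescriptor.getOrLoad()) {
      final Segment segment = holder.get();
      // ... read from segment; no explicit segment.close() needed ...
    } // holder.close() releases the reference (and closes the segment underneath)
  }
}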

View File

@ -20,7 +20,6 @@
package org.apache.druid.msq.input.table;
import com.google.common.collect.Iterators;
import org.apache.druid.collections.ResourceHolder;
import org.apache.druid.msq.counters.ChannelCounters;
import org.apache.druid.msq.counters.CounterNames;
import org.apache.druid.msq.counters.CounterTracker;
@ -29,7 +28,6 @@ import org.apache.druid.msq.input.InputSliceReader;
import org.apache.druid.msq.input.ReadableInput;
import org.apache.druid.msq.input.ReadableInputs;
import org.apache.druid.msq.querykit.DataSegmentProvider;
import org.apache.druid.segment.Segment;
import org.apache.druid.timeline.SegmentId;
import java.util.Iterator;
@ -92,8 +90,10 @@ public class SegmentsInputSliceReader implements InputSliceReader
descriptor.getPartitionNumber()
);
final ResourceHolder<Segment> segmentHolder = dataSegmentProvider.fetchSegment(segmentId, channelCounters);
return new SegmentWithDescriptor(segmentHolder, descriptor);
return new SegmentWithDescriptor(
dataSegmentProvider.fetchSegment(segmentId, channelCounters),
descriptor
);
}
).iterator();
}

View File

@ -54,7 +54,7 @@ public abstract class BaseLeafFrameProcessor implements FrameProcessor<Long>
private final Query<?> query;
private final ReadableInput baseInput;
private final List<ReadableFrameChannel> inputChannels;
private final ResourceHolder<WritableFrameChannel> outputChannel;
private final ResourceHolder<WritableFrameChannel> outputChannelHolder;
private final ResourceHolder<FrameWriterFactory> frameWriterFactoryHolder;
private final BroadcastJoinHelper broadcastJoinHelper;
@ -64,14 +64,14 @@ public abstract class BaseLeafFrameProcessor implements FrameProcessor<Long>
final Query<?> query,
final ReadableInput baseInput,
final Int2ObjectMap<ReadableInput> sideChannels,
final ResourceHolder<WritableFrameChannel> outputChannel,
final ResourceHolder<WritableFrameChannel> outputChannelHolder,
final ResourceHolder<FrameWriterFactory> frameWriterFactoryHolder,
final long memoryReservedForBroadcastJoin
)
{
this.query = query;
this.baseInput = baseInput;
this.outputChannel = outputChannel;
this.outputChannelHolder = outputChannelHolder;
this.frameWriterFactoryHolder = frameWriterFactoryHolder;
final Pair<List<ReadableFrameChannel>, BroadcastJoinHelper> inputChannelsAndBroadcastJoinHelper =
@ -145,7 +145,7 @@ public abstract class BaseLeafFrameProcessor implements FrameProcessor<Long>
@Override
public List<WritableFrameChannel> outputChannels()
{
return Collections.singletonList(outputChannel.get());
return Collections.singletonList(outputChannelHolder.get());
}
@Override
@ -166,9 +166,7 @@ public abstract class BaseLeafFrameProcessor implements FrameProcessor<Long>
@Override
public void cleanup() throws IOException
{
// Don't close the output channel, because multiple workers write to the same channel.
// The channel should be closed by the caller.
FrameProcessors.closeAll(inputChannels(), Collections.emptyList(), outputChannel, frameWriterFactoryHolder);
FrameProcessors.closeAll(inputChannels(), Collections.emptyList(), outputChannelHolder, frameWriterFactoryHolder);
}
protected FrameWriterFactory getFrameWriterFactory()

View File

@ -31,7 +31,6 @@ import org.apache.druid.frame.processor.OutputChannel;
import org.apache.druid.frame.processor.OutputChannelFactory;
import org.apache.druid.frame.processor.OutputChannels;
import org.apache.druid.frame.write.FrameWriterFactory;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.logger.Logger;
@ -255,7 +254,7 @@ public abstract class BaseLeafFrameProcessorFactory extends BaseFrameProcessorFa
protected abstract FrameProcessor<Long> makeProcessor(
ReadableInput baseInput,
Int2ObjectMap<ReadableInput> sideChannels,
ResourceHolder<WritableFrameChannel> outputChannelSupplier,
ResourceHolder<WritableFrameChannel> outputChannel,
ResourceHolder<FrameWriterFactory> frameWriterFactory,
FrameContext providerThingy
);
@ -273,21 +272,29 @@ public abstract class BaseLeafFrameProcessorFactory extends BaseFrameProcessorFa
resource = queueRef.get().poll();
}
return Pair.of(
resource,
() -> {
synchronized (queueRef) {
final Queue<T> queue = queueRef.get();
if (queue != null) {
queue.add(resource);
return;
}
}
return new ResourceHolder<T>()
{
@Override
public T get()
{
return resource;
}
// Queue was null
backupCloser.accept(resource);
@Override
public void close()
{
synchronized (queueRef) {
final Queue<T> queue = queueRef.get();
if (queue != null) {
queue.add(resource);
return;
}
}
);
// Queue was null
backupCloser.accept(resource);
}
};
}
);
}

View File

@ -24,13 +24,18 @@ import org.apache.druid.msq.counters.ChannelCounters;
import org.apache.druid.segment.Segment;
import org.apache.druid.timeline.SegmentId;
import java.util.function.Supplier;
public interface DataSegmentProvider
{
/**
* Fetches the segment corresponding to the provided segmentId from deep storage,
* segment fetched.
* Returns a supplier that fetches the segment corresponding to the provided segmentId from deep storage. The segment
* is not actually fetched until you call {@link Supplier#get()}. Once you call this, make sure to also call
* {@link ResourceHolder#close()}.
*
* It is not necessary to call {@link ResourceHolder#close()} if you never call {@link Supplier#get()}.
*/
ResourceHolder<Segment> fetchSegment(
Supplier<ResourceHolder<Segment>> fetchSegment(
SegmentId segmentId,
ChannelCounters channelCounters
);
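
(A minimal sketch of the laziness this javadoc describes, assuming a DataSegmentProvider instance and a SegmentId are already in hand; the method and class names are hypothetical:)

import java.util.function.Supplier;
import org.apache.druid.collections.ResourceHolder;
import org.apache.druid.msq.counters.ChannelCounters;
import org.apache.druid.msq.querykit.DataSegmentProvider;
import org.apache.druid.segment.Segment;
import org.apache.druid.timeline.SegmentId;

class FetchSegmentSketch
{
  static int countRows(DataSegmentProvider provider, SegmentId segmentId)
  {
    final Supplier<ResourceHolder<Segment>> supplier =
        provider.fetchSegment(segmentId, new ChannelCounters());
    // Nothing has been fetched yet; if we never call supplier.get(), nothing needs closing.
    try (ResourceHolder<Segment> holder = supplier.get()) { // Coordinator call + download happen here
      return holder.get().asStorageAdapter().getNumRows();
    } // closing the holder releases this reference to the cached segment files
  }
}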

View File

@ -21,11 +21,9 @@ package org.apache.druid.msq.querykit;
import com.google.common.base.Preconditions;
import org.apache.druid.collections.ResourceHolder;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.logger.Logger;
import javax.annotation.concurrent.NotThreadSafe;
import java.io.Closeable;
import java.util.function.Supplier;
@NotThreadSafe
@ -33,11 +31,10 @@ public class LazyResourceHolder<T> implements ResourceHolder<T>
{
private static final Logger log = new Logger(LazyResourceHolder.class);
private final Supplier<Pair<T, Closeable>> supplier;
private T resource = null;
private Closeable closer = null;
private final Supplier<ResourceHolder<T>> supplier;
private ResourceHolder<T> supplied = null;
public LazyResourceHolder(final Supplier<Pair<T, Closeable>> supplier)
public LazyResourceHolder(final Supplier<ResourceHolder<T>> supplier)
{
this.supplier = Preconditions.checkNotNull(supplier, "supplier");
}
@ -45,28 +42,25 @@ public class LazyResourceHolder<T> implements ResourceHolder<T>
@Override
public T get()
{
if (resource == null) {
final Pair<T, Closeable> supplied = supplier.get();
resource = Preconditions.checkNotNull(supplied.lhs, "resource");
closer = Preconditions.checkNotNull(supplied.rhs, "closer");
if (supplied == null) {
supplied = supplier.get();
}
return resource;
return supplied.get();
}
@Override
public void close()
{
if (resource != null) {
if (supplied != null) {
try {
closer.close();
supplied.close();
}
catch (Throwable e) {
log.noStackTrace().warn(e, "Exception encountered while closing resource: %s", resource);
log.noStackTrace().warn(e, "Exception encountered while closing resource: %s", supplied.get());
}
finally {
resource = null;
closer = null;
supplied = null;
}
}
}

View File

@ -48,6 +48,7 @@ import org.apache.druid.query.groupby.strategy.GroupByStrategySelector;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.query.spec.SpecificSegmentSpec;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.timeline.SegmentId;
@ -74,7 +75,7 @@ public class GroupByPreShuffleFrameProcessor extends BaseLeafFrameProcessor
final ReadableInput baseInput,
final Int2ObjectMap<ReadableInput> sideChannels,
final GroupByStrategySelector strategySelector,
final ResourceHolder<WritableFrameChannel> outputChannel,
final ResourceHolder<WritableFrameChannel> outputChannelHolder,
final ResourceHolder<FrameWriterFactory> frameWriterFactoryHolder,
final long memoryReservedForBroadcastJoin
)
@ -83,7 +84,7 @@ public class GroupByPreShuffleFrameProcessor extends BaseLeafFrameProcessor
query,
baseInput,
sideChannels,
outputChannel,
outputChannelHolder,
frameWriterFactoryHolder,
memoryReservedForBroadcastJoin
);
@ -100,13 +101,13 @@ public class GroupByPreShuffleFrameProcessor extends BaseLeafFrameProcessor
protected ReturnOrAwait<Long> runWithSegment(final SegmentWithDescriptor segment) throws IOException
{
if (resultYielder == null) {
closer.register(segment);
final ResourceHolder<Segment> segmentHolder = closer.register(segment.getOrLoad());
final Sequence<ResultRow> rowSequence =
strategySelector.strategize(query)
.process(
query.withQuerySegmentSpec(new SpecificSegmentSpec(segment.getDescriptor())),
mapSegment(segment.getOrLoadSegment()).asStorageAdapter(),
mapSegment(segmentHolder.get()).asStorageAdapter(),
null
);

View File

@ -28,11 +28,9 @@ import org.apache.druid.collections.ResourceHolder;
import org.apache.druid.frame.channel.WritableFrameChannel;
import org.apache.druid.frame.processor.FrameProcessor;
import org.apache.druid.frame.write.FrameWriterFactory;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.msq.input.ReadableInput;
import org.apache.druid.msq.kernel.FrameContext;
import org.apache.druid.msq.querykit.BaseLeafFrameProcessorFactory;
import org.apache.druid.msq.querykit.LazyResourceHolder;
import org.apache.druid.query.groupby.GroupByQuery;
@JsonTypeName("groupByPreShuffle")
@ -67,7 +65,7 @@ public class GroupByPreShuffleFrameProcessorFactory extends BaseLeafFrameProcess
sideChannels,
frameContext.groupByStrategySelector(),
outputChannelHolder,
new LazyResourceHolder<>(() -> Pair.of(frameWriterFactoryHolder.get(), frameWriterFactoryHolder)),
frameWriterFactoryHolder,
frameContext.memoryParameters().getBroadcastJoinMemory()
);
}

View File

@ -54,6 +54,7 @@ import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.query.spec.SpecificSegmentSpec;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.VirtualColumn;
import org.apache.druid.segment.VirtualColumns;
@ -88,7 +89,7 @@ public class ScanQueryFrameProcessor extends BaseLeafFrameProcessor
final ScanQuery query,
final ReadableInput baseInput,
final Int2ObjectMap<ReadableInput> sideChannels,
final ResourceHolder<WritableFrameChannel> outputChannel,
final ResourceHolder<WritableFrameChannel> outputChannelHolder,
final ResourceHolder<FrameWriterFactory> frameWriterFactoryHolder,
@Nullable final AtomicLong runningCountForLimit,
final long memoryReservedForBroadcastJoin,
@ -99,7 +100,7 @@ public class ScanQueryFrameProcessor extends BaseLeafFrameProcessor
query,
baseInput,
sideChannels,
outputChannel,
outputChannelHolder,
frameWriterFactoryHolder,
memoryReservedForBroadcastJoin
);
@ -149,12 +150,12 @@ public class ScanQueryFrameProcessor extends BaseLeafFrameProcessor
protected ReturnOrAwait<Long> runWithSegment(final SegmentWithDescriptor segment) throws IOException
{
if (cursor == null) {
closer.register(segment);
final ResourceHolder<Segment> segmentHolder = closer.register(segment.getOrLoad());
final Yielder<Cursor> cursorYielder = Yielders.each(
makeCursors(
query.withQuerySegmentSpec(new SpecificSegmentSpec(segment.getDescriptor())),
mapSegment(segment.getOrLoadSegment()).asStorageAdapter()
mapSegment(segmentHolder.get()).asStorageAdapter()
)
);

View File

@ -28,11 +28,9 @@ import org.apache.druid.collections.ResourceHolder;
import org.apache.druid.frame.channel.WritableFrameChannel;
import org.apache.druid.frame.processor.FrameProcessor;
import org.apache.druid.frame.write.FrameWriterFactory;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.msq.input.ReadableInput;
import org.apache.druid.msq.kernel.FrameContext;
import org.apache.druid.msq.querykit.BaseLeafFrameProcessorFactory;
import org.apache.druid.msq.querykit.LazyResourceHolder;
import org.apache.druid.query.scan.ScanQuery;
import javax.annotation.Nullable;
@ -82,7 +80,7 @@ public class ScanQueryFrameProcessorFactory extends BaseLeafFrameProcessorFactor
baseInput,
sideChannels,
outputChannelHolder,
new LazyResourceHolder<>(() -> Pair.of(frameWriterFactoryHolder.get(), frameWriterFactoryHolder)),
frameWriterFactoryHolder,
runningCountForLimit,
frameContext.memoryParameters().getBroadcastJoinMemory(),
frameContext.jsonMapper()

View File

@ -0,0 +1,342 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.msq.exec;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.collections.ResourceHolder;
import org.apache.druid.collections.bitmap.BitmapFactory;
import org.apache.druid.collections.bitmap.RoaringBitmapFactory;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.msq.counters.ChannelCounters;
import org.apache.druid.msq.rpc.CoordinatorServiceClient;
import org.apache.druid.rpc.ServiceRetryPolicy;
import org.apache.druid.segment.DimensionHandler;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.Metadata;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.data.Indexed;
import org.apache.druid.segment.data.ListIndexed;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.loading.LoadSpec;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.loading.SegmentLoadingException;
import org.apache.druid.segment.loading.SegmentLocalCacheManager;
import org.apache.druid.segment.loading.StorageLocationConfig;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.easymock.EasyMock;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
public class TaskDataSegmentProviderTest
{
private static final String DATASOURCE = "foo";
private static final int NUM_SEGMENTS = 10;
private static final int THREADS = 8;
private static final String LOAD_SPEC_FILE_NAME = "data";
private List<DataSegment> segments;
private File cacheDir;
private SegmentLocalCacheManager cacheManager;
private TaskDataSegmentProvider provider;
private ListeningExecutorService exec;
private IndexIO indexIO = EasyMock.mock(IndexIO.class);
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
@Before
public void setUp() throws Exception
{
EmittingLogger.registerEmitter(new NoopServiceEmitter());
EasyMock.reset(indexIO);
EasyMock.expect(indexIO.loadIndex(EasyMock.anyObject())).andReturn(new TestQueryableIndex()).anyTimes();
EasyMock.replay(indexIO);
final ObjectMapper jsonMapper = TestHelper.JSON_MAPPER;
jsonMapper.registerSubtypes(TestLoadSpec.class);
segments = new ArrayList<>();
for (int i = 0; i < NUM_SEGMENTS; i++) {
// Two segments per interval; helps verify that directory creation + deletion does not race.
final DateTime startTime = DateTimes.of("2000").plusDays(i / 2);
final int partitionNum = i % 2;
segments.add(
DataSegment.builder()
.dataSource(DATASOURCE)
.interval(
Intervals.utc(
startTime.getMillis(),
startTime.plusDays(1).getMillis()
)
)
.version("0")
.shardSpec(new NumberedShardSpec(partitionNum, 2))
.loadSpec(
jsonMapper.convertValue(
new TestLoadSpec(i),
JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
)
)
.size(1)
.build()
);
}
cacheDir = temporaryFolder.newFolder();
cacheManager = new SegmentLocalCacheManager(
new SegmentLoaderConfig().withLocations(
ImmutableList.of(new StorageLocationConfig(cacheDir, 10_000_000_000L, null))
),
jsonMapper
);
provider = new TaskDataSegmentProvider(
new TestCoordinatorServiceClientImpl(),
cacheManager,
indexIO
);
exec = MoreExecutors.listeningDecorator(Execs.multiThreaded(THREADS, getClass().getSimpleName() + "-%s"));
}
@After
public void tearDown() throws Exception
{
if (indexIO != null) {
EasyMock.verify(indexIO);
}
if (exec != null) {
exec.shutdownNow();
exec.awaitTermination(1, TimeUnit.MINUTES);
}
}
@Test
public void testConcurrency()
{
final int iterations = 1000;
final List<ListenableFuture<Boolean>> testFutures = new ArrayList<>();
for (int i = 0; i < iterations; i++) {
final int expectedSegmentNumber = i % NUM_SEGMENTS;
final DataSegment segment = segments.get(expectedSegmentNumber);
final ListenableFuture<Supplier<ResourceHolder<Segment>>> f =
exec.submit(() -> provider.fetchSegment(segment.getId(), new ChannelCounters()));
testFutures.add(
FutureUtils.transform(
f,
holderSupplier -> {
try {
final ResourceHolder<Segment> holder = holderSupplier.get();
Assert.assertEquals(segment.getId(), holder.get().getId());
final String expectedStorageDir = DataSegmentPusher.getDefaultStorageDir(segment, false);
final File expectedFile = new File(
StringUtils.format(
"%s/%s/%s",
cacheDir,
expectedStorageDir,
LOAD_SPEC_FILE_NAME
)
);
Assert.assertTrue(expectedFile.exists());
Assert.assertArrayEquals(
Ints.toByteArray(expectedSegmentNumber),
Files.readAllBytes(expectedFile.toPath())
);
holder.close();
return true;
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
)
);
}
Assert.assertEquals(iterations, testFutures.size());
for (int i = 0; i < iterations; i++) {
ListenableFuture<Boolean> testFuture = testFutures.get(i);
Assert.assertTrue("Test iteration #" + i, FutureUtils.getUnchecked(testFuture, false));
}
// Cache dir should exist, but be empty, since we've closed all holders.
Assert.assertTrue(cacheDir.exists());
Assert.assertArrayEquals(new String[]{}, cacheDir.list());
}
private class TestCoordinatorServiceClientImpl implements CoordinatorServiceClient
{
@Override
public ListenableFuture<DataSegment> fetchUsedSegment(String dataSource, String segmentId)
{
for (final DataSegment segment : segments) {
if (segment.getDataSource().equals(dataSource) && segment.getId().toString().equals(segmentId)) {
return Futures.immediateFuture(segment);
}
}
return Futures.immediateFailedFuture(new ISE("No such segment[%s] for dataSource[%s]", segmentId, dataSource));
}
@Override
public CoordinatorServiceClient withRetryPolicy(ServiceRetryPolicy retryPolicy)
{
return this;
}
}
@JsonTypeName("test")
private static class TestLoadSpec implements LoadSpec
{
private final int uniqueId;
@JsonCreator
public TestLoadSpec(@JsonProperty("uniqueId") int uniqueId)
{
this.uniqueId = uniqueId;
}
@JsonProperty
public int getUniqueId()
{
return uniqueId;
}
@Override
public LoadSpecResult loadSegment(File destDir) throws SegmentLoadingException
{
try {
Files.write(new File(destDir, LOAD_SPEC_FILE_NAME).toPath(), Ints.toByteArray(uniqueId));
Files.write(new File(destDir, "version.bin").toPath(), Ints.toByteArray(1));
return new LoadSpecResult(1);
}
catch (IOException e) {
throw new SegmentLoadingException(e, "Failed to load segment in location [%s]", destDir);
}
}
}
private static class TestQueryableIndex implements QueryableIndex
{
@Override
public Interval getDataInterval()
{
throw new UnsupportedOperationException();
}
@Override
public int getNumRows()
{
return 0;
}
@Override
public Indexed<String> getAvailableDimensions()
{
return new ListIndexed<>();
}
@Override
public BitmapFactory getBitmapFactoryForDimensions()
{
return RoaringBitmapFactory.INSTANCE;
}
@Nullable
@Override
public Metadata getMetadata()
{
return null;
}
@Override
public Map<String, DimensionHandler> getDimensionHandlers()
{
return Collections.emptyMap();
}
@Override
public List<String> getColumnNames()
{
return Collections.emptyList();
}
@Nullable
@Override
public ColumnHolder getColumnHolder(String columnName)
{
return null;
}
@Override
public void close()
{
}
}
}

View File

@ -22,6 +22,7 @@ package org.apache.druid.msq.querykit.scan;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import it.unimi.dsi.fastutil.ints.Int2ObjectMaps;
import org.apache.druid.collections.ReferenceCountingResourceHolder;
import org.apache.druid.collections.ResourceHolder;
import org.apache.druid.frame.Frame;
import org.apache.druid.frame.FrameType;
@ -38,13 +39,11 @@ import org.apache.druid.frame.write.FrameWriterFactory;
import org.apache.druid.frame.write.FrameWriters;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.msq.input.ReadableInput;
import org.apache.druid.msq.kernel.StageId;
import org.apache.druid.msq.kernel.StagePartition;
import org.apache.druid.msq.querykit.LazyResourceHolder;
import org.apache.druid.msq.test.LimitedFrameWriterFactory;
import org.apache.druid.query.Druids;
import org.apache.druid.query.scan.ScanQuery;
@ -147,7 +146,7 @@ public class ScanQueryFrameProcessorTest extends InitializedNullHandlingTest
}
}
},
new LazyResourceHolder<>(() -> Pair.of(frameWriterFactory, () -> {})),
new ReferenceCountingResourceHolder<>(frameWriterFactory, () -> {}),
null,
0L,
new DefaultObjectMapper()

View File

@ -25,6 +25,8 @@ import com.google.common.collect.ImmutableSet;
import com.google.inject.Injector;
import com.google.inject.Module;
import com.google.inject.TypeLiteral;
import org.apache.druid.collections.ReferenceCountingResourceHolder;
import org.apache.druid.collections.ResourceHolder;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.LongDimensionSchema;
import org.apache.druid.data.input.impl.StringDimensionSchema;
@ -35,14 +37,12 @@ import org.apache.druid.guice.JoinableFactoryModule;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.msq.guice.MSQExternalDataSourceModule;
import org.apache.druid.msq.guice.MSQIndexingModule;
import org.apache.druid.msq.querykit.DataSegmentProvider;
import org.apache.druid.msq.querykit.LazyResourceHolder;
import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.ForwardingQueryProcessingPool;
import org.apache.druid.query.QueryProcessingPool;
@ -79,7 +79,6 @@ import org.joda.time.Interval;
import org.junit.rules.TemporaryFolder;
import javax.annotation.Nullable;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.List;
@ -161,8 +160,7 @@ public class CalciteMSQTestsHelper
));
binder.bind(DataSegmentAnnouncer.class).toInstance(new NoopDataSegmentAnnouncer());
binder.bind(DataSegmentProvider.class)
.toInstance((dataSegment, channelCounters) ->
new LazyResourceHolder<>(getSupplierForSegment(dataSegment)));
.toInstance((dataSegment, channelCounters) -> getSupplierForSegment(dataSegment));
GroupByQueryConfig groupByQueryConfig = new GroupByQueryConfig();
binder.bind(GroupByStrategySelector.class)
@ -178,7 +176,7 @@ public class CalciteMSQTestsHelper
);
}
private static Supplier<Pair<Segment, Closeable>> getSupplierForSegment(SegmentId segmentId)
private static Supplier<ResourceHolder<Segment>> getSupplierForSegment(SegmentId segmentId)
{
final TemporaryFolder temporaryFolder = new TemporaryFolder();
try {
@ -285,13 +283,6 @@ public class CalciteMSQTestsHelper
{
}
};
return new Supplier<Pair<Segment, Closeable>>()
{
@Override
public Pair<Segment, Closeable> get()
{
return new Pair<>(segment, Closer.create());
}
};
return () -> new ReferenceCountingResourceHolder<>(segment, Closer.create());
}
}

View File

@ -36,6 +36,8 @@ import com.google.inject.Module;
import com.google.inject.TypeLiteral;
import com.google.inject.util.Modules;
import com.google.inject.util.Providers;
import org.apache.druid.collections.ReferenceCountingResourceHolder;
import org.apache.druid.collections.ResourceHolder;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.LongDimensionSchema;
@ -67,7 +69,6 @@ import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.guava.Yielder;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.metadata.input.InputSourceModule;
@ -96,7 +97,6 @@ import org.apache.druid.msq.indexing.report.MSQResultsReport;
import org.apache.druid.msq.indexing.report.MSQTaskReport;
import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
import org.apache.druid.msq.querykit.DataSegmentProvider;
import org.apache.druid.msq.querykit.LazyResourceHolder;
import org.apache.druid.msq.sql.MSQTaskQueryMaker;
import org.apache.druid.msq.sql.MSQTaskSqlEngine;
import org.apache.druid.msq.util.MultiStageQueryContext;
@ -179,7 +179,6 @@ import org.mockito.Mockito;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
@ -386,8 +385,7 @@ public class MSQTestBase extends BaseCalciteQueryTest
binder.bind(QueryProcessingPool.class)
.toInstance(new ForwardingQueryProcessingPool(Execs.singleThreaded("Test-runner-processing-pool")));
binder.bind(DataSegmentProvider.class)
.toInstance((dataSegment, channelCounters) ->
new LazyResourceHolder<>(getSupplierForSegment(dataSegment)));
.toInstance((dataSegment, channelCounters) -> getSupplierForSegment(dataSegment));
binder.bind(IndexIO.class).toInstance(indexIO);
binder.bind(SpecificSegmentsQuerySegmentWalker.class).toInstance(qf.walker());
@ -540,7 +538,7 @@ public class MSQTestBase extends BaseCalciteQueryTest
}
@Nonnull
private Supplier<Pair<Segment, Closeable>> getSupplierForSegment(SegmentId segmentId)
private Supplier<ResourceHolder<Segment>> getSupplierForSegment(SegmentId segmentId)
{
if (segmentManager.getSegment(segmentId) == null) {
final QueryableIndex index;
@ -640,7 +638,7 @@ public class MSQTestBase extends BaseCalciteQueryTest
};
segmentManager.addSegment(segment);
}
return () -> new Pair<>(segmentManager.getSegment(segmentId), Closer.create());
return () -> ReferenceCountingResourceHolder.fromCloseable(segmentManager.getSegment(segmentId));
}
public SelectTester testSelectQuery()

View File

@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.collections;
import com.google.common.base.Preconditions;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.utils.CloseableUtils;
import java.io.Closeable;
import java.util.concurrent.atomic.AtomicReference;
public class CloseableResourceHolder<T extends Closeable> implements ResourceHolder<T>
{
private final AtomicReference<T> resource;
/**
* Use {@link ResourceHolder#fromCloseable}.
*/
CloseableResourceHolder(T resource)
{
this.resource = new AtomicReference<>(Preconditions.checkNotNull(resource, "resource"));
}
@Override
public T get()
{
final T retVal = resource.get();
if (retVal == null) {
throw new ISE("Already closed");
}
return retVal;
}
@Override
public void close()
{
final T oldResource = resource.getAndSet(null);
CloseableUtils.closeAndWrapExceptions(oldResource);
}
}

View File

@ -77,13 +77,14 @@ public class ReferenceCountingResourceHolder<T> implements ResourceHolder<T>
}
/**
* Increments the reference count by 1 and returns a {@link Releaser}. The returned {@link Releaser} is used to
* decrement the reference count when the caller no longer needs the resource.
* Increments the reference count by 1 and returns a {@link ResourceHolder} representing the new reference.
* The returned {@link ResourceHolder}'s close() method decrements the reference count when the caller no
* longer needs the resource.
*
* {@link Releaser}s are not thread-safe. If multiple threads need references to the same holder, they should
* each acquire their own {@link Releaser}.
* The returned {@link ResourceHolder}s are not thread-safe. If multiple threads need references to the same
* resource, they should each call this method on the original object.
*/
public Releaser increment()
public ResourceHolder<T> increment()
{
while (true) {
int count = this.refCount.get();
@ -95,11 +96,17 @@ public class ReferenceCountingResourceHolder<T> implements ResourceHolder<T>
}
}
// This Releaser is supposed to be used from a single thread, so no synchronization/atomicity
return new Releaser()
// This ResourceHolder is supposed to be used from a single thread, so no synchronization/atomicity
return new ResourceHolder<T>()
{
boolean released = false;
@Override
public T get()
{
return object;
}
@Override
public void close()
{

View File

@ -22,6 +22,7 @@ package org.apache.druid.collections;
import java.io.Closeable;
/**
*
*/
public interface ResourceHolder<T> extends Closeable
{
@ -29,4 +30,9 @@ public interface ResourceHolder<T> extends Closeable
@Override
void close();
static <T extends Closeable> ResourceHolder<T> fromCloseable(final T resource)
{
return new CloseableResourceHolder<>(resource);
}
}

View File

@ -32,7 +32,6 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.collections.BlockingPool;
import org.apache.druid.collections.ReferenceCountingResourceHolder;
import org.apache.druid.collections.Releaser;
import org.apache.druid.common.guava.GuavaUtils;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
@ -59,6 +58,7 @@ import org.apache.druid.query.groupby.GroupByQueryConfig;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.groupby.epinephelinae.RowBasedGrouperHelper.RowBasedKey;
import java.io.Closeable;
import java.io.File;
import java.nio.ByteBuffer;
import java.util.List;
@ -243,9 +243,9 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner<ResultRow>
try (
// These variables are used to close releasers automatically.
@SuppressWarnings("unused")
Releaser bufferReleaser = mergeBufferHolder.increment();
Closeable bufferReleaser = mergeBufferHolder.increment();
@SuppressWarnings("unused")
Releaser grouperReleaser = grouperHolder.increment()
Closeable grouperReleaser = grouperHolder.increment()
) {
// Return true if OK, false if resources were exhausted.
return input.run(queryPlusForRunners, responseContext)

View File

@ -28,7 +28,6 @@ import com.google.common.util.concurrent.ListeningExecutorService;
import it.unimi.dsi.fastutil.objects.Object2IntArrayMap;
import it.unimi.dsi.fastutil.objects.Object2IntMap;
import org.apache.druid.collections.ReferenceCountingResourceHolder;
import org.apache.druid.collections.Releaser;
import org.apache.druid.java.util.common.CloseableIterators;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
@ -48,7 +47,7 @@ import org.apache.druid.segment.ObjectColumnSelector;
import org.apache.druid.segment.column.ColumnCapabilities;
import javax.annotation.Nullable;
import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@ -409,7 +408,7 @@ public class ParallelCombiner<KeyType>
);
// This variable is used to close releaser automatically.
@SuppressWarnings("unused")
final Releaser releaser = combineBufferHolder.increment()
final Closeable releaser = combineBufferHolder.increment()
) {
while (mergedIterator.hasNext()) {
final Entry<KeyType> next = mergedIterator.next();

View File

@ -19,13 +19,29 @@
package org.apache.druid.collections;
import java.io.Closeable;
import org.junit.Assert;
import org.junit.Test;
/**
* Releaser is like Closeable, but doesn't throw IOExceptions.
*/
public interface Releaser extends Closeable
import java.io.Closeable;
import java.util.concurrent.atomic.AtomicLong;
public class CloseableResourceHolderTest
{
@Override
void close();
@Test
public void testCloseableResourceHolder()
{
final AtomicLong closeCounter = new AtomicLong();
final Closeable closeable = closeCounter::incrementAndGet;
final ResourceHolder<Closeable> holder = ResourceHolder.fromCloseable(closeable);
Assert.assertSame(closeable, holder.get());
holder.close();
Assert.assertEquals(1, closeCounter.get());
holder.close();
Assert.assertEquals(1, closeCounter.get());
Assert.assertThrows(IllegalStateException.class, holder::get);
}
}

View File

@ -45,7 +45,7 @@ public class ReferenceCountingResourceHolderTest
List<Thread> threads = new ArrayList<>();
for (int i = 0; i < 100; i++) {
Thread thread = new Thread(() -> {
try (Releaser r = resourceHolder.increment()) {
try (ResourceHolder<Closeable> r = resourceHolder.increment()) {
try {
Thread.sleep(1);
}

View File

@ -519,13 +519,13 @@ public class SegmentLocalCacheManager implements SegmentCacheManager
catch (Exception e) {
log.error(e, "Unable to remove directory[%s]", cacheFile);
}
}
File parent = cacheFile.getParentFile();
if (parent != null) {
File[] children = parent.listFiles();
if (children == null || children.length == 0) {
cleanupCacheFiles(baseFile, parent);
File parent = cacheFile.getParentFile();
if (parent != null) {
File[] children = parent.listFiles();
if (children == null || children.length == 0) {
cleanupCacheFiles(baseFile, parent);
}
}
}
}
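
(For context on fix (1) from the commit message: the invariant is that directory creation during download and directory removal during cleanup are serialized on the same lock. A simplified, hypothetical sketch of that invariant; the fields and method bodies are stand-ins, not the real SegmentLocalCacheManager:)

import java.io.File;
import java.io.IOException;

class LockedCacheDirSketch
{
  private final Object directoryWriteRemoveLock = new Object();

  void downloadInto(File segmentDir) throws IOException
  {
    synchronized (directoryWriteRemoveLock) {
      if (!segmentDir.mkdirs() && !segmentDir.isDirectory()) {
        throw new IOException("Cannot create directory: " + segmentDir);
      }
      // ... write segment files into segmentDir ...
    }
  }

  void cleanup(File segmentDir)
  {
    synchronized (directoryWriteRemoveLock) { // the fix: removal holds the same lock as downloads
      // ... delete segmentDir and any empty parent directories ...
    }
  }
}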