diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/TaskDataSegmentProvider.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/TaskDataSegmentProvider.java index 7c4dc2bb09a..18d60f6e4da 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/TaskDataSegmentProvider.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/TaskDataSegmentProvider.java @@ -19,14 +19,15 @@ package org.apache.druid.msq.exec; +import com.google.errorprone.annotations.concurrent.GuardedBy; +import org.apache.druid.collections.ReferenceCountingResourceHolder; +import org.apache.druid.collections.ResourceHolder; import org.apache.druid.common.guava.FutureUtils; -import org.apache.druid.java.util.common.FileUtils; -import org.apache.druid.java.util.common.Pair; +import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.RE; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.msq.counters.ChannelCounters; import org.apache.druid.msq.querykit.DataSegmentProvider; -import org.apache.druid.msq.querykit.LazyResourceHolder; import org.apache.druid.msq.rpc.CoordinatorServiceClient; import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.QueryableIndex; @@ -38,9 +39,13 @@ import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentId; import org.apache.druid.utils.CloseableUtils; +import javax.annotation.Nullable; +import java.io.Closeable; import java.io.File; import java.io.IOException; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; +import java.util.function.Supplier; /** * Production implementation of {@link DataSegmentProvider} using Coordinator APIs. 
@@ -50,6 +55,7 @@ public class TaskDataSegmentProvider implements DataSegmentProvider private final CoordinatorServiceClient coordinatorClient; private final SegmentCacheManager segmentCacheManager; private final IndexIO indexIO; + private final ConcurrentHashMap holders; public TaskDataSegmentProvider( CoordinatorServiceClient coordinatorClient, @@ -60,56 +66,162 @@ public class TaskDataSegmentProvider implements DataSegmentProvider this.coordinatorClient = coordinatorClient; this.segmentCacheManager = segmentCacheManager; this.indexIO = indexIO; + this.holders = new ConcurrentHashMap<>(); } @Override - public LazyResourceHolder fetchSegment( + public Supplier> fetchSegment( final SegmentId segmentId, final ChannelCounters channelCounters ) { + // Returns Supplier instead of ResourceHolder, so the Coordinator calls and segment downloads happen + // in processing threads, rather than the main thread. (They happen when fetchSegmentInternal is called.) + return () -> { + ResourceHolder holder = null; + + while (holder == null) { + holder = holders.computeIfAbsent( + segmentId, + k -> new SegmentHolder( + () -> fetchSegmentInternal(segmentId, channelCounters), + () -> holders.remove(segmentId) + ) + ).get(); + } + + return holder; + }; + } + + /** + * Helper used by {@link #fetchSegment(SegmentId, ChannelCounters)}. Does the actual fetching of a segment, once it + * is determined that we definitely need to go out and get one. + */ + private ReferenceCountingResourceHolder fetchSegmentInternal( + final SegmentId segmentId, + final ChannelCounters channelCounters + ) + { + final DataSegment dataSegment; try { - // Use LazyResourceHolder so Coordinator call and segment downloads happen in processing threads, - // rather than the main thread. 
- return new LazyResourceHolder<>( - () -> { - final DataSegment dataSegment; - try { - dataSegment = FutureUtils.get( - coordinatorClient.fetchUsedSegment( - segmentId.getDataSource(), - segmentId.toString() - ), - true - ); - } - catch (InterruptedException | ExecutionException e) { - throw new RE(e, "Failed to fetch segment details from Coordinator for [%s]", segmentId); - } - - final Closer closer = Closer.create(); - try { - final File segmentDir = segmentCacheManager.getSegmentFiles(dataSegment); - closer.register(() -> FileUtils.deleteDirectory(segmentDir)); - - final QueryableIndex index = indexIO.loadIndex(segmentDir); - final int numRows = index.getNumRows(); - final long size = dataSegment.getSize(); - closer.register(() -> channelCounters.addFile(numRows, size)); - closer.register(index); - return Pair.of(new QueryableIndexSegment(index, dataSegment.getId()), closer); - } - catch (IOException | SegmentLoadingException e) { - throw CloseableUtils.closeInCatch( - new RE(e, "Failed to download segment [%s]", segmentId), - closer - ); - } - } + dataSegment = FutureUtils.get( + coordinatorClient.fetchUsedSegment( + segmentId.getDataSource(), + segmentId.toString() + ), + true ); } - catch (Exception e) { - throw new RuntimeException(e); + catch (InterruptedException | ExecutionException e) { + throw new RE(e, "Failed to fetch segment details from Coordinator for [%s]", segmentId); + } + + final Closer closer = Closer.create(); + try { + if (!segmentCacheManager.reserve(dataSegment)) { + throw new ISE("Could not reserve location for segment [%s]", segmentId); + } + closer.register(() -> segmentCacheManager.cleanup(dataSegment)); + final File segmentDir = segmentCacheManager.getSegmentFiles(dataSegment); + + final QueryableIndex index = closer.register(indexIO.loadIndex(segmentDir)); + final QueryableIndexSegment segment = new QueryableIndexSegment(index, dataSegment.getId()); + final int numRows = index.getNumRows(); + final long size = dataSegment.getSize(); 
+ closer.register(() -> channelCounters.addFile(numRows, size)); + return new ReferenceCountingResourceHolder<>(segment, closer); + } + catch (IOException | SegmentLoadingException e) { + throw CloseableUtils.closeInCatch( + new RE(e, "Failed to download segment [%s]", segmentId), + closer + ); + } + } + + private static class SegmentHolder implements Supplier> + { + private final Supplier> holderSupplier; + private final Closeable cleanupFn; + + @GuardedBy("this") + private ReferenceCountingResourceHolder holder; + + @GuardedBy("this") + private boolean closing; + + @GuardedBy("this") + private boolean closed; + + public SegmentHolder(Supplier> holderSupplier, Closeable cleanupFn) + { + this.holderSupplier = holderSupplier; + this.cleanupFn = cleanupFn; + } + + @Override + @Nullable + public ResourceHolder get() + { + synchronized (this) { + if (closing) { + // Wait until the holder is closed. + while (!closed) { + try { + wait(); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } + + // Then, return null so "fetchSegment" will try again. + return null; + } else if (holder == null) { + final ResourceHolder segmentHolder = holderSupplier.get(); + holder = new ReferenceCountingResourceHolder<>( + segmentHolder.get(), + () -> { + synchronized (this) { + CloseableUtils.closeAll( + () -> { + // synchronized block not strictly needed here, but errorprone needs it since it doesn't + // understand the lambda is immediately called. See https://errorprone.info/bugpattern/GuardedBy + synchronized (this) { + closing = true; + } + }, + segmentHolder, + cleanupFn, // removes this holder from the "holders" map + () -> { + // synchronized block not strictly needed here, but errorprone needs it since it doesn't + // understand the lambda is immediately called. 
See https://errorprone.info/bugpattern/GuardedBy + synchronized (this) { + closed = true; + SegmentHolder.this.notifyAll(); + } + } + ); + } + } + ); + final ResourceHolder retVal = holder.increment(); + // Store already-closed holder, so it disappears when the last reference is closed. + holder.close(); + return retVal; + } else { + try { + return holder.increment(); + } + catch (IllegalStateException e) { + // Possible race: holder is in the process of closing. (This is the only reason "increment" can throw ISE.) + // Return null so "fetchSegment" will try again. + return null; + } + } + } } } } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/external/ExternalInputSliceReader.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/external/ExternalInputSliceReader.java index c609f8b4c1f..8e705528476 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/external/ExternalInputSliceReader.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/external/ExternalInputSliceReader.java @@ -20,6 +20,7 @@ package org.apache.druid.msq.input.external; import com.google.common.collect.Iterators; +import org.apache.druid.collections.ResourceHolder; import org.apache.druid.data.input.ColumnsFilter; import org.apache.druid.data.input.InputFormat; import org.apache.druid.data.input.InputRow; @@ -32,7 +33,6 @@ import org.apache.druid.data.input.impl.InlineInputSource; import org.apache.druid.data.input.impl.TimestampSpec; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.ISE; -import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.guava.BaseSequence; import org.apache.druid.java.util.common.parsers.CloseableIterator; import org.apache.druid.java.util.common.parsers.ParseException; @@ -48,7 +48,6 @@ import org.apache.druid.msq.input.NilInputSource; import 
org.apache.druid.msq.input.ReadableInput; import org.apache.druid.msq.input.ReadableInputs; import org.apache.druid.msq.input.table.SegmentWithDescriptor; -import org.apache.druid.msq.querykit.LazyResourceHolder; import org.apache.druid.msq.util.DimensionSchemaUtils; import org.apache.druid.segment.RowAdapters; import org.apache.druid.segment.RowBasedSegment; @@ -234,7 +233,7 @@ public class ExternalInputSliceReader implements InputSliceReader ); return new SegmentWithDescriptor( - new LazyResourceHolder<>(() -> Pair.of(segment, () -> {})), + () -> ResourceHolder.fromCloseable(segment), segmentId.toDescriptor() ); } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/inline/InlineInputSliceReader.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/inline/InlineInputSliceReader.java index 6ae41f7d935..4c3e8be9c58 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/inline/InlineInputSliceReader.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/inline/InlineInputSliceReader.java @@ -20,15 +20,14 @@ package org.apache.druid.msq.input.inline; import com.google.common.collect.Iterables; +import org.apache.druid.collections.ResourceHolder; import org.apache.druid.java.util.common.Intervals; -import org.apache.druid.java.util.common.Pair; import org.apache.druid.msq.counters.CounterTracker; import org.apache.druid.msq.input.InputSlice; import org.apache.druid.msq.input.InputSliceReader; import org.apache.druid.msq.input.ReadableInput; import org.apache.druid.msq.input.ReadableInputs; import org.apache.druid.msq.input.table.SegmentWithDescriptor; -import org.apache.druid.msq.querykit.LazyResourceHolder; import org.apache.druid.query.InlineDataSource; import org.apache.druid.query.SegmentDescriptor; import org.apache.druid.segment.SegmentWrangler; @@ -72,7 +71,7 @@ public class InlineInputSliceReader implements InputSliceReader 
segmentWrangler.getSegmentsForIntervals(dataSource, Intervals.ONLY_ETERNITY), segment -> ReadableInput.segment( new SegmentWithDescriptor( - new LazyResourceHolder<>(() -> Pair.of(segment, segment)), + () -> ResourceHolder.fromCloseable(segment), DUMMY_SEGMENT_DESCRIPTOR ) ) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/lookup/LookupInputSliceReader.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/lookup/LookupInputSliceReader.java index 4946f464720..648527ce006 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/lookup/LookupInputSliceReader.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/lookup/LookupInputSliceReader.java @@ -20,9 +20,9 @@ package org.apache.druid.msq.input.lookup; import com.google.common.collect.Iterators; +import org.apache.druid.collections.ResourceHolder; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Intervals; -import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.msq.counters.CounterTracker; import org.apache.druid.msq.input.InputSlice; @@ -30,7 +30,6 @@ import org.apache.druid.msq.input.InputSliceReader; import org.apache.druid.msq.input.ReadableInput; import org.apache.druid.msq.input.ReadableInputs; import org.apache.druid.msq.input.table.SegmentWithDescriptor; -import org.apache.druid.msq.querykit.LazyResourceHolder; import org.apache.druid.query.LookupDataSource; import org.apache.druid.segment.Segment; import org.apache.druid.segment.SegmentWrangler; @@ -75,33 +74,31 @@ public class LookupInputSliceReader implements InputSliceReader () -> Iterators.singletonIterator( ReadableInput.segment( new SegmentWithDescriptor( - new LazyResourceHolder<>( - () -> { - final Iterable segments = - segmentWrangler.getSegmentsForIntervals( - new LookupDataSource(lookupName), - 
Intervals.ONLY_ETERNITY - ); + () -> { + final Iterable segments = + segmentWrangler.getSegmentsForIntervals( + new LookupDataSource(lookupName), + Intervals.ONLY_ETERNITY + ); - final Iterator segmentIterator = segments.iterator(); - if (!segmentIterator.hasNext()) { - throw new ISE("Lookup[%s] is not loaded", lookupName); - } + final Iterator segmentIterator = segments.iterator(); + if (!segmentIterator.hasNext()) { + throw new ISE("Lookup[%s] is not loaded", lookupName); + } - final Segment segment = segmentIterator.next(); - if (segmentIterator.hasNext()) { - // LookupSegmentWrangler always returns zero or one segments, so this code block can't - // happen. That being said: we'll program defensively anyway. - CloseableUtils.closeAndSuppressExceptions( - segment, - e -> log.warn(e, "Failed to close segment for lookup[%s]", lookupName) - ); - throw new ISE("Lookup[%s] has multiple segments; cannot read", lookupName); - } + final Segment segment = segmentIterator.next(); + if (segmentIterator.hasNext()) { + // LookupSegmentWrangler always returns zero or one segments, so this code block can't + // happen. That being said: we'll program defensively anyway. 
+ CloseableUtils.closeAndSuppressExceptions( + segment, + e -> log.warn(e, "Failed to close segment for lookup[%s]", lookupName) + ); + throw new ISE("Lookup[%s] has multiple segments; cannot read", lookupName); + } - return Pair.of(segment, segment); - } - ), + return ResourceHolder.fromCloseable(segment); + }, SegmentId.dummy(lookupName).toDescriptor() ) ) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentWithDescriptor.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentWithDescriptor.java index 94109bc4a71..020b9f2a5bb 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentWithDescriptor.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentWithDescriptor.java @@ -24,36 +24,45 @@ import org.apache.druid.collections.ResourceHolder; import org.apache.druid.query.SegmentDescriptor; import org.apache.druid.segment.Segment; -import java.io.Closeable; import java.util.Objects; +import java.util.function.Supplier; /** - * A holder for a physical segment. + * A holder for a supplier of a physical segment. */ -public class SegmentWithDescriptor implements Closeable +public class SegmentWithDescriptor { - private final ResourceHolder segmentHolder; + private final Supplier> segmentSupplier; private final SegmentDescriptor descriptor; + /** + * Create a new instance. + * + * @param segmentSupplier supplier of a {@link ResourceHolder} of segment. The {@link ResourceHolder#close()} logic + * must include a delegated call to {@link Segment#close()}. 
+ * @param descriptor segment descriptor + */ public SegmentWithDescriptor( - final ResourceHolder segmentHolder, + final Supplier> segmentSupplier, final SegmentDescriptor descriptor ) { - this.segmentHolder = Preconditions.checkNotNull(segmentHolder, "segment"); + this.segmentSupplier = Preconditions.checkNotNull(segmentSupplier, "segment"); this.descriptor = Preconditions.checkNotNull(descriptor, "descriptor"); } /** * The physical segment. * - * Named "getOrLoad" because the segment may be held by an eager or lazy resource holder (i.e. - * {@link org.apache.druid.msq.querykit.LazyResourceHolder}). If the resource holder is lazy, the segment is acquired + * Named "getOrLoad" because the segment may be generated by a lazy supplier. In this case, the segment is acquired * as part of the call to this method. + * + * It is not necessary to call {@link Segment#close()} on the returned segment. Calling {@link ResourceHolder#close()} + * is enough. */ - public Segment getOrLoadSegment() + public ResourceHolder getOrLoad() { - return segmentHolder.get(); + return segmentSupplier.get(); } /** @@ -64,15 +73,6 @@ public class SegmentWithDescriptor implements Closeable return descriptor; } - /** - * Release resources used by the physical segment. 
- */ - @Override - public void close() - { - segmentHolder.close(); - } - @Override public boolean equals(Object o) { @@ -83,12 +83,12 @@ public class SegmentWithDescriptor implements Closeable return false; } SegmentWithDescriptor that = (SegmentWithDescriptor) o; - return Objects.equals(segmentHolder, that.segmentHolder) && Objects.equals(descriptor, that.descriptor); + return Objects.equals(segmentSupplier, that.segmentSupplier) && Objects.equals(descriptor, that.descriptor); } @Override public int hashCode() { - return Objects.hash(segmentHolder, descriptor); + return Objects.hash(segmentSupplier, descriptor); } } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentsInputSliceReader.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentsInputSliceReader.java index 29236d7b492..f7efd287aed 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentsInputSliceReader.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentsInputSliceReader.java @@ -20,7 +20,6 @@ package org.apache.druid.msq.input.table; import com.google.common.collect.Iterators; -import org.apache.druid.collections.ResourceHolder; import org.apache.druid.msq.counters.ChannelCounters; import org.apache.druid.msq.counters.CounterNames; import org.apache.druid.msq.counters.CounterTracker; @@ -29,7 +28,6 @@ import org.apache.druid.msq.input.InputSliceReader; import org.apache.druid.msq.input.ReadableInput; import org.apache.druid.msq.input.ReadableInputs; import org.apache.druid.msq.querykit.DataSegmentProvider; -import org.apache.druid.segment.Segment; import org.apache.druid.timeline.SegmentId; import java.util.Iterator; @@ -92,8 +90,10 @@ public class SegmentsInputSliceReader implements InputSliceReader descriptor.getPartitionNumber() ); - final ResourceHolder segmentHolder = dataSegmentProvider.fetchSegment(segmentId, 
channelCounters); - return new SegmentWithDescriptor(segmentHolder, descriptor); + return new SegmentWithDescriptor( + dataSegmentProvider.fetchSegment(segmentId, channelCounters), + descriptor + ); } ).iterator(); } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessor.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessor.java index 6c3a02df4f5..d5b31328b0b 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessor.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessor.java @@ -54,7 +54,7 @@ public abstract class BaseLeafFrameProcessor implements FrameProcessor private final Query query; private final ReadableInput baseInput; private final List inputChannels; - private final ResourceHolder outputChannel; + private final ResourceHolder outputChannelHolder; private final ResourceHolder frameWriterFactoryHolder; private final BroadcastJoinHelper broadcastJoinHelper; @@ -64,14 +64,14 @@ public abstract class BaseLeafFrameProcessor implements FrameProcessor final Query query, final ReadableInput baseInput, final Int2ObjectMap sideChannels, - final ResourceHolder outputChannel, + final ResourceHolder outputChannelHolder, final ResourceHolder frameWriterFactoryHolder, final long memoryReservedForBroadcastJoin ) { this.query = query; this.baseInput = baseInput; - this.outputChannel = outputChannel; + this.outputChannelHolder = outputChannelHolder; this.frameWriterFactoryHolder = frameWriterFactoryHolder; final Pair, BroadcastJoinHelper> inputChannelsAndBroadcastJoinHelper = @@ -145,7 +145,7 @@ public abstract class BaseLeafFrameProcessor implements FrameProcessor @Override public List outputChannels() { - return Collections.singletonList(outputChannel.get()); + return Collections.singletonList(outputChannelHolder.get()); } @Override @@ -166,9 +166,7 
@@ public abstract class BaseLeafFrameProcessor implements FrameProcessor @Override public void cleanup() throws IOException { - // Don't close the output channel, because multiple workers write to the same channel. - // The channel should be closed by the caller. - FrameProcessors.closeAll(inputChannels(), Collections.emptyList(), outputChannel, frameWriterFactoryHolder); + FrameProcessors.closeAll(inputChannels(), Collections.emptyList(), outputChannelHolder, frameWriterFactoryHolder); } protected FrameWriterFactory getFrameWriterFactory() diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessorFactory.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessorFactory.java index d57db72264e..7581da03b44 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessorFactory.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessorFactory.java @@ -31,7 +31,6 @@ import org.apache.druid.frame.processor.OutputChannel; import org.apache.druid.frame.processor.OutputChannelFactory; import org.apache.druid.frame.processor.OutputChannels; import org.apache.druid.frame.write.FrameWriterFactory; -import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.java.util.common.logger.Logger; @@ -255,7 +254,7 @@ public abstract class BaseLeafFrameProcessorFactory extends BaseFrameProcessorFa protected abstract FrameProcessor makeProcessor( ReadableInput baseInput, Int2ObjectMap sideChannels, - ResourceHolder outputChannelSupplier, + ResourceHolder outputChannel, ResourceHolder frameWriterFactory, FrameContext providerThingy ); @@ -273,21 +272,29 @@ public abstract class BaseLeafFrameProcessorFactory extends BaseFrameProcessorFa resource = 
queueRef.get().poll(); } - return Pair.of( - resource, - () -> { - synchronized (queueRef) { - final Queue queue = queueRef.get(); - if (queue != null) { - queue.add(resource); - return; - } - } + return new ResourceHolder() + { + @Override + public T get() + { + return resource; + } - // Queue was null - backupCloser.accept(resource); + @Override + public void close() + { + synchronized (queueRef) { + final Queue queue = queueRef.get(); + if (queue != null) { + queue.add(resource); + return; + } } - ); + + // Queue was null + backupCloser.accept(resource); + } + }; } ); } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSegmentProvider.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSegmentProvider.java index b5d95a4401b..5f5f6099e5f 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSegmentProvider.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSegmentProvider.java @@ -24,13 +24,18 @@ import org.apache.druid.msq.counters.ChannelCounters; import org.apache.druid.segment.Segment; import org.apache.druid.timeline.SegmentId; +import java.util.function.Supplier; + public interface DataSegmentProvider { /** - * Fetches the segment corresponding to the provided segmentId from deep storage, - * segment fetched. + * Returns a supplier that fetches the segment corresponding to the provided segmentId from deep storage. The segment + * is not actually fetched until you call {@link Supplier#get()}. Once you call this, make sure to also call + * {@link ResourceHolder#close()}. + * + * It is not necessary to call {@link ResourceHolder#close()} if you never call {@link Supplier#get()}. 
*/ - ResourceHolder fetchSegment( + Supplier> fetchSegment( SegmentId segmentId, ChannelCounters channelCounters ); diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/LazyResourceHolder.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/LazyResourceHolder.java index 4ae03c5dcf7..e9a897cd16b 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/LazyResourceHolder.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/LazyResourceHolder.java @@ -21,11 +21,9 @@ package org.apache.druid.msq.querykit; import com.google.common.base.Preconditions; import org.apache.druid.collections.ResourceHolder; -import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.logger.Logger; import javax.annotation.concurrent.NotThreadSafe; -import java.io.Closeable; import java.util.function.Supplier; @NotThreadSafe @@ -33,11 +31,10 @@ public class LazyResourceHolder implements ResourceHolder { private static final Logger log = new Logger(LazyResourceHolder.class); - private final Supplier> supplier; - private T resource = null; - private Closeable closer = null; + private final Supplier> supplier; + private ResourceHolder supplied = null; - public LazyResourceHolder(final Supplier> supplier) + public LazyResourceHolder(final Supplier> supplier) { this.supplier = Preconditions.checkNotNull(supplier, "supplier"); } @@ -45,28 +42,25 @@ public class LazyResourceHolder implements ResourceHolder @Override public T get() { - if (resource == null) { - final Pair supplied = supplier.get(); - resource = Preconditions.checkNotNull(supplied.lhs, "resource"); - closer = Preconditions.checkNotNull(supplied.rhs, "closer"); + if (supplied == null) { + supplied = supplier.get(); } - return resource; + return supplied.get(); } @Override public void close() { - if (resource != null) { + if (supplied != null) { try { - closer.close(); + 
supplied.close(); } catch (Throwable e) { - log.noStackTrace().warn(e, "Exception encountered while closing resource: %s", resource); + log.noStackTrace().warn(e, "Exception encountered while closing resource: %s", supplied.get()); } finally { - resource = null; - closer = null; + supplied = null; } } } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessor.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessor.java index 1469d5d81b3..dbf89096ab2 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessor.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessor.java @@ -48,6 +48,7 @@ import org.apache.druid.query.groupby.strategy.GroupByStrategySelector; import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; import org.apache.druid.query.spec.SpecificSegmentSpec; import org.apache.druid.segment.ColumnSelectorFactory; +import org.apache.druid.segment.Segment; import org.apache.druid.segment.column.RowSignature; import org.apache.druid.timeline.SegmentId; @@ -74,7 +75,7 @@ public class GroupByPreShuffleFrameProcessor extends BaseLeafFrameProcessor final ReadableInput baseInput, final Int2ObjectMap sideChannels, final GroupByStrategySelector strategySelector, - final ResourceHolder outputChannel, + final ResourceHolder outputChannelHolder, final ResourceHolder frameWriterFactoryHolder, final long memoryReservedForBroadcastJoin ) @@ -83,7 +84,7 @@ public class GroupByPreShuffleFrameProcessor extends BaseLeafFrameProcessor query, baseInput, sideChannels, - outputChannel, + outputChannelHolder, frameWriterFactoryHolder, memoryReservedForBroadcastJoin ); @@ -100,13 +101,13 @@ public class GroupByPreShuffleFrameProcessor extends BaseLeafFrameProcessor protected ReturnOrAwait 
runWithSegment(final SegmentWithDescriptor segment) throws IOException { if (resultYielder == null) { - closer.register(segment); + final ResourceHolder segmentHolder = closer.register(segment.getOrLoad()); final Sequence rowSequence = strategySelector.strategize(query) .process( query.withQuerySegmentSpec(new SpecificSegmentSpec(segment.getDescriptor())), - mapSegment(segment.getOrLoadSegment()).asStorageAdapter(), + mapSegment(segmentHolder.get()).asStorageAdapter(), null ); diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessorFactory.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessorFactory.java index e8783666c09..15e9f24190a 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessorFactory.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessorFactory.java @@ -28,11 +28,9 @@ import org.apache.druid.collections.ResourceHolder; import org.apache.druid.frame.channel.WritableFrameChannel; import org.apache.druid.frame.processor.FrameProcessor; import org.apache.druid.frame.write.FrameWriterFactory; -import org.apache.druid.java.util.common.Pair; import org.apache.druid.msq.input.ReadableInput; import org.apache.druid.msq.kernel.FrameContext; import org.apache.druid.msq.querykit.BaseLeafFrameProcessorFactory; -import org.apache.druid.msq.querykit.LazyResourceHolder; import org.apache.druid.query.groupby.GroupByQuery; @JsonTypeName("groupByPreShuffle") @@ -67,7 +65,7 @@ public class GroupByPreShuffleFrameProcessorFactory extends BaseLeafFrameProcess sideChannels, frameContext.groupByStrategySelector(), outputChannelHolder, - new LazyResourceHolder<>(() -> Pair.of(frameWriterFactoryHolder.get(), frameWriterFactoryHolder)), + frameWriterFactoryHolder, 
frameContext.memoryParameters().getBroadcastJoinMemory() ); } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java index 886b5bed20d..4bab7e4abd5 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java @@ -54,6 +54,7 @@ import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; import org.apache.druid.query.spec.SpecificSegmentSpec; import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.Cursor; +import org.apache.druid.segment.Segment; import org.apache.druid.segment.StorageAdapter; import org.apache.druid.segment.VirtualColumn; import org.apache.druid.segment.VirtualColumns; @@ -88,7 +89,7 @@ public class ScanQueryFrameProcessor extends BaseLeafFrameProcessor final ScanQuery query, final ReadableInput baseInput, final Int2ObjectMap sideChannels, - final ResourceHolder outputChannel, + final ResourceHolder outputChannelHolder, final ResourceHolder frameWriterFactoryHolder, @Nullable final AtomicLong runningCountForLimit, final long memoryReservedForBroadcastJoin, @@ -99,7 +100,7 @@ public class ScanQueryFrameProcessor extends BaseLeafFrameProcessor query, baseInput, sideChannels, - outputChannel, + outputChannelHolder, frameWriterFactoryHolder, memoryReservedForBroadcastJoin ); @@ -149,12 +150,12 @@ public class ScanQueryFrameProcessor extends BaseLeafFrameProcessor protected ReturnOrAwait runWithSegment(final SegmentWithDescriptor segment) throws IOException { if (cursor == null) { - closer.register(segment); + final ResourceHolder segmentHolder = closer.register(segment.getOrLoad()); final Yielder cursorYielder = Yielders.each( makeCursors( 
query.withQuerySegmentSpec(new SpecificSegmentSpec(segment.getDescriptor())), - mapSegment(segment.getOrLoadSegment()).asStorageAdapter() + mapSegment(segmentHolder.get()).asStorageAdapter() ) ); diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorFactory.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorFactory.java index 37e88659dc1..391ad5ba90f 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorFactory.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorFactory.java @@ -28,11 +28,9 @@ import org.apache.druid.collections.ResourceHolder; import org.apache.druid.frame.channel.WritableFrameChannel; import org.apache.druid.frame.processor.FrameProcessor; import org.apache.druid.frame.write.FrameWriterFactory; -import org.apache.druid.java.util.common.Pair; import org.apache.druid.msq.input.ReadableInput; import org.apache.druid.msq.kernel.FrameContext; import org.apache.druid.msq.querykit.BaseLeafFrameProcessorFactory; -import org.apache.druid.msq.querykit.LazyResourceHolder; import org.apache.druid.query.scan.ScanQuery; import javax.annotation.Nullable; @@ -82,7 +80,7 @@ public class ScanQueryFrameProcessorFactory extends BaseLeafFrameProcessorFactor baseInput, sideChannels, outputChannelHolder, - new LazyResourceHolder<>(() -> Pair.of(frameWriterFactoryHolder.get(), frameWriterFactoryHolder)), + frameWriterFactoryHolder, runningCountForLimit, frameContext.memoryParameters().getBroadcastJoinMemory(), frameContext.jsonMapper() diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/TaskDataSegmentProviderTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/TaskDataSegmentProviderTest.java new file mode 100644 index 00000000000..b1a97b3da00 --- 
/dev/null +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/TaskDataSegmentProviderTest.java @@ -0,0 +1,342 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.msq.exec; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import com.google.common.primitives.Ints; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import org.apache.druid.collections.ResourceHolder; +import org.apache.druid.collections.bitmap.BitmapFactory; +import org.apache.druid.collections.bitmap.RoaringBitmapFactory; +import org.apache.druid.common.guava.FutureUtils; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.StringUtils; +import 
org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.java.util.common.jackson.JacksonUtils; +import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.msq.counters.ChannelCounters; +import org.apache.druid.msq.rpc.CoordinatorServiceClient; +import org.apache.druid.rpc.ServiceRetryPolicy; +import org.apache.druid.segment.DimensionHandler; +import org.apache.druid.segment.IndexIO; +import org.apache.druid.segment.Metadata; +import org.apache.druid.segment.QueryableIndex; +import org.apache.druid.segment.Segment; +import org.apache.druid.segment.TestHelper; +import org.apache.druid.segment.column.ColumnHolder; +import org.apache.druid.segment.data.Indexed; +import org.apache.druid.segment.data.ListIndexed; +import org.apache.druid.segment.loading.DataSegmentPusher; +import org.apache.druid.segment.loading.LoadSpec; +import org.apache.druid.segment.loading.SegmentLoaderConfig; +import org.apache.druid.segment.loading.SegmentLoadingException; +import org.apache.druid.segment.loading.SegmentLocalCacheManager; +import org.apache.druid.segment.loading.StorageLocationConfig; +import org.apache.druid.server.metrics.NoopServiceEmitter; +import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.partition.NumberedShardSpec; +import org.easymock.EasyMock; +import org.joda.time.DateTime; +import org.joda.time.Interval; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import javax.annotation.Nullable; +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +public class TaskDataSegmentProviderTest +{ + private static final String DATASOURCE = "foo"; + private static final int NUM_SEGMENTS 
= 10; + private static final int THREADS = 8; + private static final String LOAD_SPEC_FILE_NAME = "data"; + + private List segments; + private File cacheDir; + private SegmentLocalCacheManager cacheManager; + private TaskDataSegmentProvider provider; + private ListeningExecutorService exec; + private IndexIO indexIO = EasyMock.mock(IndexIO.class); + + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + @Before + public void setUp() throws Exception + { + EmittingLogger.registerEmitter(new NoopServiceEmitter()); + + EasyMock.reset(indexIO); + EasyMock.expect(indexIO.loadIndex(EasyMock.anyObject())).andReturn(new TestQueryableIndex()).anyTimes(); + EasyMock.replay(indexIO); + + final ObjectMapper jsonMapper = TestHelper.JSON_MAPPER; + jsonMapper.registerSubtypes(TestLoadSpec.class); + + segments = new ArrayList<>(); + + for (int i = 0; i < NUM_SEGMENTS; i++) { + // Two segments per interval; helps verify that directory creation + deletion does not include races. + final DateTime startTime = DateTimes.of("2000").plusDays(i / 2); + final int partitionNum = i % 2; + + segments.add( + DataSegment.builder() + .dataSource(DATASOURCE) + .interval( + Intervals.utc( + startTime.getMillis(), + startTime.plusDays(1).getMillis() + ) + ) + .version("0") + .shardSpec(new NumberedShardSpec(partitionNum, 2)) + .loadSpec( + jsonMapper.convertValue( + new TestLoadSpec(i), + JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT + ) + ) + .size(1) + .build() + ); + } + + cacheDir = temporaryFolder.newFolder(); + cacheManager = new SegmentLocalCacheManager( + new SegmentLoaderConfig().withLocations( + ImmutableList.of(new StorageLocationConfig(cacheDir, 10_000_000_000L, null)) + ), + jsonMapper + ); + + provider = new TaskDataSegmentProvider( + new TestCoordinatorServiceClientImpl(), + cacheManager, + indexIO + ); + + exec = MoreExecutors.listeningDecorator(Execs.multiThreaded(THREADS, getClass().getSimpleName() + "-%s")); + } + + @After + public void tearDown() throws
Exception + { + if (indexIO != null) { + EasyMock.verify(indexIO); + } + + if (exec != null) { + exec.shutdownNow(); + exec.awaitTermination(1, TimeUnit.MINUTES); + } + } + + @Test + public void testConcurrency() + { + final int iterations = 1000; + final List> testFutures = new ArrayList<>(); + + for (int i = 0; i < iterations; i++) { + final int expectedSegmentNumber = i % NUM_SEGMENTS; + final DataSegment segment = segments.get(expectedSegmentNumber); + final ListenableFuture>> f = + exec.submit(() -> provider.fetchSegment(segment.getId(), new ChannelCounters())); + + testFutures.add( + FutureUtils.transform( + f, + holderSupplier -> { + try { + final ResourceHolder holder = holderSupplier.get(); + Assert.assertEquals(segment.getId(), holder.get().getId()); + + final String expectedStorageDir = DataSegmentPusher.getDefaultStorageDir(segment, false); + final File expectedFile = new File( + StringUtils.format( + "%s/%s/%s", + cacheDir, + expectedStorageDir, + LOAD_SPEC_FILE_NAME + ) + ); + + Assert.assertTrue(expectedFile.exists()); + Assert.assertArrayEquals( + Ints.toByteArray(expectedSegmentNumber), + Files.readAllBytes(expectedFile.toPath()) + ); + + holder.close(); + + return true; + } + catch (IOException e) { + throw new RuntimeException(e); + } + } + ) + ); + } + + Assert.assertEquals(iterations, testFutures.size()); + for (int i = 0; i < iterations; i++) { + ListenableFuture testFuture = testFutures.get(i); + Assert.assertTrue("Test iteration #" + i, FutureUtils.getUnchecked(testFuture, false)); + } + + // Cache dir should exist, but be empty, since we've closed all holders. 
+ Assert.assertTrue(cacheDir.exists()); + Assert.assertArrayEquals(new String[]{}, cacheDir.list()); + } + + private class TestCoordinatorServiceClientImpl implements CoordinatorServiceClient + { + @Override + public ListenableFuture fetchUsedSegment(String dataSource, String segmentId) + { + for (final DataSegment segment : segments) { + if (segment.getDataSource().equals(dataSource) && segment.getId().toString().equals(segmentId)) { + return Futures.immediateFuture(segment); + } + } + + return Futures.immediateFailedFuture(new ISE("No such segment[%s] for dataSource[%s]", segmentId, dataSource)); + } + + @Override + public CoordinatorServiceClient withRetryPolicy(ServiceRetryPolicy retryPolicy) + { + return this; + } + } + + @JsonTypeName("test") + private static class TestLoadSpec implements LoadSpec + { + private final int uniqueId; + + @JsonCreator + public TestLoadSpec(@JsonProperty("uniqueId") int uniqueId) + { + this.uniqueId = uniqueId; + } + + @JsonProperty + public int getUniqueId() + { + return uniqueId; + } + + @Override + public LoadSpecResult loadSegment(File destDir) throws SegmentLoadingException + { + try { + Files.write(new File(destDir, LOAD_SPEC_FILE_NAME).toPath(), Ints.toByteArray(uniqueId)); + Files.write(new File(destDir, "version.bin").toPath(), Ints.toByteArray(1)); + return new LoadSpecResult(1); + } + catch (IOException e) { + throw new SegmentLoadingException(e, "Failed to load segment in location [%s]", destDir); + } + } + } + + private static class TestQueryableIndex implements QueryableIndex + { + @Override + public Interval getDataInterval() + { + throw new UnsupportedOperationException(); + } + + @Override + public int getNumRows() + { + return 0; + } + + @Override + public Indexed getAvailableDimensions() + { + return new ListIndexed<>(); + } + + @Override + public BitmapFactory getBitmapFactoryForDimensions() + { + return RoaringBitmapFactory.INSTANCE; + } + + @Nullable + @Override + public Metadata getMetadata() + { + return 
null; + } + + @Override + public Map getDimensionHandlers() + { + return Collections.emptyMap(); + } + + @Override + public List getColumnNames() + { + return Collections.emptyList(); + } + + @Nullable + @Override + public ColumnHolder getColumnHolder(String columnName) + { + return null; + } + + @Override + public void close() + { + + } + } +} diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorTest.java index 96645f6dab5..283c8c6d1e2 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorTest.java @@ -22,6 +22,7 @@ package org.apache.druid.msq.querykit.scan; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; import it.unimi.dsi.fastutil.ints.Int2ObjectMaps; +import org.apache.druid.collections.ReferenceCountingResourceHolder; import org.apache.druid.collections.ResourceHolder; import org.apache.druid.frame.Frame; import org.apache.druid.frame.FrameType; @@ -38,13 +39,11 @@ import org.apache.druid.frame.write.FrameWriterFactory; import org.apache.druid.frame.write.FrameWriters; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.Intervals; -import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.msq.input.ReadableInput; import org.apache.druid.msq.kernel.StageId; import org.apache.druid.msq.kernel.StagePartition; -import org.apache.druid.msq.querykit.LazyResourceHolder; import org.apache.druid.msq.test.LimitedFrameWriterFactory; import org.apache.druid.query.Druids; 
import org.apache.druid.query.scan.ScanQuery; @@ -147,7 +146,7 @@ public class ScanQueryFrameProcessorTest extends InitializedNullHandlingTest } } }, - new LazyResourceHolder<>(() -> Pair.of(frameWriterFactory, () -> {})), + new ReferenceCountingResourceHolder<>(frameWriterFactory, () -> {}), null, 0L, new DefaultObjectMapper() diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java index 8bf2fa089ff..ee2f3b65b1b 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java @@ -25,6 +25,8 @@ import com.google.common.collect.ImmutableSet; import com.google.inject.Injector; import com.google.inject.Module; import com.google.inject.TypeLiteral; +import org.apache.druid.collections.ReferenceCountingResourceHolder; +import org.apache.druid.collections.ResourceHolder; import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.LongDimensionSchema; import org.apache.druid.data.input.impl.StringDimensionSchema; @@ -35,14 +37,12 @@ import org.apache.druid.guice.JoinableFactoryModule; import org.apache.druid.guice.annotations.Self; import org.apache.druid.indexing.common.SegmentCacheManagerFactory; import org.apache.druid.java.util.common.ISE; -import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.math.expr.ExprMacroTable; import org.apache.druid.msq.guice.MSQExternalDataSourceModule; import org.apache.druid.msq.guice.MSQIndexingModule; import org.apache.druid.msq.querykit.DataSegmentProvider; -import org.apache.druid.msq.querykit.LazyResourceHolder; import 
org.apache.druid.query.DruidProcessingConfig; import org.apache.druid.query.ForwardingQueryProcessingPool; import org.apache.druid.query.QueryProcessingPool; @@ -79,7 +79,6 @@ import org.joda.time.Interval; import org.junit.rules.TemporaryFolder; import javax.annotation.Nullable; -import java.io.Closeable; import java.io.File; import java.io.IOException; import java.util.List; @@ -161,8 +160,7 @@ public class CalciteMSQTestsHelper )); binder.bind(DataSegmentAnnouncer.class).toInstance(new NoopDataSegmentAnnouncer()); binder.bind(DataSegmentProvider.class) - .toInstance((dataSegment, channelCounters) -> - new LazyResourceHolder<>(getSupplierForSegment(dataSegment))); + .toInstance((dataSegment, channelCounters) -> getSupplierForSegment(dataSegment)); GroupByQueryConfig groupByQueryConfig = new GroupByQueryConfig(); binder.bind(GroupByStrategySelector.class) @@ -178,7 +176,7 @@ public class CalciteMSQTestsHelper ); } - private static Supplier> getSupplierForSegment(SegmentId segmentId) + private static Supplier> getSupplierForSegment(SegmentId segmentId) { final TemporaryFolder temporaryFolder = new TemporaryFolder(); try { @@ -285,13 +283,6 @@ public class CalciteMSQTestsHelper { } }; - return new Supplier>() - { - @Override - public Pair get() - { - return new Pair<>(segment, Closer.create()); - } - }; + return () -> new ReferenceCountingResourceHolder<>(segment, Closer.create()); } } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java index b037eecbb8c..493be17284f 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java @@ -36,6 +36,8 @@ import com.google.inject.Module; import com.google.inject.TypeLiteral; import com.google.inject.util.Modules; import 
com.google.inject.util.Providers; +import org.apache.druid.collections.ReferenceCountingResourceHolder; +import org.apache.druid.collections.ResourceHolder; import org.apache.druid.common.config.NullHandling; import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.LongDimensionSchema; @@ -67,7 +69,6 @@ import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.java.util.common.guava.Yielder; -import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.math.expr.ExprMacroTable; import org.apache.druid.metadata.input.InputSourceModule; @@ -96,7 +97,6 @@ import org.apache.druid.msq.indexing.report.MSQResultsReport; import org.apache.druid.msq.indexing.report.MSQTaskReport; import org.apache.druid.msq.indexing.report.MSQTaskReportPayload; import org.apache.druid.msq.querykit.DataSegmentProvider; -import org.apache.druid.msq.querykit.LazyResourceHolder; import org.apache.druid.msq.sql.MSQTaskQueryMaker; import org.apache.druid.msq.sql.MSQTaskSqlEngine; import org.apache.druid.msq.util.MultiStageQueryContext; @@ -179,7 +179,6 @@ import org.mockito.Mockito; import javax.annotation.Nonnull; import javax.annotation.Nullable; -import java.io.Closeable; import java.io.File; import java.io.IOException; import java.util.ArrayList; @@ -386,8 +385,7 @@ public class MSQTestBase extends BaseCalciteQueryTest binder.bind(QueryProcessingPool.class) .toInstance(new ForwardingQueryProcessingPool(Execs.singleThreaded("Test-runner-processing-pool"))); binder.bind(DataSegmentProvider.class) - .toInstance((dataSegment, channelCounters) -> - new LazyResourceHolder<>(getSupplierForSegment(dataSegment))); + .toInstance((dataSegment, channelCounters) -> getSupplierForSegment(dataSegment)); 
binder.bind(IndexIO.class).toInstance(indexIO); binder.bind(SpecificSegmentsQuerySegmentWalker.class).toInstance(qf.walker()); @@ -540,7 +538,7 @@ public class MSQTestBase extends BaseCalciteQueryTest } @Nonnull - private Supplier> getSupplierForSegment(SegmentId segmentId) + private Supplier> getSupplierForSegment(SegmentId segmentId) { if (segmentManager.getSegment(segmentId) == null) { final QueryableIndex index; @@ -640,7 +638,7 @@ public class MSQTestBase extends BaseCalciteQueryTest }; segmentManager.addSegment(segment); } - return () -> new Pair<>(segmentManager.getSegment(segmentId), Closer.create()); + return () -> ReferenceCountingResourceHolder.fromCloseable(segmentManager.getSegment(segmentId)); } public SelectTester testSelectQuery() diff --git a/processing/src/main/java/org/apache/druid/collections/CloseableResourceHolder.java b/processing/src/main/java/org/apache/druid/collections/CloseableResourceHolder.java new file mode 100644 index 00000000000..e2c8234d384 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/collections/CloseableResourceHolder.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.collections; + +import com.google.common.base.Preconditions; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.utils.CloseableUtils; + +import java.io.Closeable; +import java.util.concurrent.atomic.AtomicReference; + +public class CloseableResourceHolder implements ResourceHolder +{ + private final AtomicReference resource; + + /** + * Use {@link ResourceHolder#fromCloseable}. + */ + CloseableResourceHolder(T resource) + { + this.resource = new AtomicReference<>(Preconditions.checkNotNull(resource, "resource")); + } + + @Override + public T get() + { + final T retVal = resource.get(); + if (retVal == null) { + throw new ISE("Already closed"); + } + return retVal; + } + + @Override + public void close() + { + final T oldResource = resource.getAndSet(null); + CloseableUtils.closeAndWrapExceptions(oldResource); + } +} diff --git a/processing/src/main/java/org/apache/druid/collections/ReferenceCountingResourceHolder.java b/processing/src/main/java/org/apache/druid/collections/ReferenceCountingResourceHolder.java index 808100dfb23..6d731923c55 100644 --- a/processing/src/main/java/org/apache/druid/collections/ReferenceCountingResourceHolder.java +++ b/processing/src/main/java/org/apache/druid/collections/ReferenceCountingResourceHolder.java @@ -77,13 +77,14 @@ public class ReferenceCountingResourceHolder implements ResourceHolder } /** - * Increments the reference count by 1 and returns a {@link Releaser}. The returned {@link Releaser} is used to - * decrement the reference count when the caller no longer needs the resource. + * Increments the reference count by 1 and returns a {@link ResourceHolder} representing the new references. + * The returned {@link ResourceHolder} "close" method decrements the reference count when the caller no longer + * needs the resource. * - * {@link Releaser}s are not thread-safe. 
If multiple threads need references to the same holder, they should - * each acquire their own {@link Releaser}. + * Returned {@link ResourceHolder} instances are not thread-safe. If multiple threads need references to the same resource, they + * should each call this method on the original object. */ - public Releaser increment() + public ResourceHolder increment() { while (true) { int count = this.refCount.get(); @@ -95,11 +96,17 @@ public class ReferenceCountingResourceHolder implements ResourceHolder } } - // This Releaser is supposed to be used from a single thread, so no synchronization/atomicity - return new Releaser() + // This ResourceHolder is supposed to be used from a single thread, so no synchronization/atomicity + return new ResourceHolder() { boolean released = false; + @Override + public T get() + { + return object; + } + @Override public void close() { diff --git a/processing/src/main/java/org/apache/druid/collections/ResourceHolder.java b/processing/src/main/java/org/apache/druid/collections/ResourceHolder.java index 5f35bc9c44e..0eab667aeb4 100644 --- a/processing/src/main/java/org/apache/druid/collections/ResourceHolder.java +++ b/processing/src/main/java/org/apache/druid/collections/ResourceHolder.java @@ -22,6 +22,7 @@ package org.apache.druid.collections; import java.io.Closeable; /** + * */ public interface ResourceHolder extends Closeable { @@ -29,4 +30,9 @@ public interface ResourceHolder extends Closeable @Override void close(); + + static ResourceHolder fromCloseable(final T resource) + { + return new CloseableResourceHolder<>(resource); + } } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java index 6718dff9f83..1ad44bc84eb 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java +++
b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java @@ -32,7 +32,6 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import org.apache.druid.collections.BlockingPool; import org.apache.druid.collections.ReferenceCountingResourceHolder; -import org.apache.druid.collections.Releaser; import org.apache.druid.common.guava.GuavaUtils; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Pair; @@ -59,6 +58,7 @@ import org.apache.druid.query.groupby.GroupByQueryConfig; import org.apache.druid.query.groupby.ResultRow; import org.apache.druid.query.groupby.epinephelinae.RowBasedGrouperHelper.RowBasedKey; +import java.io.Closeable; import java.io.File; import java.nio.ByteBuffer; import java.util.List; @@ -243,9 +243,9 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner try ( // These variables are used to close releasers automatically. @SuppressWarnings("unused") - Releaser bufferReleaser = mergeBufferHolder.increment(); + Closeable bufferReleaser = mergeBufferHolder.increment(); @SuppressWarnings("unused") - Releaser grouperReleaser = grouperHolder.increment() + Closeable grouperReleaser = grouperHolder.increment() ) { // Return true if OK, false if resources were exhausted. 
return input.run(queryPlusForRunners, responseContext) diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ParallelCombiner.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ParallelCombiner.java index 5d244b579a8..d00f6f47852 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ParallelCombiner.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ParallelCombiner.java @@ -28,7 +28,6 @@ import com.google.common.util.concurrent.ListeningExecutorService; import it.unimi.dsi.fastutil.objects.Object2IntArrayMap; import it.unimi.dsi.fastutil.objects.Object2IntMap; import org.apache.druid.collections.ReferenceCountingResourceHolder; -import org.apache.druid.collections.Releaser; import org.apache.druid.java.util.common.CloseableIterators; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Pair; @@ -48,7 +47,7 @@ import org.apache.druid.segment.ObjectColumnSelector; import org.apache.druid.segment.column.ColumnCapabilities; import javax.annotation.Nullable; - +import java.io.Closeable; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -409,7 +408,7 @@ public class ParallelCombiner ); // This variable is used to close releaser automatically. 
@SuppressWarnings("unused") - final Releaser releaser = combineBufferHolder.increment() + final Closeable releaser = combineBufferHolder.increment() ) { while (mergedIterator.hasNext()) { final Entry next = mergedIterator.next(); diff --git a/processing/src/main/java/org/apache/druid/collections/Releaser.java b/processing/src/test/java/org/apache/druid/collections/CloseableResourceHolderTest.java similarity index 57% rename from processing/src/main/java/org/apache/druid/collections/Releaser.java rename to processing/src/test/java/org/apache/druid/collections/CloseableResourceHolderTest.java index f9f2d77ec3c..783e24889c2 100644 --- a/processing/src/main/java/org/apache/druid/collections/Releaser.java +++ b/processing/src/test/java/org/apache/druid/collections/CloseableResourceHolderTest.java @@ -19,13 +19,29 @@ package org.apache.druid.collections; -import java.io.Closeable; +import org.junit.Assert; +import org.junit.Test; -/** - * Releaser is like Closeable, but doesn't throw IOExceptions. 
- */ -public interface Releaser extends Closeable +import java.io.Closeable; +import java.util.concurrent.atomic.AtomicLong; + +public class CloseableResourceHolderTest { - @Override - void close(); + @Test + public void testCloseableResourceHolder() + { + final AtomicLong closeCounter = new AtomicLong(); + final Closeable closeable = closeCounter::incrementAndGet; + final ResourceHolder holder = ResourceHolder.fromCloseable(closeable); + + Assert.assertSame(closeable, holder.get()); + + holder.close(); + Assert.assertEquals(1, closeCounter.get()); + + holder.close(); + Assert.assertEquals(1, closeCounter.get()); + + Assert.assertThrows(IllegalStateException.class, holder::get); + } } diff --git a/processing/src/test/java/org/apache/druid/collections/ReferenceCountingResourceHolderTest.java b/processing/src/test/java/org/apache/druid/collections/ReferenceCountingResourceHolderTest.java index 29df2c3abad..9c9983450e5 100644 --- a/processing/src/test/java/org/apache/druid/collections/ReferenceCountingResourceHolderTest.java +++ b/processing/src/test/java/org/apache/druid/collections/ReferenceCountingResourceHolderTest.java @@ -45,7 +45,7 @@ public class ReferenceCountingResourceHolderTest List threads = new ArrayList<>(); for (int i = 0; i < 100; i++) { Thread thread = new Thread(() -> { - try (Releaser r = resourceHolder.increment()) { + try (ResourceHolder r = resourceHolder.increment()) { try { Thread.sleep(1); } diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java index c6c216594e4..5f7e71501ce 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java +++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java @@ -519,13 +519,13 @@ public class SegmentLocalCacheManager implements SegmentCacheManager catch (Exception e) { log.error(e, "Unable to remove directory[%s]", 
cacheFile); } - } - File parent = cacheFile.getParentFile(); - if (parent != null) { - File[] children = parent.listFiles(); - if (children == null || children.length == 0) { - cleanupCacheFiles(baseFile, parent); + File parent = cacheFile.getParentFile(); + if (parent != null) { + File[] children = parent.listFiles(); + if (children == null || children.length == 0) { + cleanupCacheFiles(baseFile, parent); + } } } }