diff --git a/processing/src/main/java/org/apache/druid/frame/channel/ComposingWritableFrameChannel.java b/processing/src/main/java/org/apache/druid/frame/channel/ComposingWritableFrameChannel.java index 7f2a61e437e..c576c236961 100644 --- a/processing/src/main/java/org/apache/druid/frame/channel/ComposingWritableFrameChannel.java +++ b/processing/src/main/java/org/apache/druid/frame/channel/ComposingWritableFrameChannel.java @@ -22,6 +22,9 @@ package org.apache.druid.frame.channel; import com.google.common.base.Preconditions; import com.google.common.collect.Sets; import com.google.common.util.concurrent.ListenableFuture; +import org.apache.druid.frame.processor.OutputChannel; +import org.apache.druid.frame.processor.PartitionedOutputChannel; +import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.ISE; import org.apache.druid.query.ResourceLimitExceededException; @@ -38,16 +41,29 @@ import java.util.function.Supplier; */ public class ComposingWritableFrameChannel implements WritableFrameChannel { - private final List> channels; + @Nullable + private final List> outputChannelSuppliers; + + @Nullable + private final List> partitionedOutputChannelSuppliers; + + private final List> writableChannelSuppliers; private final Map> partitionToChannelMap; private int currentIndex; public ComposingWritableFrameChannel( - List> channels, + @Nullable List> outputChannelSuppliers, + @Nullable List> partitionedOutputChannelSuppliers, + List> writableChannelSuppliers, Map> partitionToChannelMap ) { - this.channels = Preconditions.checkNotNull(channels, "channels is null"); + if (outputChannelSuppliers != null && partitionedOutputChannelSuppliers != null) { + throw new IAE("Atmost one of outputChannelSuppliers and partitionedOutputChannelSuppliers can be provided"); + } + this.outputChannelSuppliers = outputChannelSuppliers; + this.partitionedOutputChannelSuppliers = partitionedOutputChannelSuppliers; + this.writableChannelSuppliers = Preconditions.checkNotNull(writableChannelSuppliers, "writableChannelSuppliers is null"); this.partitionToChannelMap = Preconditions.checkNotNull(partitionToChannelMap, "partitionToChannelMap is null"); this.currentIndex = 0; @@ -56,12 +72,12 @@ public class ComposingWritableFrameChannel implements WritableFrameChannel @Override public void write(FrameWithPartition frameWithPartition) throws IOException { - if (currentIndex >= channels.size()) { - throw new ISE("No more channels available to write. Total available channels : " + channels.size()); + if (currentIndex >= writableChannelSuppliers.size()) { + throw new ISE("No more channels available to write. Total available channels : " + writableChannelSuppliers.size()); } try { - channels.get(currentIndex).get().write(frameWithPartition); + writableChannelSuppliers.get(currentIndex).get().write(frameWithPartition); partitionToChannelMap.computeIfAbsent(frameWithPartition.partition(), k -> Sets.newHashSetWithExpectedSize(1)) .add(currentIndex); } @@ -70,9 +86,19 @@ public class ComposingWritableFrameChannel implements WritableFrameChannel // exception is automatically passed up to the user incase all the channels are exhausted. If in future, more // cases come up to dictate control flow, then we can switch to returning a custom object from the channel's write // operation. - channels.get(currentIndex).get().close(); + writableChannelSuppliers.get(currentIndex).get().close(); + + // We are converting the corresponding channel to read only after exhausting it because that channel won't be used + // for writes anymore + if (outputChannelSuppliers != null) { + outputChannelSuppliers.get(currentIndex).get().convertToReadOnly(); + } + if (partitionedOutputChannelSuppliers != null) { + partitionedOutputChannelSuppliers.get(currentIndex).get().convertToReadOnly(); + } + currentIndex++; - if (currentIndex >= channels.size()) { + if (currentIndex >= writableChannelSuppliers.size()) { throw rlee; } write(frameWithPartition); @@ -82,7 +108,7 @@ public class ComposingWritableFrameChannel implements WritableFrameChannel @Override public void fail(@Nullable Throwable cause) throws IOException { - for (Supplier channel : channels) { + for (Supplier channel : writableChannelSuppliers) { channel.get().fail(cause); } } @@ -90,21 +116,21 @@ public class ComposingWritableFrameChannel implements WritableFrameChannel @Override public void close() throws IOException { - if (currentIndex < channels.size()) { - channels.get(currentIndex).get().close(); - currentIndex = channels.size(); + if (currentIndex < writableChannelSuppliers.size()) { + writableChannelSuppliers.get(currentIndex).get().close(); + currentIndex = writableChannelSuppliers.size(); } } @Override public boolean isClosed() { - return currentIndex == channels.size(); + return currentIndex == writableChannelSuppliers.size(); } @Override public ListenableFuture writabilityFuture() { - return channels.get(currentIndex).get().writabilityFuture(); + return writableChannelSuppliers.get(currentIndex).get().writabilityFuture(); } } diff --git a/processing/src/main/java/org/apache/druid/frame/processor/ComposingOutputChannelFactory.java b/processing/src/main/java/org/apache/druid/frame/processor/ComposingOutputChannelFactory.java index 8ca9aa4f6fa..cf94262ac35 100644 --- a/processing/src/main/java/org/apache/druid/frame/processor/ComposingOutputChannelFactory.java +++ b/processing/src/main/java/org/apache/druid/frame/processor/ComposingOutputChannelFactory.java @@ -60,6 +60,7 @@ public class ComposingOutputChannelFactory implements OutputChannelFactory { ImmutableList.Builder> writableFrameChannelSuppliersBuilder = ImmutableList.builder(); ImmutableList.Builder> readableFrameChannelSuppliersBuilder = ImmutableList.builder(); + ImmutableList.Builder> outputChannelSupplierBuilder = ImmutableList.builder(); for (OutputChannelFactory channelFactory : channelFactories) { // open channel lazily Supplier channel = @@ -71,14 +72,19 @@ public class ComposingOutputChannelFactory implements OutputChannelFactory throw new UncheckedIOException(e); } })::get; + outputChannelSupplierBuilder.add(channel); writableFrameChannelSuppliersBuilder.add(() -> channel.get().getWritableChannel()); - readableFrameChannelSuppliersBuilder.add(() -> channel.get().getReadableChannelSupplier().get()); + // We read the output channel once they have been written to, and therefore it is space efficient and safe to + // save their read only copies + readableFrameChannelSuppliersBuilder.add(() -> channel.get().readOnly().getReadableChannelSupplier().get()); } // the map maintains a mapping of channels which have the data for a given partition. // it is useful to identify the readable channels to open in the composition while reading the partition data. Map> partitionToChannelMap = new HashMap<>(); ComposingWritableFrameChannel writableFrameChannel = new ComposingWritableFrameChannel( + outputChannelSupplierBuilder.build(), + null, writableFrameChannelSuppliersBuilder.build(), partitionToChannelMap ); @@ -103,6 +109,7 @@ public class ComposingOutputChannelFactory implements OutputChannelFactory ImmutableList.Builder> writableFrameChannelsBuilder = ImmutableList.builder(); ImmutableList.Builder> readableFrameChannelSuppliersBuilder = ImmutableList.builder(); + ImmutableList.Builder> partitionedOutputChannelSupplierBuilder = ImmutableList.builder(); for (OutputChannelFactory channelFactory : channelFactories) { Supplier channel = Suppliers.memoize(() -> { @@ -113,14 +120,19 @@ public class ComposingOutputChannelFactory implements OutputChannelFactory throw new UncheckedIOException(e); } })::get; + partitionedOutputChannelSupplierBuilder.add(channel); writableFrameChannelsBuilder.add(() -> channel.get().getWritableChannel()); - readableFrameChannelSuppliersBuilder.add(() -> channel.get().getReadableChannelSupplier().get()); + // We read the output channel once they have been written to, and therefore it is space efficient and safe to + // save their read only copies + readableFrameChannelSuppliersBuilder.add(() -> channel.get().readOnly().getReadableChannelSupplier().get()); } // the map maintains a mapping of channels which have the data for a given partition. // it is useful to identify the readable channels to open in the composition while reading the partition data. Map> partitionToChannelMap = new HashMap<>(); ComposingWritableFrameChannel writableFrameChannel = new ComposingWritableFrameChannel( + null, + partitionedOutputChannelSupplierBuilder.build(), writableFrameChannelsBuilder.build(), partitionToChannelMap ); diff --git a/processing/src/main/java/org/apache/druid/frame/processor/OutputChannel.java b/processing/src/main/java/org/apache/druid/frame/processor/OutputChannel.java index ac0e0a5fac5..e1377eddca3 100644 --- a/processing/src/main/java/org/apache/druid/frame/processor/OutputChannel.java +++ b/processing/src/main/java/org/apache/druid/frame/processor/OutputChannel.java @@ -21,6 +21,7 @@ package org.apache.druid.frame.processor; import com.google.common.base.Preconditions; import com.google.common.base.Suppliers; +import com.google.errorprone.annotations.concurrent.GuardedBy; import org.apache.druid.frame.allocation.MemoryAllocator; import org.apache.druid.frame.channel.FrameWithPartition; import org.apache.druid.frame.channel.ReadableFrameChannel; @@ -42,11 +43,16 @@ import java.util.function.Supplier; */ public class OutputChannel { + @GuardedBy("this") @Nullable - private final WritableFrameChannel writableChannel; + private WritableFrameChannel writableChannel; + + @GuardedBy("this") @Nullable - private final MemoryAllocator frameMemoryAllocator; + private MemoryAllocator frameMemoryAllocator; + private final Supplier readableChannelSupplier; + private final boolean readableChannelUsableWhileWriting; private final int partitionNumber; @@ -157,12 +163,14 @@ public class OutputChannel } /** - * Returns the writable channel of this pair. The producer writes to this channel. + * Returns the writable channel of this pair. The producer writes to this channel. Throws ISE if the output channel is + * read only. */ - public WritableFrameChannel getWritableChannel() + public synchronized WritableFrameChannel getWritableChannel() { if (writableChannel == null) { - throw new ISE("Writable channel is not available"); + throw new ISE("Writable channel is not available. The output channel might be marked as read-only," + + " hence no writes are allowed."); } else { return writableChannel; } @@ -170,11 +178,13 @@ public class OutputChannel /** * Returns the memory allocator for the writable channel. The producer uses this to generate frames for the channel. + * Throws ISE if the output channel is read only. */ - public MemoryAllocator getFrameMemoryAllocator() + public synchronized MemoryAllocator getFrameMemoryAllocator() { if (frameMemoryAllocator == null) { - throw new ISE("Writable channel is not available"); + throw new ISE("Frame allocator is not available. The output channel might be marked as read-only," + + " hence memory allocator is not required."); } else { return frameMemoryAllocator; } @@ -197,7 +207,7 @@ public class OutputChannel /** * Whether {@link #getReadableChannel()} is ready to use. */ - public boolean isReadableChannelReady() + public synchronized boolean isReadableChannelReady() { return readableChannelUsableWhileWriting || writableChannel == null || writableChannel.isClosed(); } @@ -212,7 +222,7 @@ public class OutputChannel return partitionNumber; } - public OutputChannel mapWritableChannel(final Function mapFn) + public synchronized OutputChannel mapWritableChannel(final Function mapFn) { if (writableChannel == null) { return this; @@ -235,4 +245,14 @@ public class OutputChannel { return OutputChannel.readOnly(readableChannelSupplier, partitionNumber); } + + /** + * Removes the reference to the {@link #writableChannel} and {@link #frameMemoryAllocator} from the object, making + * it more efficient + */ + public synchronized void convertToReadOnly() + { + this.writableChannel = null; + this.frameMemoryAllocator = null; + } } diff --git a/processing/src/main/java/org/apache/druid/frame/processor/PartitionedOutputChannel.java b/processing/src/main/java/org/apache/druid/frame/processor/PartitionedOutputChannel.java index 3e455545b04..34ad2c23234 100644 --- a/processing/src/main/java/org/apache/druid/frame/processor/PartitionedOutputChannel.java +++ b/processing/src/main/java/org/apache/druid/frame/processor/PartitionedOutputChannel.java @@ -21,6 +21,7 @@ package org.apache.druid.frame.processor; import com.google.common.base.Preconditions; import com.google.common.base.Suppliers; +import com.google.errorprone.annotations.concurrent.GuardedBy; import org.apache.druid.frame.allocation.MemoryAllocator; import org.apache.druid.frame.channel.PartitionedReadableFrameChannel; import org.apache.druid.frame.channel.WritableFrameChannel; @@ -37,10 +38,15 @@ import java.util.function.Supplier; */ public class PartitionedOutputChannel { + + @GuardedBy("this") @Nullable - private final WritableFrameChannel writableChannel; + private WritableFrameChannel writableChannel; + + @GuardedBy("this") @Nullable - private final MemoryAllocator frameMemoryAllocator; + private MemoryAllocator frameMemoryAllocator; + private final Supplier readableChannelSupplier; private PartitionedOutputChannel( @@ -76,12 +82,14 @@ public class PartitionedOutputChannel } /** - * Returns the writable channel of this pair. The producer writes to this channel. + * Returns the writable channel of this pair. The producer writes to this channel. Throws ISE if the output channel is + * read only. */ - public WritableFrameChannel getWritableChannel() + public synchronized WritableFrameChannel getWritableChannel() { if (writableChannel == null) { - throw new ISE("Writable channel is not available"); + throw new ISE("Writable channel is not available. The output channel might be marked as read-only," + + " hence no writes are allowed."); } else { return writableChannel; } @@ -89,11 +97,13 @@ public class PartitionedOutputChannel /** * Returns the memory allocator for the writable channel. The producer uses this to generate frames for the channel. + * Throws ISE if the output channel is read only. */ - public MemoryAllocator getFrameMemoryAllocator() + public synchronized MemoryAllocator getFrameMemoryAllocator() { if (frameMemoryAllocator == null) { - throw new ISE("Writable channel is not available"); + throw new ISE("Frame allocator is not available. The output channel might be marked as read-only," + + " hence memory allocator is not required."); } else { return frameMemoryAllocator; } @@ -102,12 +112,12 @@ public class PartitionedOutputChannel /** * Returns the partitioned readable channel supplier of this pair. The consumer reads from this channel. */ - public Supplier getReadableChannelSupplier() + public synchronized Supplier getReadableChannelSupplier() { return readableChannelSupplier; } - public PartitionedOutputChannel mapWritableChannel(final Function mapFn) + public synchronized PartitionedOutputChannel mapWritableChannel(final Function mapFn) { if (writableChannel == null) { return this; @@ -119,4 +129,24 @@ public class PartitionedOutputChannel ); } } + + + /** + * Returns a read-only version of this instance. Read-only versions have neither {@link #getWritableChannel()} nor + * {@link #getFrameMemoryAllocator()}, and therefore require substantially less memory. + */ + public PartitionedOutputChannel readOnly() + { + return new PartitionedOutputChannel(null, null, readableChannelSupplier); + } + + /** + * Removes the reference to the {@link #writableChannel} and {@link #frameMemoryAllocator} from the object, making + * it more efficient + */ + public synchronized void convertToReadOnly() + { + this.writableChannel = null; + this.frameMemoryAllocator = null; + } } diff --git a/processing/src/main/java/org/apache/druid/frame/processor/SuperSorter.java b/processing/src/main/java/org/apache/druid/frame/processor/SuperSorter.java index 1be686be6ff..440da49d7c2 100644 --- a/processing/src/main/java/org/apache/druid/frame/processor/SuperSorter.java +++ b/processing/src/main/java/org/apache/druid/frame/processor/SuperSorter.java @@ -611,7 +611,11 @@ public class SuperSorter ); writableChannel = partitionedOutputChannel.getWritableChannel(); frameAllocatorFactory = new SingleMemoryAllocatorFactory(partitionedOutputChannel.getFrameMemoryAllocator()); - levelAndRankToReadableChannelMap.put(levelAndRankKey, partitionedOutputChannel); + + // We add the readOnly() channel even though we require the writableChannel and the frame allocator because + // the original partitionedOutputChannel would contain the reference to those, which would get cleaned up + // appropriately and not be held up in the class level map + levelAndRankToReadableChannelMap.put(levelAndRankKey, partitionedOutputChannel.readOnly()); } final FrameChannelMerger worker = diff --git a/processing/src/test/java/org/apache/druid/frame/channel/ComposingWritableFrameChannelTest.java b/processing/src/test/java/org/apache/druid/frame/channel/ComposingWritableFrameChannelTest.java new file mode 100644 index 00000000000..c2968b35795 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/frame/channel/ComposingWritableFrameChannelTest.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.frame.channel; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.common.util.concurrent.ListenableFuture; +import org.apache.druid.frame.Frame; +import org.apache.druid.frame.allocation.ArenaMemoryAllocator; +import org.apache.druid.frame.processor.OutputChannel; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.query.ResourceLimitExceededException; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.function.Supplier; + + +public class ComposingWritableFrameChannelTest +{ + @Test + public void testComposingWritableChannelSwitchesProperly() throws IOException + { + + // This frame channel writes a single frame + WritableFrameChannel writableFrameChannel1 = new LimitedWritableFrameChannel(2); + WritableFrameChannel writableFrameChannel2 = new LimitedWritableFrameChannel(100); + + Supplier readableFrameChannelSupplier1 = () -> null; + Supplier readableFrameChannelSupplier2 = () -> null; + + OutputChannel outputChannel1 = OutputChannel.pair( + writableFrameChannel1, + ArenaMemoryAllocator.createOnHeap(1), + readableFrameChannelSupplier1, + 1 + ); + OutputChannel outputChannel2 = OutputChannel.pair( + writableFrameChannel2, + ArenaMemoryAllocator.createOnHeap(1), + readableFrameChannelSupplier2, + 2 + ); + + Map> partitionToChannelMap = new HashMap<>(); + + ComposingWritableFrameChannel composingWritableFrameChannel = new ComposingWritableFrameChannel( + ImmutableList.of( + () -> outputChannel1, + () -> outputChannel2 + ), + null, + ImmutableList.of( + () -> writableFrameChannel1, + () -> writableFrameChannel2 + ), + partitionToChannelMap + ); + + composingWritableFrameChannel.write(new FrameWithPartition(Mockito.mock(Frame.class), 1)); + composingWritableFrameChannel.write(new FrameWithPartition(Mockito.mock(Frame.class), 2)); + composingWritableFrameChannel.write(new FrameWithPartition(Mockito.mock(Frame.class), 3)); + + // Assert the location of the channels where the frames have been written to + Assert.assertEquals(ImmutableSet.of(0), partitionToChannelMap.get(1)); + Assert.assertEquals(ImmutableSet.of(0), partitionToChannelMap.get(2)); + Assert.assertEquals(ImmutableSet.of(1), partitionToChannelMap.get(3)); + + // Test if the older channel has been converted to read only + Assert.assertThrows(ISE.class, outputChannel1::getWritableChannel); + } + + static class LimitedWritableFrameChannel implements WritableFrameChannel + { + private final int maxFrames; + private int curFrame = 0; + + public LimitedWritableFrameChannel(int maxFrames) + { + this.maxFrames = maxFrames; + } + + @Override + public void write(FrameWithPartition frameWithPartition) + { + if (curFrame >= maxFrames) { + throw new ResourceLimitExceededException("Cannot write more frames to the channel"); + } + ++curFrame; + } + + @Override + public void write(Frame frame) + { + } + + @Override + public void fail(@Nullable Throwable cause) + { + + } + + @Override + public void close() + { + + } + + @Override + public boolean isClosed() + { + return false; + } + + @Override + public ListenableFuture writabilityFuture() + { + return null; + } + } +} diff --git a/processing/src/test/java/org/apache/druid/frame/processor/OutputChannelTest.java b/processing/src/test/java/org/apache/druid/frame/processor/OutputChannelTest.java index ac0b03b286d..6e5b904ab3f 100644 --- a/processing/src/test/java/org/apache/druid/frame/processor/OutputChannelTest.java +++ b/processing/src/test/java/org/apache/druid/frame/processor/OutputChannelTest.java @@ -44,14 +44,16 @@ public class OutputChannelTest final IllegalStateException e1 = Assert.assertThrows(IllegalStateException.class, channel::getWritableChannel); MatcherAssert.assertThat( e1, - ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo("Writable channel is not available")) + ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo( + "Writable channel is not available. The output channel might be marked as read-only, hence no writes are allowed.")) ); // No writable channel: cannot call getFrameMemoryAllocator. final IllegalStateException e2 = Assert.assertThrows(IllegalStateException.class, channel::getFrameMemoryAllocator); MatcherAssert.assertThat( e2, - ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo("Writable channel is not available")) + ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo( + "Frame allocator is not available. The output channel might be marked as read-only, hence memory allocator is not required.")) ); // Mapping the writable channel of a nil channel has no effect, because there is no writable channel. diff --git a/processing/src/test/java/org/apache/druid/frame/processor/OutputChannelsTest.java b/processing/src/test/java/org/apache/druid/frame/processor/OutputChannelsTest.java index dbef92b047a..a22c7e92bfc 100644 --- a/processing/src/test/java/org/apache/druid/frame/processor/OutputChannelsTest.java +++ b/processing/src/test/java/org/apache/druid/frame/processor/OutputChannelsTest.java @@ -86,7 +86,8 @@ public class OutputChannelsTest MatcherAssert.assertThat( e, - ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo("Writable channel is not available")) + ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo( + "Writable channel is not available. The output channel might be marked as read-only, hence no writes are allowed.")) ); final IllegalStateException e2 = Assert.assertThrows( @@ -96,7 +97,8 @@ public class OutputChannelsTest MatcherAssert.assertThat( e2, - ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo("Writable channel is not available")) + ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo( + "Frame allocator is not available. The output channel might be marked as read-only, hence memory allocator is not required.")) ); } }