diff --git a/processing/src/main/java/org/apache/druid/frame/processor/FrameChannelMerger.java b/processing/src/main/java/org/apache/druid/frame/processor/FrameChannelMerger.java index e806f98b926..662fd001b02 100644 --- a/processing/src/main/java/org/apache/druid/frame/processor/FrameChannelMerger.java +++ b/processing/src/main/java/org/apache/druid/frame/processor/FrameChannelMerger.java @@ -19,8 +19,10 @@ package org.apache.druid.frame.processor; +import it.unimi.dsi.fastutil.ints.IntAVLTreeSet; import it.unimi.dsi.fastutil.ints.IntOpenHashSet; import it.unimi.dsi.fastutil.ints.IntSet; +import it.unimi.dsi.fastutil.ints.IntSets; import org.apache.druid.frame.Frame; import org.apache.druid.frame.channel.FrameWithPartition; import org.apache.druid.frame.channel.ReadableFrameChannel; @@ -68,7 +70,11 @@ public class FrameChannelMerger implements FrameProcessor private final long rowLimit; private long rowsOutput = 0; private int currentPartition = 0; - private int remainingChannels; + + /** + * Channels that still have input to read. + */ + private final IntSet remainingChannels; // ColumnSelectorFactory that always reads from the current row in the merged sequence. final MultiColumnSelectorFactory mergedColumnSelectorFactory; @@ -119,7 +125,7 @@ public class FrameChannelMerger implements FrameProcessor this.partitions = partitionsToUse; this.rowLimit = rowLimit; this.currentFrames = new FramePlus[inputChannels.size()]; - this.remainingChannels = 0; + this.remainingChannels = new IntAVLTreeSet(IntSets.fromTo(0, inputChannels.size())); this.tournamentTree = new TournamentTree( inputChannels.size(), (k1, k2) -> { @@ -241,7 +247,7 @@ public class FrameChannelMerger implements FrameProcessor if (rowLimit != UNLIMITED && rowsOutput >= rowLimit) { // Limit reached; we're done. Arrays.fill(currentFrames, null); - remainingChannels = 0; + remainingChannels.clear(); } else { // Continue reading the currentChannel. final FramePlus channelFramePlus = currentFrames[currentChannel]; @@ -251,7 +257,6 @@ public class FrameChannelMerger implements FrameProcessor // Done reading current frame from "channel". // Clear it and see if there is another one available for immediate loading. currentFrames[currentChannel] = null; - remainingChannels--; final ReadableFrameChannel channel = inputChannels.get(currentChannel); @@ -265,10 +270,10 @@ public class FrameChannelMerger implements FrameProcessor break; } else { currentFrames[currentChannel] = framePlus; - remainingChannels++; } } else if (channel.isFinished()) { // Done reading this channel. Fall through and continue with other channels. + remainingChannels.remove(currentChannel); } else { // Nothing available, not finished; we can't continue. Finish up the current frame and return it. break; @@ -282,9 +287,12 @@ public class FrameChannelMerger implements FrameProcessor } } + /** + * Returns whether all input is done being read. + */ private boolean finished() { - return remainingChannels == 0; + return remainingChannels.isEmpty(); } @Override @@ -302,7 +310,7 @@ public class FrameChannelMerger implements FrameProcessor final IntSet await = new IntOpenHashSet(); for (int i = 0; i < inputChannels.size(); i++) { - if (currentFrames[i] == null) { + if (currentFrames[i] == null && remainingChannels.contains(i)) { final ReadableFrameChannel channel = inputChannels.get(i); if (channel.canRead()) { @@ -312,9 +320,10 @@ public class FrameChannelMerger implements FrameProcessor await.add(i); } else { currentFrames[i] = framePlus; - remainingChannels++; } - } else if (!channel.isFinished()) { + } else if (channel.isFinished()) { + remainingChannels.remove(i); + } else { await.add(i); } } diff --git a/processing/src/test/java/org/apache/druid/frame/processor/SuperSorterTest.java b/processing/src/test/java/org/apache/druid/frame/processor/SuperSorterTest.java index 7a885af49c5..80e7f6352d0 100644 --- a/processing/src/test/java/org/apache/druid/frame/processor/SuperSorterTest.java +++ b/processing/src/test/java/org/apache/druid/frame/processor/SuperSorterTest.java @@ -38,6 +38,8 @@ import org.apache.druid.frame.key.KeyOrder; import org.apache.druid.frame.key.KeyTestUtils; import org.apache.druid.frame.key.RowKey; import org.apache.druid.frame.key.RowKeyReader; +import org.apache.druid.frame.processor.test.AlwaysAsyncPartitionedReadableFrameChannel; +import org.apache.druid.frame.processor.test.AlwaysAsyncReadableFrameChannel; import org.apache.druid.frame.read.FrameReader; import org.apache.druid.frame.testutil.FrameSequenceBuilder; import org.apache.druid.frame.testutil.FrameTestUtil; @@ -434,8 +436,8 @@ public class SuperSorterTest clusterByPartitionsFuture, exec, FrameProcessorDecorator.NONE, - new FileOutputChannelFactory(tempFolder, maxBytesPerFrame, null), - outputChannelFactory, + makeOutputChannelFactory(new FileOutputChannelFactory(tempFolder, maxBytesPerFrame, null)), + makeOutputChannelFactory(outputChannelFactory), maxActiveProcessors, maxChannelsPerProcessor, limitHint, @@ -839,9 +841,48 @@ public class SuperSorterTest for (final BlockingQueueFrameChannel channel : channels) { channel.writable().close(); - retVal.add(channel.readable()); + retVal.add(new AlwaysAsyncReadableFrameChannel(channel.readable())); } return retVal; } + + /** + * Wraps an underlying {@link OutputChannelFactory} in one that uses {@link AlwaysAsyncReadableFrameChannel} + * for all of its readable channels. This helps catch bugs due to improper usage of {@link ReadableFrameChannel} + * methods that enable async reads. + */ + private static OutputChannelFactory makeOutputChannelFactory(final OutputChannelFactory baseFactory) + { + return new OutputChannelFactory() { + @Override + public OutputChannel openChannel(int partitionNumber) throws IOException + { + final OutputChannel channel = baseFactory.openChannel(partitionNumber); + return OutputChannel.pair( + channel.getWritableChannel(), + channel.getFrameMemoryAllocator(), + () -> new AlwaysAsyncReadableFrameChannel(channel.getReadableChannelSupplier().get()), + channel.getPartitionNumber() + ); + } + + @Override + public PartitionedOutputChannel openPartitionedChannel(String name, boolean deleteAfterRead) throws IOException + { + final PartitionedOutputChannel channel = baseFactory.openPartitionedChannel(name, deleteAfterRead); + return PartitionedOutputChannel.pair( + channel.getWritableChannel(), + channel.getFrameMemoryAllocator(), + () -> new AlwaysAsyncPartitionedReadableFrameChannel(channel.getReadableChannelSupplier().get()) + ); + } + + @Override + public OutputChannel openNilChannel(int partitionNumber) + { + return baseFactory.openNilChannel(partitionNumber); + } + }; + } } diff --git a/processing/src/test/java/org/apache/druid/frame/processor/test/AlwaysAsyncPartitionedReadableFrameChannel.java b/processing/src/test/java/org/apache/druid/frame/processor/test/AlwaysAsyncPartitionedReadableFrameChannel.java new file mode 100644 index 00000000000..4013889df6e --- /dev/null +++ b/processing/src/test/java/org/apache/druid/frame/processor/test/AlwaysAsyncPartitionedReadableFrameChannel.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.frame.processor.test; + +import org.apache.druid.frame.channel.PartitionedReadableFrameChannel; +import org.apache.druid.frame.channel.ReadableFrameChannel; + +import java.io.IOException; + +/** + * Implementation of {@link PartitionedReadableFrameChannel} that wraps all underlying channels in + * {@link AlwaysAsyncReadableFrameChannel}. + */ +public class AlwaysAsyncPartitionedReadableFrameChannel implements PartitionedReadableFrameChannel +{ + private final PartitionedReadableFrameChannel delegate; + + public AlwaysAsyncPartitionedReadableFrameChannel(PartitionedReadableFrameChannel delegate) + { + this.delegate = delegate; + } + + @Override + public ReadableFrameChannel getReadableFrameChannel(int partitionNumber) + { + return new AlwaysAsyncReadableFrameChannel(delegate.getReadableFrameChannel(partitionNumber)); + } + + @Override + public void close() throws IOException + { + delegate.close(); + } +} diff --git a/processing/src/test/java/org/apache/druid/frame/processor/test/AlwaysAsyncReadableFrameChannel.java b/processing/src/test/java/org/apache/druid/frame/processor/test/AlwaysAsyncReadableFrameChannel.java new file mode 100644 index 00000000000..8ff10aeb7b0 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/frame/processor/test/AlwaysAsyncReadableFrameChannel.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.frame.processor.test; + +import com.google.common.util.concurrent.ListenableFuture; +import org.apache.druid.frame.Frame; +import org.apache.druid.frame.channel.ReadableFrameChannel; +import org.apache.druid.java.util.common.ISE; + +/** + * Wraps an underlying channel and forces an async style of reading. After each call to {@link #read()}, the + * {@link #canRead()} and {@link #isFinished()} methods return false until {@link #readabilityFuture()} is called. + */ +public class AlwaysAsyncReadableFrameChannel implements ReadableFrameChannel +{ + private final ReadableFrameChannel delegate; + private boolean defer; + + public AlwaysAsyncReadableFrameChannel(ReadableFrameChannel delegate) + { + this.delegate = delegate; + } + + @Override + public boolean isFinished() + { + if (defer) { + return false; + } + + return delegate.isFinished(); + } + + @Override + public boolean canRead() + { + if (defer) { + return false; + } + + return delegate.canRead(); + } + + @Override + public Frame read() + { + if (defer) { + throw new ISE("Cannot call read() while deferred"); + } + + defer = true; + return delegate.read(); + } + + @Override + public ListenableFuture readabilityFuture() + { + defer = false; + return delegate.readabilityFuture(); + } + + @Override + public void close() + { + defer = false; + delegate.close(); + } +}