FrameChannelMerger: Fix incorrect behavior of finished(). (#17088)

Previously, the processor used "remainingChannels" to track the number of
non-null entries of "currentFrames". Now, "remainingChannels" tracks the
set of channels that are unfinished.

The difference is subtle. In the previous code, when an input channel
was blocked upon exiting nextFrame(), the "currentFrames" entry would be
null, and therefore the "remainingChannels" variable would be decremented.
After the next await and call to populateCurrentFramesAndTournamentTree(),
"remainingChannels" would be incremented if the channel had become
unblocked after awaiting.

This means that finished(), which returned true if remainingChannels was
zero, would not be reliable if called between nextFrame() and the
next await + populateCurrentFramesAndTournamentTree().

This patch changes things such that finished() is always reliable. This
fixes a regression introduced in PR #16911, which added a call to
finished() that was, at that time, unsafe.
This commit is contained in:
Gian Merlino 2024-09-17 08:35:54 -07:00 committed by GitHub
parent 50503fe0ef
commit 46cbb33428
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 198 additions and 12 deletions

View File

@ -19,8 +19,10 @@
package org.apache.druid.frame.processor; package org.apache.druid.frame.processor;
import it.unimi.dsi.fastutil.ints.IntAVLTreeSet;
import it.unimi.dsi.fastutil.ints.IntOpenHashSet; import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
import it.unimi.dsi.fastutil.ints.IntSet; import it.unimi.dsi.fastutil.ints.IntSet;
import it.unimi.dsi.fastutil.ints.IntSets;
import org.apache.druid.frame.Frame; import org.apache.druid.frame.Frame;
import org.apache.druid.frame.channel.FrameWithPartition; import org.apache.druid.frame.channel.FrameWithPartition;
import org.apache.druid.frame.channel.ReadableFrameChannel; import org.apache.druid.frame.channel.ReadableFrameChannel;
@ -68,7 +70,11 @@ public class FrameChannelMerger implements FrameProcessor<Long>
private final long rowLimit; private final long rowLimit;
private long rowsOutput = 0; private long rowsOutput = 0;
private int currentPartition = 0; private int currentPartition = 0;
private int remainingChannels;
/**
* Channels that still have input to read.
*/
private final IntSet remainingChannels;
// ColumnSelectorFactory that always reads from the current row in the merged sequence. // ColumnSelectorFactory that always reads from the current row in the merged sequence.
final MultiColumnSelectorFactory mergedColumnSelectorFactory; final MultiColumnSelectorFactory mergedColumnSelectorFactory;
@ -119,7 +125,7 @@ public class FrameChannelMerger implements FrameProcessor<Long>
this.partitions = partitionsToUse; this.partitions = partitionsToUse;
this.rowLimit = rowLimit; this.rowLimit = rowLimit;
this.currentFrames = new FramePlus[inputChannels.size()]; this.currentFrames = new FramePlus[inputChannels.size()];
this.remainingChannels = 0; this.remainingChannels = new IntAVLTreeSet(IntSets.fromTo(0, inputChannels.size()));
this.tournamentTree = new TournamentTree( this.tournamentTree = new TournamentTree(
inputChannels.size(), inputChannels.size(),
(k1, k2) -> { (k1, k2) -> {
@ -241,7 +247,7 @@ public class FrameChannelMerger implements FrameProcessor<Long>
if (rowLimit != UNLIMITED && rowsOutput >= rowLimit) { if (rowLimit != UNLIMITED && rowsOutput >= rowLimit) {
// Limit reached; we're done. // Limit reached; we're done.
Arrays.fill(currentFrames, null); Arrays.fill(currentFrames, null);
remainingChannels = 0; remainingChannels.clear();
} else { } else {
// Continue reading the currentChannel. // Continue reading the currentChannel.
final FramePlus channelFramePlus = currentFrames[currentChannel]; final FramePlus channelFramePlus = currentFrames[currentChannel];
@ -251,7 +257,6 @@ public class FrameChannelMerger implements FrameProcessor<Long>
// Done reading current frame from "channel". // Done reading current frame from "channel".
// Clear it and see if there is another one available for immediate loading. // Clear it and see if there is another one available for immediate loading.
currentFrames[currentChannel] = null; currentFrames[currentChannel] = null;
remainingChannels--;
final ReadableFrameChannel channel = inputChannels.get(currentChannel); final ReadableFrameChannel channel = inputChannels.get(currentChannel);
@ -265,10 +270,10 @@ public class FrameChannelMerger implements FrameProcessor<Long>
break; break;
} else { } else {
currentFrames[currentChannel] = framePlus; currentFrames[currentChannel] = framePlus;
remainingChannels++;
} }
} else if (channel.isFinished()) { } else if (channel.isFinished()) {
// Done reading this channel. Fall through and continue with other channels. // Done reading this channel. Fall through and continue with other channels.
remainingChannels.remove(currentChannel);
} else { } else {
// Nothing available, not finished; we can't continue. Finish up the current frame and return it. // Nothing available, not finished; we can't continue. Finish up the current frame and return it.
break; break;
@ -282,9 +287,12 @@ public class FrameChannelMerger implements FrameProcessor<Long>
} }
} }
/**
* Returns whether all input is done being read.
*/
private boolean finished() private boolean finished()
{ {
return remainingChannels == 0; return remainingChannels.isEmpty();
} }
@Override @Override
@ -302,7 +310,7 @@ public class FrameChannelMerger implements FrameProcessor<Long>
final IntSet await = new IntOpenHashSet(); final IntSet await = new IntOpenHashSet();
for (int i = 0; i < inputChannels.size(); i++) { for (int i = 0; i < inputChannels.size(); i++) {
if (currentFrames[i] == null) { if (currentFrames[i] == null && remainingChannels.contains(i)) {
final ReadableFrameChannel channel = inputChannels.get(i); final ReadableFrameChannel channel = inputChannels.get(i);
if (channel.canRead()) { if (channel.canRead()) {
@ -312,9 +320,10 @@ public class FrameChannelMerger implements FrameProcessor<Long>
await.add(i); await.add(i);
} else { } else {
currentFrames[i] = framePlus; currentFrames[i] = framePlus;
remainingChannels++;
} }
} else if (!channel.isFinished()) { } else if (channel.isFinished()) {
remainingChannels.remove(i);
} else {
await.add(i); await.add(i);
} }
} }

View File

@ -38,6 +38,8 @@ import org.apache.druid.frame.key.KeyOrder;
import org.apache.druid.frame.key.KeyTestUtils; import org.apache.druid.frame.key.KeyTestUtils;
import org.apache.druid.frame.key.RowKey; import org.apache.druid.frame.key.RowKey;
import org.apache.druid.frame.key.RowKeyReader; import org.apache.druid.frame.key.RowKeyReader;
import org.apache.druid.frame.processor.test.AlwaysAsyncPartitionedReadableFrameChannel;
import org.apache.druid.frame.processor.test.AlwaysAsyncReadableFrameChannel;
import org.apache.druid.frame.read.FrameReader; import org.apache.druid.frame.read.FrameReader;
import org.apache.druid.frame.testutil.FrameSequenceBuilder; import org.apache.druid.frame.testutil.FrameSequenceBuilder;
import org.apache.druid.frame.testutil.FrameTestUtil; import org.apache.druid.frame.testutil.FrameTestUtil;
@ -434,8 +436,8 @@ public class SuperSorterTest
clusterByPartitionsFuture, clusterByPartitionsFuture,
exec, exec,
FrameProcessorDecorator.NONE, FrameProcessorDecorator.NONE,
new FileOutputChannelFactory(tempFolder, maxBytesPerFrame, null), makeOutputChannelFactory(new FileOutputChannelFactory(tempFolder, maxBytesPerFrame, null)),
outputChannelFactory, makeOutputChannelFactory(outputChannelFactory),
maxActiveProcessors, maxActiveProcessors,
maxChannelsPerProcessor, maxChannelsPerProcessor,
limitHint, limitHint,
@ -839,9 +841,48 @@ public class SuperSorterTest
for (final BlockingQueueFrameChannel channel : channels) { for (final BlockingQueueFrameChannel channel : channels) {
channel.writable().close(); channel.writable().close();
retVal.add(channel.readable()); retVal.add(new AlwaysAsyncReadableFrameChannel(channel.readable()));
} }
return retVal; return retVal;
} }
/**
 * Decorates an {@link OutputChannelFactory} so the readable side of every channel it opens is forced into an
 * async reading style: regular channels are read through {@link AlwaysAsyncReadableFrameChannel} and partitioned
 * channels through {@link AlwaysAsyncPartitionedReadableFrameChannel}. Nil channels are returned from the base
 * factory untouched. Running tests through this factory helps surface incorrect usage of the
 * {@link ReadableFrameChannel} methods that support async reads.
 *
 * @param baseFactory factory that actually opens the channels
 *
 * @return a factory delegating to {@code baseFactory}, with readable channels wrapped as described above
 */
private static OutputChannelFactory makeOutputChannelFactory(final OutputChannelFactory baseFactory)
{
  return new OutputChannelFactory() {
    @Override
    public OutputChannel openChannel(int partitionNumber) throws IOException
    {
      final OutputChannel underlying = baseFactory.openChannel(partitionNumber);

      // Writable side, allocator and partition number pass through unchanged. Only the readable-channel supplier
      // is swapped, and the wrapper is created lazily each time that supplier is invoked.
      return OutputChannel.pair(
          underlying.getWritableChannel(),
          underlying.getFrameMemoryAllocator(),
          () -> new AlwaysAsyncReadableFrameChannel(underlying.getReadableChannelSupplier().get()),
          underlying.getPartitionNumber()
      );
    }

    @Override
    public PartitionedOutputChannel openPartitionedChannel(String name, boolean deleteAfterRead) throws IOException
    {
      final PartitionedOutputChannel underlying = baseFactory.openPartitionedChannel(name, deleteAfterRead);

      // Same idea as openChannel, using the partitioned flavor of the async wrapper.
      return PartitionedOutputChannel.pair(
          underlying.getWritableChannel(),
          underlying.getFrameMemoryAllocator(),
          () -> new AlwaysAsyncPartitionedReadableFrameChannel(underlying.getReadableChannelSupplier().get())
      );
    }

    @Override
    public OutputChannel openNilChannel(int partitionNumber)
    {
      // Nil channels are handed back exactly as the base factory produced them.
      return baseFactory.openNilChannel(partitionNumber);
    }
  };
}
} }

View File

@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.frame.processor.test;
import org.apache.druid.frame.channel.PartitionedReadableFrameChannel;
import org.apache.druid.frame.channel.ReadableFrameChannel;
import java.io.IOException;
/**
 * A {@link PartitionedReadableFrameChannel} decorator: every per-partition channel obtained from the wrapped
 * instance is handed out inside an {@link AlwaysAsyncReadableFrameChannel}. Closing this object closes the
 * wrapped instance.
 */
public class AlwaysAsyncPartitionedReadableFrameChannel implements PartitionedReadableFrameChannel
{
  private final PartitionedReadableFrameChannel wrapped;

  /**
   * @param delegate partitioned channel that supplies the underlying per-partition channels
   */
  public AlwaysAsyncPartitionedReadableFrameChannel(PartitionedReadableFrameChannel delegate)
  {
    this.wrapped = delegate;
  }

  /**
   * Fetches the channel for {@code partitionNumber} from the wrapped instance and returns it wrapped in a new
   * {@link AlwaysAsyncReadableFrameChannel}.
   */
  @Override
  public ReadableFrameChannel getReadableFrameChannel(int partitionNumber)
  {
    final ReadableFrameChannel partitionChannel = wrapped.getReadableFrameChannel(partitionNumber);
    return new AlwaysAsyncReadableFrameChannel(partitionChannel);
  }

  /**
   * Closes the wrapped partitioned channel.
   */
  @Override
  public void close() throws IOException
  {
    wrapped.close();
  }
}

View File

@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.frame.processor.test;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.frame.Frame;
import org.apache.druid.frame.channel.ReadableFrameChannel;
import org.apache.druid.java.util.common.ISE;
/**
 * {@link ReadableFrameChannel} decorator that forces callers into an async style of reading. Every call to
 * {@link #read()} puts the channel into a "deferred" state in which {@link #canRead()} and {@link #isFinished()}
 * both report false, and a further {@link #read()} throws. The deferred state is lifted by calling
 * {@link #readabilityFuture()} (or {@link #close()}).
 */
public class AlwaysAsyncReadableFrameChannel implements ReadableFrameChannel
{
  private final ReadableFrameChannel wrapped;

  /**
   * True from a call to {@link #read()} until the next call to {@link #readabilityFuture()} or {@link #close()}.
   */
  private boolean deferred;

  /**
   * @param delegate channel that all calls are ultimately forwarded to
   */
  public AlwaysAsyncReadableFrameChannel(ReadableFrameChannel delegate)
  {
    this.wrapped = delegate;
  }

  /**
   * Reports false while deferred, without consulting the wrapped channel; otherwise defers to the wrapped channel.
   */
  @Override
  public boolean isFinished()
  {
    return !deferred && wrapped.isFinished();
  }

  /**
   * Reports false while deferred, without consulting the wrapped channel; otherwise defers to the wrapped channel.
   */
  @Override
  public boolean canRead()
  {
    return !deferred && wrapped.canRead();
  }

  /**
   * Reads from the wrapped channel and enters the deferred state. The deferred flag is set before the wrapped
   * read is attempted.
   *
   * @throws ISE if called while already deferred
   */
  @Override
  public Frame read()
  {
    if (!deferred) {
      deferred = true;
      return wrapped.read();
    }

    throw new ISE("Cannot call read() while deferred");
  }

  /**
   * Leaves the deferred state and returns the wrapped channel's readability future.
   */
  @Override
  public ListenableFuture<?> readabilityFuture()
  {
    deferred = false;
    return wrapped.readabilityFuture();
  }

  /**
   * Leaves the deferred state and closes the wrapped channel.
   */
  @Override
  public void close()
  {
    deferred = false;
    wrapped.close();
  }
}