[Backport] Additional tests for ChannelStageOutputReader. (#17050) (#17187)

The existing tests are moved into a "WithMaximalBuffering" subclass,
and a new "WithMinimalBuffering" subclass is added to test cases
where only a single frame is buffered.

Co-authored-by: Gian Merlino <gianmerlino@gmail.com>
This commit is contained in:
Kashif Faraz 2024-09-30 13:21:55 +05:30 committed by GitHub
parent feca6000d7
commit 97cc748be0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 403 additions and 181 deletions

View File

@ -19,8 +19,10 @@
package org.apache.druid.msq.shuffle.output;
import com.google.common.collect.Iterables;
import com.google.common.io.ByteStreams;
import com.google.common.math.IntMath;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.frame.Frame;
import org.apache.druid.frame.FrameType;
@ -30,10 +32,12 @@ import org.apache.druid.frame.file.FrameFile;
import org.apache.druid.frame.read.FrameReader;
import org.apache.druid.frame.testutil.FrameSequenceBuilder;
import org.apache.druid.frame.testutil.FrameTestUtil;
import org.apache.druid.segment.QueryableIndexCursorFactory;
import org.apache.druid.segment.TestIndex;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexCursorFactory;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.apache.druid.utils.CloseableUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
@ -50,206 +54,424 @@ import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.math.RoundingMode;
import java.nio.file.Files;
import java.util.List;
public class ChannelStageOutputReaderTest extends InitializedNullHandlingTest
{
private static final int MAX_FRAMES = 10;
private static final int EXPECTED_NUM_ROWS = 1209;
private final BlockingQueueFrameChannel channel = new BlockingQueueFrameChannel(MAX_FRAMES);
private final ChannelStageOutputReader reader = new ChannelStageOutputReader(channel.readable());
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
private FrameReader frameReader;
private List<Frame> frameList;
@Before
public void setUp()
/**
* Tests that use {@link BlockingQueueFrameChannel#minimal()}.
*/
public static class WithMinimalBuffering extends InitializedNullHandlingTest
{
final IncrementalIndex index = TestIndex.getIncrementalTestIndex();
final IncrementalIndexCursorFactory cursorFactory = new IncrementalIndexCursorFactory(index);
frameReader = FrameReader.create(cursorFactory.getRowSignature());
frameList = FrameSequenceBuilder.fromCursorFactory(cursorFactory)
.frameType(FrameType.ROW_BASED)
.maxRowsPerFrame(IntMath.divide(index.size(), MAX_FRAMES, RoundingMode.CEILING))
.frames()
.toList();
}
@After
public void tearDown()
{
reader.close();
}
@Test
public void test_readLocally() throws IOException
{
writeAllFramesToChannel();
Assert.assertSame(channel.readable(), reader.readLocally());
reader.close(); // Won't close the channel, because it's already been returned by readLocally
final int numRows = FrameTestUtil.readRowsFromFrameChannel(channel.readable(), frameReader).toList().size();
Assert.assertEquals(EXPECTED_NUM_ROWS, numRows);
}
@Test
public void test_readLocally_closePriorToRead() throws IOException
{
writeAllFramesToChannel();
reader.close();
// Can't read the channel after closing the reader
Assert.assertThrows(
IllegalStateException.class,
reader::readLocally
);
}
@Test
public void test_readLocally_thenReadRemotely() throws IOException
{
writeAllFramesToChannel();
Assert.assertSame(channel.readable(), reader.readLocally());
// Can't read remotely after reading locally
Assert.assertThrows(
IllegalStateException.class,
() -> reader.readRemotelyFrom(0)
private final Frame frame = Iterables.getOnlyElement(
FrameSequenceBuilder
.fromCursorFactory(new QueryableIndexCursorFactory(TestIndex.getNoRollupMMappedTestIndex()))
.frameType(FrameType.ROW_BASED)
.frames()
.toList()
);
// Can still read locally after this error
final int numRows = FrameTestUtil.readRowsFromFrameChannel(channel.readable(), frameReader).toList().size();
Assert.assertEquals(EXPECTED_NUM_ROWS, numRows);
}
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
@Test
public void test_readRemotely_strideBasedOnReturnedChunk() throws IOException
{
// Test that reads entire chunks from readRemotelyFrom. This is a typical usage pattern.
private BlockingQueueFrameChannel channel;
private ChannelStageOutputReader channelReader;
private File tmpFile;
private OutputStream tmpOut;
private FrameFile tmpFrameFile;
writeAllFramesToChannel();
// Variables used by doRead()
private long offset;
private ListenableFuture<InputStream> nextRead;
final File tmpFile = temporaryFolder.newFile();
try (final FileOutputStream tmpOut = new FileOutputStream(tmpFile)) {
int numReads = 0;
long offset = 0;
while (true) {
try (final InputStream in = FutureUtils.getUnchecked(reader.readRemotelyFrom(offset), true)) {
numReads++;
final long bytesWritten = ByteStreams.copy(in, tmpOut);
offset += bytesWritten;
if (bytesWritten == 0) {
break;
}
}
}
MatcherAssert.assertThat(numReads, Matchers.greaterThan(1));
@Before
public void setUp() throws Exception
{
channel = BlockingQueueFrameChannel.minimal();
channelReader = new ChannelStageOutputReader(channel.readable());
tmpFile = temporaryFolder.newFile();
tmpOut = Files.newOutputStream(tmpFile.toPath());
}
final FrameFile frameFile = FrameFile.open(tmpFile, null);
final int numRows =
FrameTestUtil.readRowsFromFrameChannel(new ReadableFileFrameChannel(frameFile), frameReader).toList().size();
Assert.assertEquals(EXPECTED_NUM_ROWS, numRows);
}
@Test
public void test_readRemotely_strideOneByte() throws IOException
{
// Test that reads one byte at a time from readRemotelyFrom. This helps ensure that there are no edge cases
// in the chunk-reading logic.
writeAllFramesToChannel();
final File tmpFile = temporaryFolder.newFile();
try (final FileOutputStream tmpOut = new FileOutputStream(tmpFile)) {
int numReads = 0;
long offset = 0;
while (true) {
try (final InputStream in = FutureUtils.getUnchecked(reader.readRemotelyFrom(offset), true)) {
numReads++;
final int nextByte = in.read();
if (nextByte < 0) {
break;
}
tmpOut.write(nextByte);
offset++;
}
}
Assert.assertEquals(numReads, offset + 1);
@After
public void tearDown() throws Exception
{
CloseableUtils.closeAll(tmpOut, tmpFrameFile);
}
final FrameFile frameFile = FrameFile.open(tmpFile, null);
final int numRows =
FrameTestUtil.readRowsFromFrameChannel(new ReadableFileFrameChannel(frameFile), frameReader).toList().size();
@Test
public void test_remote_empty() throws Exception
{
// Close without writing anything.
channel.writable().close();
Assert.assertEquals(EXPECTED_NUM_ROWS, numRows);
}
while (doRead(-1)) {
// Do nothing, just keep reading.
}
@Test
public void test_readRemotely_thenLocally() throws IOException
{
writeAllFramesToChannel();
Assert.assertEquals(0, tmpFrameFile.numFrames());
}
// Read remotely
FutureUtils.getUnchecked(reader.readRemotelyFrom(0), true);
// Then read locally
Assert.assertThrows(
IllegalStateException.class,
reader::readLocally
);
}
@Test
public void test_readRemotely_cannotReverse() throws IOException
{
writeAllFramesToChannel();
// Read remotely from offset = 1.
final InputStream in = FutureUtils.getUnchecked(reader.readRemotelyFrom(1), true);
final int offset = ByteStreams.toByteArray(in).length;
MatcherAssert.assertThat(offset, Matchers.greaterThan(0));
// Then read again from offset = 0; should get an error.
final RuntimeException e = Assert.assertThrows(
RuntimeException.class,
() -> FutureUtils.getUnchecked(reader.readRemotelyFrom(0), true)
);
MatcherAssert.assertThat(
e,
ThrowableCauseMatcher.hasCause(
Matchers.allOf(
CoreMatchers.instanceOf(IllegalStateException.class),
ThrowableMessageMatcher.hasMessage(CoreMatchers.startsWith("Offset[0] no longer available"))
)
)
);
}
private void writeAllFramesToChannel() throws IOException
{
for (Frame frame : frameList) {
@Test
public void test_remote_oneFrame() throws Exception
{
// Close after writing one frame.
channel.writable().write(frame);
channel.writable().close();
while (doRead(-1)) {
// Do nothing, just keep reading.
}
Assert.assertEquals(1, tmpFrameFile.numFrames());
Assert.assertEquals(frame.numBytes(), tmpFrameFile.frame(0).numBytes());
}
@Test
public void test_remote_oneFrame_writeAfterFirstRead() throws Exception
{
Assert.assertTrue(doRead(-1));
// Close after writing one frame.
channel.writable().write(frame);
channel.writable().close();
while (doRead(-1)) {
// Do nothing, just keep reading.
}
Assert.assertEquals(1, tmpFrameFile.numFrames());
Assert.assertEquals(frame.numBytes(), tmpFrameFile.frame(0).numBytes());
}
@Test
public void test_remote_oneFrame_readOneByteAtATime() throws Exception
{
// Close after writing one frame.
channel.writable().write(frame);
channel.writable().close();
while (doRead(1)) {
// Do nothing, just keep reading.
}
Assert.assertEquals(1, tmpFrameFile.numFrames());
Assert.assertEquals(frame.numBytes(), tmpFrameFile.frame(0).numBytes());
}
@Test
public void test_remote_threeFrames_readOneByteAtATime() throws Exception
{
// Write one frame.
channel.writable().write(frame);
// See that we can't write another frame.
final IllegalStateException e = Assert.assertThrows(
IllegalStateException.class,
() -> channel.writable().write(frame)
);
MatcherAssert.assertThat(
e,
ThrowableMessageMatcher.hasMessage(CoreMatchers.startsWith("Channel has no capacity"))
);
// Read the first frame until we start blocking.
while (nextRead == null) {
Assert.assertTrue(doRead(1));
}
// Write the next frame.
Assert.assertFalse(nextRead.isDone());
channel.writable().write(frame);
// This write would have unblocked nextRead, which will now be done.
Assert.assertTrue(nextRead.isDone());
// Write a third frame.
channel.writable().write(frame);
// See that we can't write a fourth frame.
final IllegalStateException e2 = Assert.assertThrows(
IllegalStateException.class,
() -> channel.writable().write(frame)
);
MatcherAssert.assertThat(
e2,
ThrowableMessageMatcher.hasMessage(CoreMatchers.startsWith("Channel has no capacity"))
);
// And read until we start blocking.
while (nextRead == null) {
Assert.assertTrue(doRead(1));
}
// Close.
channel.writable().close();
// Read until end of stream.
while (doRead(1)) {
// Just keep looping.
}
Assert.assertEquals(3, tmpFrameFile.numFrames());
Assert.assertEquals(frame.numBytes(), tmpFrameFile.frame(0).numBytes());
Assert.assertEquals(frame.numBytes(), tmpFrameFile.frame(1).numBytes());
Assert.assertEquals(frame.numBytes(), tmpFrameFile.frame(2).numBytes());
}
/**
* Do the next read operation.
*
* @return false if done reading, true if there's more to read
*/
private boolean doRead(final long limit) throws IOException
{
if (nextRead == null) {
nextRead = channelReader.readRemotelyFrom(offset);
}
if (nextRead.isDone()) {
try (final InputStream in = FutureUtils.getUncheckedImmediately(nextRead)) {
nextRead = null;
long readSize = 0;
if (limit == -1) {
// Unlimited
readSize = ByteStreams.copy(in, tmpOut);
} else {
// Limited
while (readSize < limit) {
final int r = in.read();
if (r != -1) {
readSize++;
tmpOut.write(r);
} else {
break;
}
}
}
offset += readSize;
if (readSize == 0) {
channel.readable().close();
tmpOut.close();
tmpFrameFile = FrameFile.open(tmpFile, null);
return false;
}
}
}
return true;
}
}
/**
* Tests that use {@link BlockingQueueFrameChannel} that is fully buffered.
*/
public static class WithMaximalBuffering extends InitializedNullHandlingTest
{
private static final int MAX_FRAMES = 10;
private static final int EXPECTED_NUM_ROWS = 1209;
private final BlockingQueueFrameChannel channel = new BlockingQueueFrameChannel(MAX_FRAMES);
private final ChannelStageOutputReader reader = new ChannelStageOutputReader(channel.readable());
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
private FrameReader frameReader;
private List<Frame> frameList;
@Before
public void setUp()
{
final IncrementalIndex index = TestIndex.getIncrementalTestIndex();
final IncrementalIndexCursorFactory adapter = new IncrementalIndexCursorFactory(index);
frameReader = FrameReader.create(adapter.getRowSignature());
frameList = FrameSequenceBuilder.fromCursorFactory(adapter)
.frameType(FrameType.ROW_BASED)
.maxRowsPerFrame(IntMath.divide(index.size(), MAX_FRAMES, RoundingMode.CEILING))
.frames()
.toList();
}
@After
public void tearDown()
{
reader.close();
}
@Test
public void test_readLocally() throws IOException
{
writeAllFramesToChannel();
Assert.assertSame(channel.readable(), reader.readLocally());
reader.close(); // Won't close the channel, because it's already been returned by readLocally
final int numRows = FrameTestUtil.readRowsFromFrameChannel(channel.readable(), frameReader).toList().size();
Assert.assertEquals(EXPECTED_NUM_ROWS, numRows);
}
@Test
public void test_readLocally_closePriorToRead() throws IOException
{
writeAllFramesToChannel();
reader.close();
// Can't read the channel after closing the reader
Assert.assertThrows(
IllegalStateException.class,
reader::readLocally
);
}
@Test
public void test_readLocally_thenReadRemotely() throws IOException
{
writeAllFramesToChannel();
Assert.assertSame(channel.readable(), reader.readLocally());
// Can't read remotely after reading locally
Assert.assertThrows(
IllegalStateException.class,
() -> reader.readRemotelyFrom(0)
);
// Can still read locally after this error
final int numRows = FrameTestUtil.readRowsFromFrameChannel(channel.readable(), frameReader).toList().size();
Assert.assertEquals(EXPECTED_NUM_ROWS, numRows);
}
@Test
public void test_readRemotely_strideBasedOnReturnedChunk() throws IOException
{
// Test that reads entire chunks from readRemotelyFrom. This is a typical usage pattern.
writeAllFramesToChannel();
final File tmpFile = temporaryFolder.newFile();
try (final FileOutputStream tmpOut = new FileOutputStream(tmpFile)) {
int numReads = 0;
long offset = 0;
while (true) {
try (final InputStream in = FutureUtils.getUnchecked(reader.readRemotelyFrom(offset), true)) {
numReads++;
final long bytesWritten = ByteStreams.copy(in, tmpOut);
offset += bytesWritten;
if (bytesWritten == 0) {
break;
}
}
}
MatcherAssert.assertThat(numReads, Matchers.greaterThan(1));
}
final FrameFile frameFile = FrameFile.open(tmpFile, null);
final int numRows =
FrameTestUtil.readRowsFromFrameChannel(new ReadableFileFrameChannel(frameFile), frameReader).toList().size();
Assert.assertEquals(EXPECTED_NUM_ROWS, numRows);
}
@Test
public void test_readRemotely_strideOneByte() throws IOException
{
// Test that reads one byte at a time from readRemotelyFrom. This helps ensure that there are no edge cases
// in the chunk-reading logic.
writeAllFramesToChannel();
final File tmpFile = temporaryFolder.newFile();
try (final OutputStream tmpOut = Files.newOutputStream(tmpFile.toPath())) {
int numReads = 0;
long offset = 0;
while (true) {
try (final InputStream in = FutureUtils.getUnchecked(reader.readRemotelyFrom(offset), true)) {
numReads++;
final int nextByte = in.read();
if (nextByte < 0) {
break;
}
tmpOut.write(nextByte);
offset++;
}
}
Assert.assertEquals(numReads, offset + 1);
}
final FrameFile frameFile = FrameFile.open(tmpFile, null);
final int numRows =
FrameTestUtil.readRowsFromFrameChannel(new ReadableFileFrameChannel(frameFile), frameReader).toList().size();
Assert.assertEquals(EXPECTED_NUM_ROWS, numRows);
}
@Test
public void test_readRemotely_thenLocally() throws IOException
{
writeAllFramesToChannel();
// Read remotely
FutureUtils.getUnchecked(reader.readRemotelyFrom(0), true);
// Then read locally
Assert.assertThrows(
IllegalStateException.class,
reader::readLocally
);
}
@Test
public void test_readRemotely_cannotReverse() throws IOException
{
writeAllFramesToChannel();
// Read remotely from offset = 1.
final InputStream in = FutureUtils.getUnchecked(reader.readRemotelyFrom(1), true);
final int offset = ByteStreams.toByteArray(in).length;
MatcherAssert.assertThat(offset, Matchers.greaterThan(0));
// Then read again from offset = 0; should get an error.
final RuntimeException e = Assert.assertThrows(
RuntimeException.class,
() -> FutureUtils.getUnchecked(reader.readRemotelyFrom(0), true)
);
MatcherAssert.assertThat(
e,
ThrowableCauseMatcher.hasCause(
Matchers.allOf(
CoreMatchers.instanceOf(IllegalStateException.class),
ThrowableMessageMatcher.hasMessage(CoreMatchers.startsWith("Offset[0] no longer available"))
)
)
);
}
private void writeAllFramesToChannel() throws IOException
{
for (Frame frame : frameList) {
channel.writable().write(frame);
}
channel.writable().close();
}
channel.writable().close();
}
}