From 414176fb97b0bb852feffa11bb1914d4c82e6d64 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Mon, 29 Aug 2022 18:25:28 -0700 Subject: [PATCH] Fix accounting of bytesAdded in ReadableByteChunksFrameChannel. (#12988) * Fix accounting of bytesAdded in ReadableByteChunksFrameChannel. Could cause WorkerInputChannelFactory to get into an infinite loop when reading the footer of a frame file. * Additional tests. --- .../channel/ReadableByteChunksFrameChannel.java | 3 ++- .../channel/ReadableByteChunksFrameChannelTest.java | 12 ++++++++++++ 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/processing/src/main/java/org/apache/druid/frame/channel/ReadableByteChunksFrameChannel.java b/processing/src/main/java/org/apache/druid/frame/channel/ReadableByteChunksFrameChannel.java index f9acf2d0e4f..2d659c2b304 100644 --- a/processing/src/main/java/org/apache/druid/frame/channel/ReadableByteChunksFrameChannel.java +++ b/processing/src/main/java/org/apache/druid/frame/channel/ReadableByteChunksFrameChannel.java @@ -139,12 +139,13 @@ public class ReadableByteChunksFrameChannel implements ReadableFrameChannel try { if (chunk.length > 0) { + bytesAdded += chunk.length; + if (streamPart != StreamPart.FOOTER) { // Footer is discarded: it isn't useful when reading frame files as streams. (It contains pointers // for random access of frames.) chunks.add(Either.value(chunk)); bytesBuffered += chunk.length; - bytesAdded += chunk.length; } updateStreamState(); diff --git a/processing/src/test/java/org/apache/druid/frame/channel/ReadableByteChunksFrameChannelTest.java b/processing/src/test/java/org/apache/druid/frame/channel/ReadableByteChunksFrameChannelTest.java index 2040c086c82..a6e95f34ab8 100644 --- a/processing/src/test/java/org/apache/druid/frame/channel/ReadableByteChunksFrameChannelTest.java +++ b/processing/src/test/java/org/apache/druid/frame/channel/ReadableByteChunksFrameChannelTest.java @@ -108,6 +108,7 @@ public class ReadableByteChunksFrameChannelTest final ReadableByteChunksFrameChannel channel = ReadableByteChunksFrameChannel.create("test"); channel.addChunk(Files.toByteArray(file)); channel.doneWriting(); + Assert.assertEquals(file.length(), channel.getBytesAdded()); while (channel.canRead()) { Assert.assertFalse(channel.isFinished()); @@ -145,6 +146,7 @@ public class ReadableByteChunksFrameChannelTest final ReadableByteChunksFrameChannel channel = ReadableByteChunksFrameChannel.create("test"); channel.addChunk(truncatedFile); channel.doneWriting(); + Assert.assertEquals(truncatedFile.length, channel.getBytesAdded()); Assert.assertTrue(channel.canRead()); Assert.assertFalse(channel.isFinished()); @@ -187,8 +189,11 @@ public class ReadableByteChunksFrameChannelTest final byte[] chunk1 = new byte[errorAtBytePosition]; System.arraycopy(fileBytes, 0, chunk1, 0, chunk1.length); channel.addChunk(chunk1); + Assert.assertEquals(chunk1.length, channel.getBytesAdded()); + channel.setError(new ISE("Test error!")); channel.doneWriting(); + Assert.assertEquals(chunk1.length, channel.getBytesAdded()); expectedException.expect(IllegalStateException.class); expectedException.expectMessage("Test error!"); @@ -250,13 +255,17 @@ public class ReadableByteChunksFrameChannelTest final ReadableByteChunksFrameChannel channel = ReadableByteChunksFrameChannel.create("test"); ListenableFuture firstBackpressureFuture = null; + long totalSize = 0; Assert.assertEquals(0, channel.getBytesBuffered()); try (final Chunker chunker = new Chunker(new FileInputStream(file), chunkSize)) { byte[] chunk; while ((chunk = chunker.nextChunk()) != null) { + totalSize += chunk.length; + final ListenableFuture backpressureFuture = channel.addChunk(chunk); + Assert.assertEquals(channel.getBytesAdded(), totalSize); // Minimally-sized channel means backpressure is exerted as soon as a single frame is available. Assert.assertEquals(channel.canRead(), backpressureFuture != null); @@ -303,6 +312,7 @@ public class ReadableByteChunksFrameChannelTest ListenableFuture backpressureFuture = null; int iteration = 0; + long totalSize = 0; try (final Chunker chunker = new Chunker(new FileInputStream(file), chunkSize)) { byte[] chunk; @@ -327,9 +337,11 @@ public class ReadableByteChunksFrameChannelTest } iteration++; + totalSize += chunk.length; // Write next chunk. final ListenableFuture addVal = channel.addChunk(chunk); + Assert.assertEquals(totalSize, channel.getBytesAdded()); // Minimally-sized channel means backpressure is exerted as soon as a single frame is available. Assert.assertEquals(channel.canRead(), addVal != null);