mirror of https://github.com/apache/druid.git
Fix accounting of bytesAdded in ReadableByteChunksFrameChannel. (#12988)
* Fix accounting of bytesAdded in ReadableByteChunksFrameChannel. Could cause WorkerInputChannelFactory to get into an infinite loop when reading the footer of a frame file. * Additional tests.
This commit is contained in:
parent
9eb20e5e7c
commit
414176fb97
|
@ -139,12 +139,13 @@ public class ReadableByteChunksFrameChannel implements ReadableFrameChannel
|
||||||
|
|
||||||
try {
|
try {
|
||||||
if (chunk.length > 0) {
|
if (chunk.length > 0) {
|
||||||
|
bytesAdded += chunk.length;
|
||||||
|
|
||||||
if (streamPart != StreamPart.FOOTER) {
|
if (streamPart != StreamPart.FOOTER) {
|
||||||
// Footer is discarded: it isn't useful when reading frame files as streams. (It contains pointers
|
// Footer is discarded: it isn't useful when reading frame files as streams. (It contains pointers
|
||||||
// for random access of frames.)
|
// for random access of frames.)
|
||||||
chunks.add(Either.value(chunk));
|
chunks.add(Either.value(chunk));
|
||||||
bytesBuffered += chunk.length;
|
bytesBuffered += chunk.length;
|
||||||
bytesAdded += chunk.length;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
updateStreamState();
|
updateStreamState();
|
||||||
|
|
|
@ -108,6 +108,7 @@ public class ReadableByteChunksFrameChannelTest
|
||||||
final ReadableByteChunksFrameChannel channel = ReadableByteChunksFrameChannel.create("test");
|
final ReadableByteChunksFrameChannel channel = ReadableByteChunksFrameChannel.create("test");
|
||||||
channel.addChunk(Files.toByteArray(file));
|
channel.addChunk(Files.toByteArray(file));
|
||||||
channel.doneWriting();
|
channel.doneWriting();
|
||||||
|
Assert.assertEquals(file.length(), channel.getBytesAdded());
|
||||||
|
|
||||||
while (channel.canRead()) {
|
while (channel.canRead()) {
|
||||||
Assert.assertFalse(channel.isFinished());
|
Assert.assertFalse(channel.isFinished());
|
||||||
|
@ -145,6 +146,7 @@ public class ReadableByteChunksFrameChannelTest
|
||||||
final ReadableByteChunksFrameChannel channel = ReadableByteChunksFrameChannel.create("test");
|
final ReadableByteChunksFrameChannel channel = ReadableByteChunksFrameChannel.create("test");
|
||||||
channel.addChunk(truncatedFile);
|
channel.addChunk(truncatedFile);
|
||||||
channel.doneWriting();
|
channel.doneWriting();
|
||||||
|
Assert.assertEquals(truncatedFile.length, channel.getBytesAdded());
|
||||||
|
|
||||||
Assert.assertTrue(channel.canRead());
|
Assert.assertTrue(channel.canRead());
|
||||||
Assert.assertFalse(channel.isFinished());
|
Assert.assertFalse(channel.isFinished());
|
||||||
|
@ -187,8 +189,11 @@ public class ReadableByteChunksFrameChannelTest
|
||||||
final byte[] chunk1 = new byte[errorAtBytePosition];
|
final byte[] chunk1 = new byte[errorAtBytePosition];
|
||||||
System.arraycopy(fileBytes, 0, chunk1, 0, chunk1.length);
|
System.arraycopy(fileBytes, 0, chunk1, 0, chunk1.length);
|
||||||
channel.addChunk(chunk1);
|
channel.addChunk(chunk1);
|
||||||
|
Assert.assertEquals(chunk1.length, channel.getBytesAdded());
|
||||||
|
|
||||||
channel.setError(new ISE("Test error!"));
|
channel.setError(new ISE("Test error!"));
|
||||||
channel.doneWriting();
|
channel.doneWriting();
|
||||||
|
Assert.assertEquals(chunk1.length, channel.getBytesAdded());
|
||||||
|
|
||||||
expectedException.expect(IllegalStateException.class);
|
expectedException.expect(IllegalStateException.class);
|
||||||
expectedException.expectMessage("Test error!");
|
expectedException.expectMessage("Test error!");
|
||||||
|
@ -250,13 +255,17 @@ public class ReadableByteChunksFrameChannelTest
|
||||||
final ReadableByteChunksFrameChannel channel = ReadableByteChunksFrameChannel.create("test");
|
final ReadableByteChunksFrameChannel channel = ReadableByteChunksFrameChannel.create("test");
|
||||||
ListenableFuture<?> firstBackpressureFuture = null;
|
ListenableFuture<?> firstBackpressureFuture = null;
|
||||||
|
|
||||||
|
long totalSize = 0;
|
||||||
Assert.assertEquals(0, channel.getBytesBuffered());
|
Assert.assertEquals(0, channel.getBytesBuffered());
|
||||||
|
|
||||||
try (final Chunker chunker = new Chunker(new FileInputStream(file), chunkSize)) {
|
try (final Chunker chunker = new Chunker(new FileInputStream(file), chunkSize)) {
|
||||||
byte[] chunk;
|
byte[] chunk;
|
||||||
|
|
||||||
while ((chunk = chunker.nextChunk()) != null) {
|
while ((chunk = chunker.nextChunk()) != null) {
|
||||||
|
totalSize += chunk.length;
|
||||||
|
|
||||||
final ListenableFuture<?> backpressureFuture = channel.addChunk(chunk);
|
final ListenableFuture<?> backpressureFuture = channel.addChunk(chunk);
|
||||||
|
Assert.assertEquals(channel.getBytesAdded(), totalSize);
|
||||||
|
|
||||||
// Minimally-sized channel means backpressure is exerted as soon as a single frame is available.
|
// Minimally-sized channel means backpressure is exerted as soon as a single frame is available.
|
||||||
Assert.assertEquals(channel.canRead(), backpressureFuture != null);
|
Assert.assertEquals(channel.canRead(), backpressureFuture != null);
|
||||||
|
@ -303,6 +312,7 @@ public class ReadableByteChunksFrameChannelTest
|
||||||
ListenableFuture<?> backpressureFuture = null;
|
ListenableFuture<?> backpressureFuture = null;
|
||||||
|
|
||||||
int iteration = 0;
|
int iteration = 0;
|
||||||
|
long totalSize = 0;
|
||||||
|
|
||||||
try (final Chunker chunker = new Chunker(new FileInputStream(file), chunkSize)) {
|
try (final Chunker chunker = new Chunker(new FileInputStream(file), chunkSize)) {
|
||||||
byte[] chunk;
|
byte[] chunk;
|
||||||
|
@ -327,9 +337,11 @@ public class ReadableByteChunksFrameChannelTest
|
||||||
}
|
}
|
||||||
|
|
||||||
iteration++;
|
iteration++;
|
||||||
|
totalSize += chunk.length;
|
||||||
|
|
||||||
// Write next chunk.
|
// Write next chunk.
|
||||||
final ListenableFuture<?> addVal = channel.addChunk(chunk);
|
final ListenableFuture<?> addVal = channel.addChunk(chunk);
|
||||||
|
Assert.assertEquals(totalSize, channel.getBytesAdded());
|
||||||
|
|
||||||
// Minimally-sized channel means backpressure is exerted as soon as a single frame is available.
|
// Minimally-sized channel means backpressure is exerted as soon as a single frame is available.
|
||||||
Assert.assertEquals(channel.canRead(), addVal != null);
|
Assert.assertEquals(channel.canRead(), addVal != null);
|
||||||
|
|
Loading…
Reference in New Issue