diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileRange.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileRange.java index e55696e9650..97da65585d6 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileRange.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileRange.java @@ -55,6 +55,15 @@ public interface FileRange { */ void setData(CompletableFuture data); + /** + * Get any reference passed in to the file range constructor. + * This is not used by any implementation code; it is to help + * bind this API to libraries retrieving multiple stripes of + * data in parallel. + * @return a reference or null. + */ + Object getReference(); + /** * Factory method to create a FileRange object. * @param offset starting offset of the range. @@ -62,6 +71,17 @@ public interface FileRange { * @return a new instance of FileRangeImpl. */ static FileRange createFileRange(long offset, int length) { - return new FileRangeImpl(offset, length); + return new FileRangeImpl(offset, length, null); + } + + /** + * Factory method to create a FileRange object. + * @param offset starting offset of the range. + * @param length length of the range. + * @param reference nullable reference to store in the range. + * @return a new instance of FileRangeImpl. + */ + static FileRange createFileRange(long offset, int length, Object reference) { + return new FileRangeImpl(offset, length, reference); } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/CombinedFileRange.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/CombinedFileRange.java index 516bbb2c70c..c9555a1e541 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/CombinedFileRange.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/CombinedFileRange.java @@ -29,10 +29,10 @@ import java.util.List; * together into a single read for efficiency. */ public class CombinedFileRange extends FileRangeImpl { - private ArrayList underlying = new ArrayList<>(); + private List underlying = new ArrayList<>(); public CombinedFileRange(long offset, long end, FileRange original) { - super(offset, (int) (end - offset)); + super(offset, (int) (end - offset), null); this.underlying.add(original); } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FileRangeImpl.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FileRangeImpl.java index 041e5f0a8d2..1239be764ba 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FileRangeImpl.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FileRangeImpl.java @@ -34,9 +34,21 @@ public class FileRangeImpl implements FileRange { private int length; private CompletableFuture reader; - public FileRangeImpl(long offset, int length) { + /** + * nullable reference to store in the range. + */ + private final Object reference; + + /** + * Create. + * @param offset offset in file + * @param length length of data to read. + * @param reference nullable reference to store in the range. + */ + public FileRangeImpl(long offset, int length, Object reference) { this.offset = offset; this.length = length; + this.reference = reference; } @Override @@ -71,4 +83,9 @@ public class FileRangeImpl implements FileRange { public CompletableFuture getData() { return reader; } + + @Override + public Object getReference() { + return reference; + } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestVectoredReadUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestVectoredReadUtils.java index ebf0e14053b..fdfa8f6eb6f 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestVectoredReadUtils.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestVectoredReadUtils.java @@ -96,7 +96,10 @@ public class TestVectoredReadUtils extends HadoopTestBase { @Test public void testMerge() { - FileRange base = FileRange.createFileRange(2000, 1000); + // a reference to use for tracking + Object tracker1 = "one"; + Object tracker2 = "two"; + FileRange base = FileRange.createFileRange(2000, 1000, tracker1); CombinedFileRange mergeBase = new CombinedFileRange(2000, 3000, base); // test when the gap between is too big @@ -104,44 +107,48 @@ public class TestVectoredReadUtils extends HadoopTestBase { FileRange.createFileRange(5000, 1000), 2000, 4000)); assertEquals("Number of ranges in merged range shouldn't increase", 1, mergeBase.getUnderlying().size()); - assertEquals("post merge offset", 2000, mergeBase.getOffset()); - assertEquals("post merge length", 1000, mergeBase.getLength()); + assertFileRange(mergeBase, 2000, 1000); // test when the total size gets exceeded assertFalse("Large size ranges shouldn't get merged", mergeBase.merge(5000, 6000, FileRange.createFileRange(5000, 1000), 2001, 3999)); assertEquals("Number of ranges in merged range shouldn't increase", 1, mergeBase.getUnderlying().size()); - assertEquals("post merge offset", 2000, mergeBase.getOffset()); - assertEquals("post merge length", 1000, mergeBase.getLength()); + assertFileRange(mergeBase, 2000, 1000); // test when the merge works assertTrue("ranges should get merged ", mergeBase.merge(5000, 6000, - FileRange.createFileRange(5000, 1000), 2001, 4000)); + FileRange.createFileRange(5000, 1000, tracker2), + 2001, 4000)); assertEquals("post merge size", 2, mergeBase.getUnderlying().size()); - assertEquals("post merge offset", 2000, mergeBase.getOffset()); - assertEquals("post merge length", 4000, mergeBase.getLength()); + assertFileRange(mergeBase, 2000, 4000); + + Assertions.assertThat(mergeBase.getUnderlying().get(0).getReference()) + .describedAs("reference of range %s", mergeBase.getUnderlying().get(0)) + .isSameAs(tracker1); + Assertions.assertThat(mergeBase.getUnderlying().get(1).getReference()) + .describedAs("reference of range %s", mergeBase.getUnderlying().get(1)) + .isSameAs(tracker2); // reset the mergeBase and test with a 10:1 reduction mergeBase = new CombinedFileRange(200, 300, base); - assertEquals(200, mergeBase.getOffset()); - assertEquals(100, mergeBase.getLength()); + assertFileRange(mergeBase, 200, 100); + assertTrue("ranges should get merged ", mergeBase.merge(500, 600, FileRange.createFileRange(5000, 1000), 201, 400)); assertEquals("post merge size", 2, mergeBase.getUnderlying().size()); - assertEquals("post merge offset", 200, mergeBase.getOffset()); - assertEquals("post merge length", 400, mergeBase.getLength()); + assertFileRange(mergeBase, 200, 400); } @Test public void testSortAndMerge() { List input = Arrays.asList( - FileRange.createFileRange(3000, 100), - FileRange.createFileRange(2100, 100), - FileRange.createFileRange(1000, 100) + FileRange.createFileRange(3000, 100, "1"), + FileRange.createFileRange(2100, 100, null), + FileRange.createFileRange(1000, 100, "3") ); assertFalse("Ranges are non disjoint", VectoredReadUtils.isOrderedDisjoint(input, 100, 800)); - List outputList = VectoredReadUtils.mergeSortedRanges( + final List outputList = VectoredReadUtils.mergeSortedRanges( Arrays.asList(sortRanges(input)), 100, 1001, 2500); Assertions.assertThat(outputList) .describedAs("merged range size") @@ -150,51 +157,105 @@ public class TestVectoredReadUtils extends HadoopTestBase { Assertions.assertThat(output.getUnderlying()) .describedAs("merged range underlying size") .hasSize(3); - assertEquals("range[1000,3100)", output.toString()); + // range[1000,3100) + assertFileRange(output, 1000, 2100); assertTrue("merged output ranges are disjoint", VectoredReadUtils.isOrderedDisjoint(outputList, 100, 800)); // the minSeek doesn't allow the first two to merge assertFalse("Ranges are non disjoint", VectoredReadUtils.isOrderedDisjoint(input, 100, 1000)); - outputList = VectoredReadUtils.mergeSortedRanges(Arrays.asList(sortRanges(input)), + final List list2 = VectoredReadUtils.mergeSortedRanges( + Arrays.asList(sortRanges(input)), 100, 1000, 2100); - Assertions.assertThat(outputList) + Assertions.assertThat(list2) .describedAs("merged range size") .hasSize(2); - assertEquals("range[1000,1100)", outputList.get(0).toString()); - assertEquals("range[2100,3100)", outputList.get(1).toString()); + assertFileRange(list2.get(0), 1000, 100); + + // range[2100,3100) + assertFileRange(list2.get(1), 2100, 1000); + assertTrue("merged output ranges are disjoint", - VectoredReadUtils.isOrderedDisjoint(outputList, 100, 1000)); + VectoredReadUtils.isOrderedDisjoint(list2, 100, 1000)); // the maxSize doesn't allow the third range to merge assertFalse("Ranges are non disjoint", VectoredReadUtils.isOrderedDisjoint(input, 100, 800)); - outputList = VectoredReadUtils.mergeSortedRanges(Arrays.asList(sortRanges(input)), + final List list3 = VectoredReadUtils.mergeSortedRanges( + Arrays.asList(sortRanges(input)), 100, 1001, 2099); - Assertions.assertThat(outputList) + Assertions.assertThat(list3) .describedAs("merged range size") .hasSize(2); - assertEquals("range[1000,2200)", outputList.get(0).toString()); - assertEquals("range[3000,3100)", outputList.get(1).toString()); + // range[1000,2200) + CombinedFileRange range0 = list3.get(0); + assertFileRange(range0, 1000, 1200); + assertFileRange(range0.getUnderlying().get(0), + 1000, 100, "3"); + assertFileRange(range0.getUnderlying().get(1), + 2100, 100, null); + CombinedFileRange range1 = list3.get(1); + // range[3000,3100) + assertFileRange(range1, 3000, 100); + assertFileRange(range1.getUnderlying().get(0), + 3000, 100, "1"); + assertTrue("merged output ranges are disjoint", - VectoredReadUtils.isOrderedDisjoint(outputList, 100, 800)); + VectoredReadUtils.isOrderedDisjoint(list3, 100, 800)); // test the round up and round down (the maxSize doesn't allow any merges) assertFalse("Ranges are non disjoint", VectoredReadUtils.isOrderedDisjoint(input, 16, 700)); - outputList = VectoredReadUtils.mergeSortedRanges(Arrays.asList(sortRanges(input)), + final List list4 = VectoredReadUtils.mergeSortedRanges( + Arrays.asList(sortRanges(input)), 16, 1001, 100); - Assertions.assertThat(outputList) + Assertions.assertThat(list4) .describedAs("merged range size") .hasSize(3); - assertEquals("range[992,1104)", outputList.get(0).toString()); - assertEquals("range[2096,2208)", outputList.get(1).toString()); - assertEquals("range[2992,3104)", outputList.get(2).toString()); + // range[992,1104) + assertFileRange(list4.get(0), 992, 112); + // range[2096,2208) + assertFileRange(list4.get(1), 2096, 112); + // range[2992,3104) + assertFileRange(list4.get(2), 2992, 112); assertTrue("merged output ranges are disjoint", - VectoredReadUtils.isOrderedDisjoint(outputList, 16, 700)); + VectoredReadUtils.isOrderedDisjoint(list4, 16, 700)); } + /** + * Assert that a file range satisfies the conditions. + * @param range range to validate + * @param offset offset of range + * @param length range length + */ + private void assertFileRange(FileRange range, long offset, int length) { + Assertions.assertThat(range) + .describedAs("file range %s", range) + .isNotNull(); + Assertions.assertThat(range.getOffset()) + .describedAs("offset of %s", range) + .isEqualTo(offset); + Assertions.assertThat(range.getLength()) + .describedAs("length of %s", range) + .isEqualTo(length); + } + + /** + * Assert that a file range satisfies the conditions. + * @param range range to validate + * @param offset offset of range + * @param length range length + * @param reference reference; may be null. + */ + private void assertFileRange(FileRange range, long offset, int length, Object reference) { + assertFileRange(range, offset, length); + Assertions.assertThat(range.getReference()) + .describedAs("reference field of file range %s", range) + .isEqualTo(reference); + } + + @Test public void testSortAndMergeMoreCases() throws Exception { List input = Arrays.asList( @@ -214,7 +275,9 @@ public class TestVectoredReadUtils extends HadoopTestBase { Assertions.assertThat(output.getUnderlying()) .describedAs("merged range underlying size") .hasSize(4); - assertEquals("range[1000,3110)", output.toString()); + + assertFileRange(output, 1000, 2110); + assertTrue("merged output ranges are disjoint", VectoredReadUtils.isOrderedDisjoint(outputList, 1, 800)); @@ -227,7 +290,8 @@ public class TestVectoredReadUtils extends HadoopTestBase { Assertions.assertThat(output.getUnderlying()) .describedAs("merged range underlying size") .hasSize(4); - assertEquals("range[1000,3200)", output.toString()); + assertFileRange(output, 1000, 2200); + assertTrue("merged output ranges are disjoint", VectoredReadUtils.isOrderedDisjoint(outputList, 1, 800)); diff --git a/hadoop-tools/hadoop-benchmark/src/main/java/org/apache/hadoop/benchmark/VectoredReadBenchmark.java b/hadoop-tools/hadoop-benchmark/src/main/java/org/apache/hadoop/benchmark/VectoredReadBenchmark.java index 631842f78e2..5df46c36786 100644 --- a/hadoop-tools/hadoop-benchmark/src/main/java/org/apache/hadoop/benchmark/VectoredReadBenchmark.java +++ b/hadoop-tools/hadoop-benchmark/src/main/java/org/apache/hadoop/benchmark/VectoredReadBenchmark.java @@ -169,7 +169,7 @@ public class VectoredReadBenchmark { FileRangeCallback(AsynchronousFileChannel channel, long offset, int length, Joiner joiner, ByteBuffer buffer) { - super(offset, length); + super(offset, length, null); this.channel = channel; this.joiner = joiner; this.buffer = buffer;