HADOOP-18507. VectorIO FileRange type to support a "reference" field (#5076)
Contributed by Steve Loughran
This commit is contained in:
parent
b1f418f802
commit
3b10cb5a3b
|
@ -55,6 +55,15 @@ public interface FileRange {
|
||||||
*/
|
*/
|
||||||
void setData(CompletableFuture<ByteBuffer> data);
|
void setData(CompletableFuture<ByteBuffer> data);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get any reference passed in to the file range constructor.
|
||||||
|
* This is not used by any implementation code; it is to help
|
||||||
|
* bind this API to libraries retrieving multiple stripes of
|
||||||
|
* data in parallel.
|
||||||
|
* @return a reference or null.
|
||||||
|
*/
|
||||||
|
Object getReference();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Factory method to create a FileRange object.
|
* Factory method to create a FileRange object.
|
||||||
* @param offset starting offset of the range.
|
* @param offset starting offset of the range.
|
||||||
|
@ -62,6 +71,17 @@ public interface FileRange {
|
||||||
* @return a new instance of FileRangeImpl.
|
* @return a new instance of FileRangeImpl.
|
||||||
*/
|
*/
|
||||||
static FileRange createFileRange(long offset, int length) {
|
static FileRange createFileRange(long offset, int length) {
|
||||||
return new FileRangeImpl(offset, length);
|
return new FileRangeImpl(offset, length, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Factory method to create a FileRange object.
|
||||||
|
* @param offset starting offset of the range.
|
||||||
|
* @param length length of the range.
|
||||||
|
* @param reference nullable reference to store in the range.
|
||||||
|
* @return a new instance of FileRangeImpl.
|
||||||
|
*/
|
||||||
|
static FileRange createFileRange(long offset, int length, Object reference) {
|
||||||
|
return new FileRangeImpl(offset, length, reference);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -29,10 +29,10 @@ import java.util.List;
|
||||||
* together into a single read for efficiency.
|
* together into a single read for efficiency.
|
||||||
*/
|
*/
|
||||||
public class CombinedFileRange extends FileRangeImpl {
|
public class CombinedFileRange extends FileRangeImpl {
|
||||||
private ArrayList<FileRange> underlying = new ArrayList<>();
|
private List<FileRange> underlying = new ArrayList<>();
|
||||||
|
|
||||||
public CombinedFileRange(long offset, long end, FileRange original) {
|
public CombinedFileRange(long offset, long end, FileRange original) {
|
||||||
super(offset, (int) (end - offset));
|
super(offset, (int) (end - offset), null);
|
||||||
this.underlying.add(original);
|
this.underlying.add(original);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -34,9 +34,21 @@ public class FileRangeImpl implements FileRange {
|
||||||
private int length;
|
private int length;
|
||||||
private CompletableFuture<ByteBuffer> reader;
|
private CompletableFuture<ByteBuffer> reader;
|
||||||
|
|
||||||
public FileRangeImpl(long offset, int length) {
|
/**
|
||||||
|
* Nullable reference stored in the range; returned by getReference().
|
||||||
|
*/
|
||||||
|
private final Object reference;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a file range with an optional caller-supplied reference.
|
||||||
|
* @param offset offset in file
|
||||||
|
* @param length length of data to read.
|
||||||
|
* @param reference nullable reference to store in the range.
|
||||||
|
*/
|
||||||
|
public FileRangeImpl(long offset, int length, Object reference) {
|
||||||
this.offset = offset;
|
this.offset = offset;
|
||||||
this.length = length;
|
this.length = length;
|
||||||
|
this.reference = reference;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -71,4 +83,9 @@ public class FileRangeImpl implements FileRange {
|
||||||
public CompletableFuture<ByteBuffer> getData() {
|
public CompletableFuture<ByteBuffer> getData() {
|
||||||
return reader;
|
return reader;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Object getReference() {
|
||||||
|
return reference;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -96,7 +96,10 @@ public class TestVectoredReadUtils extends HadoopTestBase {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testMerge() {
|
public void testMerge() {
|
||||||
FileRange base = FileRange.createFileRange(2000, 1000);
|
// a reference to use for tracking
|
||||||
|
Object tracker1 = "one";
|
||||||
|
Object tracker2 = "two";
|
||||||
|
FileRange base = FileRange.createFileRange(2000, 1000, tracker1);
|
||||||
CombinedFileRange mergeBase = new CombinedFileRange(2000, 3000, base);
|
CombinedFileRange mergeBase = new CombinedFileRange(2000, 3000, base);
|
||||||
|
|
||||||
// test when the gap between is too big
|
// test when the gap between is too big
|
||||||
|
@ -104,44 +107,48 @@ public class TestVectoredReadUtils extends HadoopTestBase {
|
||||||
FileRange.createFileRange(5000, 1000), 2000, 4000));
|
FileRange.createFileRange(5000, 1000), 2000, 4000));
|
||||||
assertEquals("Number of ranges in merged range shouldn't increase",
|
assertEquals("Number of ranges in merged range shouldn't increase",
|
||||||
1, mergeBase.getUnderlying().size());
|
1, mergeBase.getUnderlying().size());
|
||||||
assertEquals("post merge offset", 2000, mergeBase.getOffset());
|
assertFileRange(mergeBase, 2000, 1000);
|
||||||
assertEquals("post merge length", 1000, mergeBase.getLength());
|
|
||||||
|
|
||||||
// test when the total size gets exceeded
|
// test when the total size gets exceeded
|
||||||
assertFalse("Large size ranges shouldn't get merged", mergeBase.merge(5000, 6000,
|
assertFalse("Large size ranges shouldn't get merged", mergeBase.merge(5000, 6000,
|
||||||
FileRange.createFileRange(5000, 1000), 2001, 3999));
|
FileRange.createFileRange(5000, 1000), 2001, 3999));
|
||||||
assertEquals("Number of ranges in merged range shouldn't increase",
|
assertEquals("Number of ranges in merged range shouldn't increase",
|
||||||
1, mergeBase.getUnderlying().size());
|
1, mergeBase.getUnderlying().size());
|
||||||
assertEquals("post merge offset", 2000, mergeBase.getOffset());
|
assertFileRange(mergeBase, 2000, 1000);
|
||||||
assertEquals("post merge length", 1000, mergeBase.getLength());
|
|
||||||
|
|
||||||
// test when the merge works
|
// test when the merge works
|
||||||
assertTrue("ranges should get merged ", mergeBase.merge(5000, 6000,
|
assertTrue("ranges should get merged ", mergeBase.merge(5000, 6000,
|
||||||
FileRange.createFileRange(5000, 1000), 2001, 4000));
|
FileRange.createFileRange(5000, 1000, tracker2),
|
||||||
|
2001, 4000));
|
||||||
assertEquals("post merge size", 2, mergeBase.getUnderlying().size());
|
assertEquals("post merge size", 2, mergeBase.getUnderlying().size());
|
||||||
assertEquals("post merge offset", 2000, mergeBase.getOffset());
|
assertFileRange(mergeBase, 2000, 4000);
|
||||||
assertEquals("post merge length", 4000, mergeBase.getLength());
|
|
||||||
|
Assertions.assertThat(mergeBase.getUnderlying().get(0).getReference())
|
||||||
|
.describedAs("reference of range %s", mergeBase.getUnderlying().get(0))
|
||||||
|
.isSameAs(tracker1);
|
||||||
|
Assertions.assertThat(mergeBase.getUnderlying().get(1).getReference())
|
||||||
|
.describedAs("reference of range %s", mergeBase.getUnderlying().get(1))
|
||||||
|
.isSameAs(tracker2);
|
||||||
|
|
||||||
// reset the mergeBase and test with a 10:1 reduction
|
// reset the mergeBase and test with a 10:1 reduction
|
||||||
mergeBase = new CombinedFileRange(200, 300, base);
|
mergeBase = new CombinedFileRange(200, 300, base);
|
||||||
assertEquals(200, mergeBase.getOffset());
|
assertFileRange(mergeBase, 200, 100);
|
||||||
assertEquals(100, mergeBase.getLength());
|
|
||||||
assertTrue("ranges should get merged ", mergeBase.merge(500, 600,
|
assertTrue("ranges should get merged ", mergeBase.merge(500, 600,
|
||||||
FileRange.createFileRange(5000, 1000), 201, 400));
|
FileRange.createFileRange(5000, 1000), 201, 400));
|
||||||
assertEquals("post merge size", 2, mergeBase.getUnderlying().size());
|
assertEquals("post merge size", 2, mergeBase.getUnderlying().size());
|
||||||
assertEquals("post merge offset", 200, mergeBase.getOffset());
|
assertFileRange(mergeBase, 200, 400);
|
||||||
assertEquals("post merge length", 400, mergeBase.getLength());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testSortAndMerge() {
|
public void testSortAndMerge() {
|
||||||
List<FileRange> input = Arrays.asList(
|
List<FileRange> input = Arrays.asList(
|
||||||
FileRange.createFileRange(3000, 100),
|
FileRange.createFileRange(3000, 100, "1"),
|
||||||
FileRange.createFileRange(2100, 100),
|
FileRange.createFileRange(2100, 100, null),
|
||||||
FileRange.createFileRange(1000, 100)
|
FileRange.createFileRange(1000, 100, "3")
|
||||||
);
|
);
|
||||||
assertFalse("Ranges are non disjoint", VectoredReadUtils.isOrderedDisjoint(input, 100, 800));
|
assertFalse("Ranges are non disjoint", VectoredReadUtils.isOrderedDisjoint(input, 100, 800));
|
||||||
List<CombinedFileRange> outputList = VectoredReadUtils.mergeSortedRanges(
|
final List<CombinedFileRange> outputList = VectoredReadUtils.mergeSortedRanges(
|
||||||
Arrays.asList(sortRanges(input)), 100, 1001, 2500);
|
Arrays.asList(sortRanges(input)), 100, 1001, 2500);
|
||||||
Assertions.assertThat(outputList)
|
Assertions.assertThat(outputList)
|
||||||
.describedAs("merged range size")
|
.describedAs("merged range size")
|
||||||
|
@ -150,51 +157,105 @@ public class TestVectoredReadUtils extends HadoopTestBase {
|
||||||
Assertions.assertThat(output.getUnderlying())
|
Assertions.assertThat(output.getUnderlying())
|
||||||
.describedAs("merged range underlying size")
|
.describedAs("merged range underlying size")
|
||||||
.hasSize(3);
|
.hasSize(3);
|
||||||
assertEquals("range[1000,3100)", output.toString());
|
// range[1000,3100)
|
||||||
|
assertFileRange(output, 1000, 2100);
|
||||||
assertTrue("merged output ranges are disjoint",
|
assertTrue("merged output ranges are disjoint",
|
||||||
VectoredReadUtils.isOrderedDisjoint(outputList, 100, 800));
|
VectoredReadUtils.isOrderedDisjoint(outputList, 100, 800));
|
||||||
|
|
||||||
// the minSeek doesn't allow the first two to merge
|
// the minSeek doesn't allow the first two to merge
|
||||||
assertFalse("Ranges are non disjoint",
|
assertFalse("Ranges are non disjoint",
|
||||||
VectoredReadUtils.isOrderedDisjoint(input, 100, 1000));
|
VectoredReadUtils.isOrderedDisjoint(input, 100, 1000));
|
||||||
outputList = VectoredReadUtils.mergeSortedRanges(Arrays.asList(sortRanges(input)),
|
final List<CombinedFileRange> list2 = VectoredReadUtils.mergeSortedRanges(
|
||||||
|
Arrays.asList(sortRanges(input)),
|
||||||
100, 1000, 2100);
|
100, 1000, 2100);
|
||||||
Assertions.assertThat(outputList)
|
Assertions.assertThat(list2)
|
||||||
.describedAs("merged range size")
|
.describedAs("merged range size")
|
||||||
.hasSize(2);
|
.hasSize(2);
|
||||||
assertEquals("range[1000,1100)", outputList.get(0).toString());
|
assertFileRange(list2.get(0), 1000, 100);
|
||||||
assertEquals("range[2100,3100)", outputList.get(1).toString());
|
|
||||||
|
// range[2100,3100)
|
||||||
|
assertFileRange(list2.get(1), 2100, 1000);
|
||||||
|
|
||||||
assertTrue("merged output ranges are disjoint",
|
assertTrue("merged output ranges are disjoint",
|
||||||
VectoredReadUtils.isOrderedDisjoint(outputList, 100, 1000));
|
VectoredReadUtils.isOrderedDisjoint(list2, 100, 1000));
|
||||||
|
|
||||||
// the maxSize doesn't allow the third range to merge
|
// the maxSize doesn't allow the third range to merge
|
||||||
assertFalse("Ranges are non disjoint",
|
assertFalse("Ranges are non disjoint",
|
||||||
VectoredReadUtils.isOrderedDisjoint(input, 100, 800));
|
VectoredReadUtils.isOrderedDisjoint(input, 100, 800));
|
||||||
outputList = VectoredReadUtils.mergeSortedRanges(Arrays.asList(sortRanges(input)),
|
final List<CombinedFileRange> list3 = VectoredReadUtils.mergeSortedRanges(
|
||||||
|
Arrays.asList(sortRanges(input)),
|
||||||
100, 1001, 2099);
|
100, 1001, 2099);
|
||||||
Assertions.assertThat(outputList)
|
Assertions.assertThat(list3)
|
||||||
.describedAs("merged range size")
|
.describedAs("merged range size")
|
||||||
.hasSize(2);
|
.hasSize(2);
|
||||||
assertEquals("range[1000,2200)", outputList.get(0).toString());
|
// range[1000,2200)
|
||||||
assertEquals("range[3000,3100)", outputList.get(1).toString());
|
CombinedFileRange range0 = list3.get(0);
|
||||||
|
assertFileRange(range0, 1000, 1200);
|
||||||
|
assertFileRange(range0.getUnderlying().get(0),
|
||||||
|
1000, 100, "3");
|
||||||
|
assertFileRange(range0.getUnderlying().get(1),
|
||||||
|
2100, 100, null);
|
||||||
|
CombinedFileRange range1 = list3.get(1);
|
||||||
|
// range[3000,3100)
|
||||||
|
assertFileRange(range1, 3000, 100);
|
||||||
|
assertFileRange(range1.getUnderlying().get(0),
|
||||||
|
3000, 100, "1");
|
||||||
|
|
||||||
assertTrue("merged output ranges are disjoint",
|
assertTrue("merged output ranges are disjoint",
|
||||||
VectoredReadUtils.isOrderedDisjoint(outputList, 100, 800));
|
VectoredReadUtils.isOrderedDisjoint(list3, 100, 800));
|
||||||
|
|
||||||
// test the round up and round down (the maxSize doesn't allow any merges)
|
// test the round up and round down (the maxSize doesn't allow any merges)
|
||||||
assertFalse("Ranges are non disjoint",
|
assertFalse("Ranges are non disjoint",
|
||||||
VectoredReadUtils.isOrderedDisjoint(input, 16, 700));
|
VectoredReadUtils.isOrderedDisjoint(input, 16, 700));
|
||||||
outputList = VectoredReadUtils.mergeSortedRanges(Arrays.asList(sortRanges(input)),
|
final List<CombinedFileRange> list4 = VectoredReadUtils.mergeSortedRanges(
|
||||||
|
Arrays.asList(sortRanges(input)),
|
||||||
16, 1001, 100);
|
16, 1001, 100);
|
||||||
Assertions.assertThat(outputList)
|
Assertions.assertThat(list4)
|
||||||
.describedAs("merged range size")
|
.describedAs("merged range size")
|
||||||
.hasSize(3);
|
.hasSize(3);
|
||||||
assertEquals("range[992,1104)", outputList.get(0).toString());
|
// range[992,1104)
|
||||||
assertEquals("range[2096,2208)", outputList.get(1).toString());
|
assertFileRange(list4.get(0), 992, 112);
|
||||||
assertEquals("range[2992,3104)", outputList.get(2).toString());
|
// range[2096,2208)
|
||||||
|
assertFileRange(list4.get(1), 2096, 112);
|
||||||
|
// range[2992,3104)
|
||||||
|
assertFileRange(list4.get(2), 2992, 112);
|
||||||
assertTrue("merged output ranges are disjoint",
|
assertTrue("merged output ranges are disjoint",
|
||||||
VectoredReadUtils.isOrderedDisjoint(outputList, 16, 700));
|
VectoredReadUtils.isOrderedDisjoint(list4, 16, 700));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Assert that a file range satisfies the conditions.
|
||||||
|
* @param range range to validate
|
||||||
|
* @param offset offset of range
|
||||||
|
* @param length range length
|
||||||
|
*/
|
||||||
|
private void assertFileRange(FileRange range, long offset, int length) {
|
||||||
|
Assertions.assertThat(range)
|
||||||
|
.describedAs("file range %s", range)
|
||||||
|
.isNotNull();
|
||||||
|
Assertions.assertThat(range.getOffset())
|
||||||
|
.describedAs("offset of %s", range)
|
||||||
|
.isEqualTo(offset);
|
||||||
|
Assertions.assertThat(range.getLength())
|
||||||
|
.describedAs("length of %s", range)
|
||||||
|
.isEqualTo(length);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Assert that a file range satisfies the conditions, including its reference field.
|
||||||
|
* @param range range to validate
|
||||||
|
* @param offset offset of range
|
||||||
|
* @param length range length
|
||||||
|
* @param reference reference; may be null.
|
||||||
|
*/
|
||||||
|
private void assertFileRange(FileRange range, long offset, int length, Object reference) {
|
||||||
|
assertFileRange(range, offset, length);
|
||||||
|
Assertions.assertThat(range.getReference())
|
||||||
|
.describedAs("reference field of file range %s", range)
|
||||||
|
.isEqualTo(reference);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testSortAndMergeMoreCases() throws Exception {
|
public void testSortAndMergeMoreCases() throws Exception {
|
||||||
List<FileRange> input = Arrays.asList(
|
List<FileRange> input = Arrays.asList(
|
||||||
|
@ -214,7 +275,9 @@ public class TestVectoredReadUtils extends HadoopTestBase {
|
||||||
Assertions.assertThat(output.getUnderlying())
|
Assertions.assertThat(output.getUnderlying())
|
||||||
.describedAs("merged range underlying size")
|
.describedAs("merged range underlying size")
|
||||||
.hasSize(4);
|
.hasSize(4);
|
||||||
assertEquals("range[1000,3110)", output.toString());
|
|
||||||
|
assertFileRange(output, 1000, 2110);
|
||||||
|
|
||||||
assertTrue("merged output ranges are disjoint",
|
assertTrue("merged output ranges are disjoint",
|
||||||
VectoredReadUtils.isOrderedDisjoint(outputList, 1, 800));
|
VectoredReadUtils.isOrderedDisjoint(outputList, 1, 800));
|
||||||
|
|
||||||
|
@ -227,7 +290,8 @@ public class TestVectoredReadUtils extends HadoopTestBase {
|
||||||
Assertions.assertThat(output.getUnderlying())
|
Assertions.assertThat(output.getUnderlying())
|
||||||
.describedAs("merged range underlying size")
|
.describedAs("merged range underlying size")
|
||||||
.hasSize(4);
|
.hasSize(4);
|
||||||
assertEquals("range[1000,3200)", output.toString());
|
assertFileRange(output, 1000, 2200);
|
||||||
|
|
||||||
assertTrue("merged output ranges are disjoint",
|
assertTrue("merged output ranges are disjoint",
|
||||||
VectoredReadUtils.isOrderedDisjoint(outputList, 1, 800));
|
VectoredReadUtils.isOrderedDisjoint(outputList, 1, 800));
|
||||||
|
|
||||||
|
|
|
@ -169,7 +169,7 @@ public class VectoredReadBenchmark {
|
||||||
|
|
||||||
FileRangeCallback(AsynchronousFileChannel channel, long offset,
|
FileRangeCallback(AsynchronousFileChannel channel, long offset,
|
||||||
int length, Joiner joiner, ByteBuffer buffer) {
|
int length, Joiner joiner, ByteBuffer buffer) {
|
||||||
super(offset, length);
|
super(offset, length, null);
|
||||||
this.channel = channel;
|
this.channel = channel;
|
||||||
this.joiner = joiner;
|
this.joiner = joiner;
|
||||||
this.buffer = buffer;
|
this.buffer = buffer;
|
||||||
|
|
Loading…
Reference in New Issue