HADOOP-18391. Improvements in VectoredReadUtils#readVectored() for direct buffers (#4787)

part of HADOOP-18103.

Contributed By: Mukund Thakur
Mukund Thakur 2022-08-31 21:41:41 +05:30
parent 0a11ce2546
commit 6cc5c92a89
4 changed files with 104 additions and 24 deletions

VectoredReadUtils.java

@@ -30,6 +30,7 @@ import java.util.function.IntFunction;
import org.apache.hadoop.fs.impl.CombinedFileRange;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.util.functional.Function4RaisingIOE;
/**
* Utility class which implements helper methods used
@@ -37,6 +38,8 @@ import org.apache.hadoop.util.Preconditions;
*/
public final class VectoredReadUtils {
  private static final int TMP_BUFFER_MAX_SIZE = 64 * 1024;

  /**
   * Validate a single range.
   * @param range file range.
@@ -114,7 +117,12 @@ public final class VectoredReadUtils {
      FileRange range,
      ByteBuffer buffer) throws IOException {
    if (buffer.isDirect()) {
      buffer.put(readInDirectBuffer(stream, range));
      readInDirectBuffer(range.getLength(),
          buffer,
          (position, buffer1, offset, length) -> {
            // read each chunk at the range's offset plus the bytes already
            // copied; passing position alone would read from the file start.
            stream.readFully(range.getOffset() + position, buffer1, offset, length);
            return null;
          });
      buffer.flip();
    } else {
      stream.readFully(range.getOffset(), buffer.array(),
@@ -122,13 +130,34 @@
    }
  }

  private static byte[] readInDirectBuffer(PositionedReadable stream,
      FileRange range) throws IOException {
    // if we need to read data from a direct buffer and the stream doesn't
    // support it, we allocate a byte array to use.
    byte[] tmp = new byte[range.getLength()];
    stream.readFully(range.getOffset(), tmp, 0, tmp.length);
    return tmp;
  }
  /**
   * Read bytes from stream into a byte buffer using an
   * intermediate byte array.
   * @param length number of bytes to read.
   * @param buffer buffer to fill.
   * @param operation operation to use for reading data: invoked as
   * (position, buffer, offset, length), where position is the count of
   * bytes already read, i.e. the chunk's offset within the overall read.
   * @throws IOException any IOE.
   */
  public static void readInDirectBuffer(int length,
      ByteBuffer buffer,
      Function4RaisingIOE<Integer, byte[], Integer, Integer, Void> operation)
      throws IOException {
    if (length == 0) {
      return;
    }
    int readBytes = 0;
    int position = 0;
    int tmpBufferMaxSize = Math.min(TMP_BUFFER_MAX_SIZE, length);
    byte[] tmp = new byte[tmpBufferMaxSize];
    while (readBytes < length) {
      int currentLength = (readBytes + tmpBufferMaxSize) < length ?
          tmpBufferMaxSize
          : (length - readBytes);
      operation.apply(position, tmp, 0, currentLength);
      buffer.put(tmp, 0, currentLength);
      position = position + currentLength;
      readBytes = readBytes + currentLength;
    }
  }

  /**

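For context, a minimal usage sketch of the new helper (not part of the commit): the caller supplies a Function4RaisingIOE that fills a scratch byte[] chunk, and readInDirectBuffer copies each chunk into the direct buffer in slices of at most 64 KiB. The DirectBufferReadSketch class and the in-memory source array are invented for illustration, and the import assumes the hadoop-common location org.apache.hadoop.fs.VectoredReadUtils.

import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hadoop.fs.VectoredReadUtils;

public class DirectBufferReadSketch {
  public static void main(String[] args) throws IOException {
    // Hypothetical in-memory "file" standing in for a PositionedReadable stream.
    final byte[] source = new byte[200_000];
    for (int i = 0; i < source.length; i++) {
      source[i] = (byte) i;
    }
    ByteBuffer direct = ByteBuffer.allocateDirect(source.length);
    // The helper invokes the lambda as (position, tmp, offset, length) once
    // per chunk, then copies tmp into the direct buffer.
    VectoredReadUtils.readInDirectBuffer(source.length, direct,
        (position, tmp, offset, length) -> {
          System.arraycopy(source, position, tmp, offset, length);
          return null;
        });
    direct.flip();
  }
}

Bounding the scratch array at TMP_BUFFER_MAX_SIZE keeps the on-heap copy small and reusable across chunks, instead of allocating one byte[] as large as the whole range, as the removed private method did.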
Function4RaisingIOE.java

@@ -0,0 +1,43 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.util.functional;

import java.io.IOException;

/**
 * Function of arity 4 which may raise an IOException.
 * @param <I1> type of arg1.
 * @param <I2> type of arg2.
 * @param <I3> type of arg3.
 * @param <I4> type of arg4.
 * @param <R> return type.
 */
public interface Function4RaisingIOE<I1, I2, I3, I4, R> {

  /**
   * Apply the function.
   * @param i1 argument 1.
   * @param i2 argument 2.
   * @param i3 argument 3.
   * @param i4 argument 4.
   * @return return value.
   * @throws IOException any IOE.
   */
  R apply(I1 i1, I2 i2, I3 i3, I4 i4) throws IOException;
}

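A small sketch (not in the commit) of how the arity-4 interface is consumed: a lambda supplies the four-argument body, and the checked IOException propagates through apply(). The Function4Sketch class and fill lambda are invented for illustration; the type parameters mirror the shape used by VectoredReadUtils#readInDirectBuffer.

import java.io.IOException;
import java.util.Arrays;

import org.apache.hadoop.util.functional.Function4RaisingIOE;

public class Function4Sketch {
  public static void main(String[] args) throws IOException {
    // (position, buffer, offset, length) -> Void, possibly raising an IOE.
    Function4RaisingIOE<Integer, byte[], Integer, Integer, Void> fill =
        (position, buf, offset, length) -> {
          if (offset + length > buf.length) {
            throw new IOException("chunk does not fit in buffer");
          }
          Arrays.fill(buf, offset, offset + length, (byte) 0x5A);
          return null;
        };
    byte[] scratch = new byte[16];
    fill.apply(0, scratch, 4, 8);  // fills scratch[4..11]
  }
}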
TestVectoredReadUtils.java

@@ -386,17 +386,31 @@ public class TestVectoredReadUtils extends HadoopTestBase {
    List<FileRange> input = Arrays.asList(FileRange.createFileRange(0, 100),
        FileRange.createFileRange(100_000, 100),
        FileRange.createFileRange(200_000, 100));
    runAndValidateVectoredRead(input);
  }

  @Test
  public void testReadVectoredZeroBytes() throws Exception {
    List<FileRange> input = Arrays.asList(FileRange.createFileRange(0, 0),
        FileRange.createFileRange(100_000, 100),
        FileRange.createFileRange(200_000, 0));
    runAndValidateVectoredRead(input);
  }

  private void runAndValidateVectoredRead(List<FileRange> input)
      throws Exception {
    Stream stream = Mockito.mock(Stream.class);
    Mockito.doAnswer(invocation -> {
      fillBuffer(invocation.getArgument(1));
      return null;
    }).when(stream).readFully(ArgumentMatchers.anyLong(),
        ArgumentMatchers.any(ByteBuffer.class));
    // should not merge the ranges
    VectoredReadUtils.readVectored(stream, input, ByteBuffer::allocate);
    Mockito.verify(stream, Mockito.times(3))
        .readFully(ArgumentMatchers.anyLong(), ArgumentMatchers.any(ByteBuffer.class));
    for (int b = 0; b < input.size(); ++b) {
      validateBuffer("buffer " + b, input.get(b).getData().get(), 0);
    }
  }

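To complement the mock-based test above, a hedged sketch of a test that drives the new helper directly with an off-heap buffer; it assumes JUnit 4 and the org.apache.hadoop.fs.VectoredReadUtils location, and the TestReadInDirectBufferSketch class is invented. It exercises the 64 KiB chunking and the zero-length early return added in this change.

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Test;

import org.apache.hadoop.fs.VectoredReadUtils;

import static org.junit.Assert.assertEquals;

public class TestReadInDirectBufferSketch {

  @Test
  public void testChunkedCopyIntoDirectBuffer() throws IOException {
    int length = 100_000;  // one full 64 KiB chunk plus a remainder
    ByteBuffer direct = ByteBuffer.allocateDirect(length);
    AtomicInteger calls = new AtomicInteger();
    VectoredReadUtils.readInDirectBuffer(length, direct,
        (position, tmp, offset, len) -> {
          calls.incrementAndGet();  // contents are irrelevant; count chunks
          return null;
        });
    assertEquals(2, calls.get());
    assertEquals(length, direct.position());
  }

  @Test
  public void testZeroLengthReadIsANoop() throws IOException {
    ByteBuffer direct = ByteBuffer.allocateDirect(8);
    VectoredReadUtils.readInDirectBuffer(0, direct,
        (position, tmp, offset, len) -> {
          throw new IOException("operation must not be invoked for length 0");
        });
    assertEquals(0, direct.position());
  }
}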
S3AInputStream.java

@@ -1054,20 +1054,13 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
  private void populateBuffer(int length,
      ByteBuffer buffer,
      S3ObjectInputStream objectContent) throws IOException {
    if (buffer.isDirect()) {
      int readBytes = 0;
      int offset = 0;
      byte[] tmp = new byte[TMP_BUFFER_MAX_SIZE];
      while (readBytes < length) {
        checkIfVectoredIOStopped();
        int currentLength = readBytes + TMP_BUFFER_MAX_SIZE < length ?
            TMP_BUFFER_MAX_SIZE
            : length - readBytes;
        readByteArray(objectContent, tmp, 0, currentLength);
        buffer.put(tmp, 0, currentLength);
        offset = offset + currentLength;
        readBytes = readBytes + currentLength;
      }
      VectoredReadUtils.readInDirectBuffer(length, buffer,
          (position, tmp, offset, currentLength) -> {
            readByteArray(objectContent, tmp, offset, currentLength);
            return null;
          });
      buffer.flip();
    } else {
      readByteArray(objectContent, buffer.array(), 0, length);
@@ -1076,6 +1069,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
    incrementBytesRead(length);
  }

  /**
   * Read data into destination buffer from s3 object content.
   * @param objectContent result from S3.