HADOOP-18107 Adding scale test for vectored reads for large file (#4273)

part of HADOOP-18103.

Contributed By: Mukund Thakur
This commit is contained in:
Mukund Thakur 2022-06-02 03:35:54 +05:30 committed by Steve Loughran
parent 5db0f34e29
commit 1408dd89a7
6 changed files with 111 additions and 77 deletions

View File

@ -43,7 +43,9 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.impl.FutureIOSupport;
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertDatasetEquals;
import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
import static org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult;
@RunWith(Parameterized.class)
public abstract class AbstractContractVectoredReadTest extends AbstractFSContractTestBase {
@ -53,8 +55,6 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
public static final int DATASET_LEN = 64 * 1024;
private static final byte[] DATASET = ContractTestUtils.dataset(DATASET_LEN, 'a', 32);
protected static final String VECTORED_READ_FILE_NAME = "vectored_file.txt";
private static final String VECTORED_READ_FILE_1MB_NAME = "vectored_file_1M.txt";
private static final byte[] DATASET_MB = ContractTestUtils.dataset(1024 * 1024, 'a', 256);
private final IntFunction<ByteBuffer> allocate;
@ -77,8 +77,6 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
Path path = path(VECTORED_READ_FILE_NAME);
FileSystem fs = getFileSystem();
createFile(fs, path, true, DATASET);
Path bigFile = path(VECTORED_READ_FILE_1MB_NAME);
createFile(fs, bigFile, true, DATASET_MB);
}
@Test
@ -99,7 +97,7 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(completableFutures);
combinedFuture.get();
validateVectoredReadResult(fileRanges);
validateVectoredReadResult(fileRanges, DATASET);
}
}
@ -132,7 +130,7 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
fileRanges.add(new FileRangeImpl(16 * 1024 + 101, 100));
try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
in.readVectored(fileRanges, allocate);
validateVectoredReadResult(fileRanges);
validateVectoredReadResult(fileRanges, DATASET);
}
}
@ -149,7 +147,7 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
fileRanges.add(new FileRangeImpl(8*1024 - 101, 100));
try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
in.readVectored(fileRanges, allocate);
validateVectoredReadResult(fileRanges);
validateVectoredReadResult(fileRanges, DATASET);
}
}
@ -168,7 +166,7 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
fileRanges.add(new FileRangeImpl(40*1024, 1024));
try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
in.readVectored(fileRanges, allocate);
validateVectoredReadResult(fileRanges);
validateVectoredReadResult(fileRanges, DATASET);
}
}
@ -184,24 +182,7 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
.build();
try (FSDataInputStream in = builder.get()) {
in.readVectored(fileRanges, allocate);
validateVectoredReadResult(fileRanges);
}
}
@Test
public void testVectoredRead1MBFile() throws Exception {
FileSystem fs = getFileSystem();
List<FileRange> fileRanges = new ArrayList<>();
fileRanges.add(new FileRangeImpl(1293, 25837));
CompletableFuture<FSDataInputStream> builder =
fs.openFile(path(VECTORED_READ_FILE_1MB_NAME))
.build();
try (FSDataInputStream in = builder.get()) {
in.readVectored(fileRanges, allocate);
ByteBuffer vecRes = FutureIOSupport.awaitFuture(fileRanges.get(0).getData());
FileRange resRange = fileRanges.get(0);
assertDatasetEquals((int) resRange.getOffset(), "vecRead",
vecRes, resRange.getLength(), DATASET_MB);
validateVectoredReadResult(fileRanges, DATASET);
}
}
@ -215,7 +196,7 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
fileRanges.add(new FileRangeImpl(10, 980));
try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
in.readVectored(fileRanges, allocate);
validateVectoredReadResult(fileRanges);
validateVectoredReadResult(fileRanges, DATASET);
}
}
@ -272,7 +253,7 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
Assertions.assertThat(in.getPos())
.describedAs("Vectored read shouldn't change file pointer.")
.isEqualTo(200);
validateVectoredReadResult(fileRanges);
validateVectoredReadResult(fileRanges, DATASET);
}
}
@ -290,7 +271,7 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
.describedAs("Vectored read shouldn't change file pointer.")
.isEqualTo(200);
in.readVectored(fileRanges, allocate);
validateVectoredReadResult(fileRanges);
validateVectoredReadResult(fileRanges, DATASET);
}
}
@ -302,8 +283,8 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
in.readVectored(fileRanges1, allocate);
in.readVectored(fileRanges2, allocate);
validateVectoredReadResult(fileRanges2);
validateVectoredReadResult(fileRanges1);
validateVectoredReadResult(fileRanges2, DATASET);
validateVectoredReadResult(fileRanges1, DATASET);
}
}
@ -314,27 +295,6 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
return fileRanges;
}
protected void validateVectoredReadResult(List<FileRange> fileRanges)
throws ExecutionException, InterruptedException {
CompletableFuture<?>[] completableFutures = new CompletableFuture<?>[fileRanges.size()];
int i = 0;
for (FileRange res : fileRanges) {
completableFutures[i++] = res.getData();
}
CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(completableFutures);
combinedFuture.get();
for (FileRange res : fileRanges) {
CompletableFuture<ByteBuffer> data = res.getData();
try {
ByteBuffer buffer = FutureIOSupport.awaitFuture(data);
assertDatasetEquals((int) res.getOffset(), "vecRead", buffer, res.getLength(), DATASET);
} catch (Exception ex) {
LOG.error("Exception while running vectored read ", ex);
Assert.fail("Exception while running vectored read " + ex);
}
}
}
protected void testExceptionalVectoredRead(FileSystem fs,
List<FileRange> fileRanges,
@ -351,26 +311,4 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
.describedAs(s)
.isTrue();
}
/**
* Assert that the data read matches the dataset at the given offset.
* This helps verify that the seek process is moving the read pointer
* to the correct location in the file.
* @param readOffset the offset in the file where the read began.
* @param operation operation name for the assertion.
* @param data data read in.
* @param length length of data to check.
* @param originalData
*/
private void assertDatasetEquals(
final int readOffset, final String operation,
final ByteBuffer data,
int length, byte[] originalData) {
for (int i = 0; i < length; i++) {
int o = readOffset + i;
assertEquals(operation + " with read offset " + readOffset
+ ": data[" + i + "] != DATASET[" + o + "]",
originalData[o], data.get());
}
}
}

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.fs.contract;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
@ -30,6 +31,7 @@ import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.functional.RemoteIterators;
import org.apache.hadoop.util.functional.FutureIO;
import org.junit.Assert;
import org.junit.AssumptionViolatedException;
@ -41,6 +43,7 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@ -51,6 +54,9 @@ import java.util.NoSuchElementException;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
@ -70,6 +76,11 @@ public class ContractTestUtils extends Assert {
public static final String IO_CHUNK_MODULUS_SIZE = "io.chunk.modulus.size";
public static final int DEFAULT_IO_CHUNK_MODULUS_SIZE = 128;
/**
* Timeout in seconds for vectored read operation in tests : {@value}.
*/
public static final int VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS = 5 * 60;
/**
* Assert that a property in the property set matches the expected value.
* @param props property set
@ -1097,6 +1108,59 @@ public class ContractTestUtils extends Assert {
mismatch);
}
/**
* Utility to validate vectored read results.
* @param fileRanges input ranges.
* @param originalData original data.
* @throws IOException any ioe.
*/
public static void validateVectoredReadResult(List<FileRange> fileRanges,
byte[] originalData)
throws IOException, TimeoutException {
CompletableFuture<?>[] completableFutures = new CompletableFuture<?>[fileRanges.size()];
int i = 0;
for (FileRange res : fileRanges) {
completableFutures[i++] = res.getData();
}
CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(completableFutures);
FutureIO.awaitFuture(combinedFuture,
VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS,
TimeUnit.SECONDS);
for (FileRange res : fileRanges) {
CompletableFuture<ByteBuffer> data = res.getData();
ByteBuffer buffer = FutureIO.awaitFuture(data,
VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS,
TimeUnit.SECONDS);
assertDatasetEquals((int) res.getOffset(), "vecRead",
buffer, res.getLength(), originalData);
}
}
/**
* Assert that the data read matches the dataset at the given offset.
* This helps verify that the seek process is moving the read pointer
* to the correct location in the file.
* @param readOffset the offset in the file where the read began.
* @param operation operation name for the assertion.
* @param data data read in.
* @param length length of data to check.
* @param originalData original data.
*/
public static void assertDatasetEquals(
final int readOffset,
final String operation,
final ByteBuffer data,
int length, byte[] originalData) {
for (int i = 0; i < length; i++) {
int o = readOffset + i;
assertEquals(operation + " with read offset " + readOffset
+ ": data[" + i + "] != DATASET[" + o + "]",
originalData[o], data.get());
}
}
/**
* Receives test data from the given input file and checks the size of the
* data as well as the pattern inside the received data.

View File

@ -186,6 +186,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
* @param ctx operation context
* @param s3Attributes object attributes
* @param client S3 client to use
* @param streamStatistics stream io stats.
* @param unboundedThreadPool thread pool to use.
*/
public S3AInputStream(S3AReadOpContext ctx,

View File

@ -153,7 +153,6 @@ public class S3AReadOpContext extends S3AOpContext {
}
/**
<<<<<<< HEAD
* Set builder value.
* @param value new value
* @return the builder

View File

@ -220,8 +220,7 @@ public class GetContentSummaryOperation extends
/***
* List all entries under a path.
*
* @param path
* @param path path.
* @param recursive if the subdirectories need to be traversed recursively
* @return an iterator over the listing.
* @throws IOException failure

View File

@ -19,8 +19,13 @@
package org.apache.hadoop.fs.s3a.scale;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.IntFunction;
import com.amazonaws.event.ProgressEvent;
import com.amazonaws.event.ProgressEventType;
@ -35,7 +40,10 @@ import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.FileRangeImpl;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.s3a.Constants;
@ -47,6 +55,7 @@ import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.util.Progressable;
import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
import static org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult;
import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
import static org.apache.hadoop.fs.s3a.Statistic.STREAM_WRITE_BLOCK_UPLOADS_BYTES_PENDING;
@ -446,6 +455,30 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase {
toHuman(timer.nanosPerOperation(ops)));
}
@Test
public void test_045_vectoredIOHugeFile() throws Throwable {
assumeHugeFileExists();
List<FileRange> rangeList = new ArrayList<>();
rangeList.add(new FileRangeImpl(5856368, 1167716));
rangeList.add(new FileRangeImpl(3520861, 1167700));
rangeList.add(new FileRangeImpl(8191913, 1167775));
rangeList.add(new FileRangeImpl(1520861, 1167700));
rangeList.add(new FileRangeImpl(2520861, 116770));
rangeList.add(new FileRangeImpl(9191913, 116770));
rangeList.add(new FileRangeImpl(2820861, 156770));
IntFunction<ByteBuffer> allocate = ByteBuffer::allocate;
FileSystem fs = getFileSystem();
CompletableFuture<FSDataInputStream> builder =
fs.openFile(hugefile).build();
try (FSDataInputStream in = builder.get()) {
in.readVectored(rangeList, allocate);
byte[] readFullRes = new byte[(int)filesize];
in.readFully(0, readFullRes);
// Comparing vectored read results with read fully.
validateVectoredReadResult(rangeList, readFullRes);
}
}
/**
* Read in the entire file using read() calls.
* @throws Throwable failure