HADOOP-18463. Add an integration test to process data asynchronously during vectored read. (#4921)
part of HADOOP-18103. Contributed by: Mukund Thakur
This commit is contained in:
parent
bbe841e601
commit
0d772b353f
|
@ -19,12 +19,17 @@
|
|||
package org.apache.hadoop.fs.contract;
|
||||
|
||||
import java.io.EOFException;
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.function.IntFunction;
|
||||
|
||||
import org.assertj.core.api.Assertions;
|
||||
|
@ -42,7 +47,10 @@ import org.apache.hadoop.fs.FileSystem;
|
|||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.impl.FutureIOSupport;
|
||||
import org.apache.hadoop.io.WeakReferencedElasticByteBufferPool;
|
||||
import org.apache.hadoop.util.concurrent.HadoopExecutors;
|
||||
import org.apache.hadoop.util.functional.FutureIO;
|
||||
|
||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS;
|
||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertCapabilities;
|
||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertDatasetEquals;
|
||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
|
||||
|
@ -364,6 +372,66 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This test creates list of ranges and then submit a readVectored
|
||||
* operation and then uses a separate thread pool to process the
|
||||
* results asynchronously.
|
||||
*/
|
||||
@Test
|
||||
public void testVectoredIOEndToEnd() throws Exception {
|
||||
FileSystem fs = getFileSystem();
|
||||
List<FileRange> fileRanges = new ArrayList<>();
|
||||
fileRanges.add(FileRange.createFileRange(8 * 1024, 100));
|
||||
fileRanges.add(FileRange.createFileRange(14 * 1024, 100));
|
||||
fileRanges.add(FileRange.createFileRange(10 * 1024, 100));
|
||||
fileRanges.add(FileRange.createFileRange(2 * 1024 - 101, 100));
|
||||
fileRanges.add(FileRange.createFileRange(40 * 1024, 1024));
|
||||
|
||||
ExecutorService dataProcessor = Executors.newFixedThreadPool(5);
|
||||
CountDownLatch countDown = new CountDownLatch(fileRanges.size());
|
||||
|
||||
try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
|
||||
in.readVectored(fileRanges, value -> pool.getBuffer(true, value));
|
||||
for (FileRange res : fileRanges) {
|
||||
dataProcessor.submit(() -> {
|
||||
try {
|
||||
readBufferValidateDataAndReturnToPool(res, countDown);
|
||||
} catch (Exception e) {
|
||||
String error = String.format("Error while processing result for %s", res);
|
||||
LOG.error(error, e);
|
||||
ContractTestUtils.fail(error, e);
|
||||
}
|
||||
});
|
||||
}
|
||||
// user can perform other computations while waiting for IO.
|
||||
if (!countDown.await(VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
|
||||
ContractTestUtils.fail("Timeout/Error while processing vectored io results");
|
||||
}
|
||||
} finally {
|
||||
HadoopExecutors.shutdown(dataProcessor, LOG,
|
||||
VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS, TimeUnit.SECONDS);
|
||||
}
|
||||
}
|
||||
|
||||
private void readBufferValidateDataAndReturnToPool(FileRange res,
|
||||
CountDownLatch countDownLatch)
|
||||
throws IOException, TimeoutException {
|
||||
CompletableFuture<ByteBuffer> data = res.getData();
|
||||
// Read the data and perform custom operation. Here we are just
|
||||
// validating it with original data.
|
||||
FutureIO.awaitFuture(data.thenAccept(buffer -> {
|
||||
assertDatasetEquals((int) res.getOffset(),
|
||||
"vecRead", buffer, res.getLength(), DATASET);
|
||||
// return buffer to the pool once read.
|
||||
pool.putBuffer(buffer);
|
||||
}),
|
||||
VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS, TimeUnit.SECONDS);
|
||||
|
||||
// countdown to notify main thread that processing has been done.
|
||||
countDownLatch.countDown();
|
||||
}
|
||||
|
||||
|
||||
protected List<FileRange> createSampleNonOverlappingRanges() {
|
||||
List<FileRange> fileRanges = new ArrayList<>();
|
||||
fileRanges.add(FileRange.createFileRange(0, 100));
|
||||
|
|
Loading…
Reference in New Issue