diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md index 0fe1772d266..e4a2830967e 100644 --- a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md +++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md @@ -474,6 +474,7 @@ end of first and start of next range is more than this value. Maximum number of bytes which can be read in one go after merging the ranges. Two ranges won't be merged if the combined data to be read is more than this value. +Essentially setting this to 0 will disable the merging of ranges. ## Consistency diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java index eee4b11e739..756c3de85cc 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java @@ -18,15 +18,6 @@ package org.apache.hadoop.fs.contract; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FileRange; -import org.apache.hadoop.fs.FileRangeImpl; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.impl.FutureIOSupport; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.io.EOFException; import java.io.IOException; import java.nio.ByteBuffer; @@ -42,6 +33,15 @@ import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileRange; +import org.apache.hadoop.fs.FileRangeImpl; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.impl.FutureIOSupport; import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile; @@ -52,7 +52,7 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac public static final int DATASET_LEN = 64 * 1024; private static final byte[] DATASET = ContractTestUtils.dataset(DATASET_LEN, 'a', 32); - private static final String VECTORED_READ_FILE_NAME = "vectored_file.txt"; + protected static final String VECTORED_READ_FILE_NAME = "vectored_file.txt"; private static final String VECTORED_READ_FILE_1MB_NAME = "vectored_file_1M.txt"; private static final byte[] DATASET_MB = ContractTestUtils.dataset(1024 * 1024, 'a', 256); @@ -172,6 +172,7 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac } } + @Test public void testSameRanges() throws Exception { FileSystem fs = getFileSystem(); List fileRanges = new ArrayList<>(); diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/TestVectoredReadUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/TestVectoredReadUtils.java index f789f361905..cfd366701be 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/TestVectoredReadUtils.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/TestVectoredReadUtils.java @@ -207,6 +207,30 @@ public class TestVectoredReadUtils extends HadoopTestBase { VectoredReadUtils.isOrderedDisjoint(outputList, 1, 800)); } + + @Test + public void testMaxSizeZeroDisablesMering() throws Exception { + List randomRanges = Arrays.asList( + new FileRangeImpl(3000, 110), + new FileRangeImpl(3000, 100), + new FileRangeImpl(2100, 100) + ); + assertEqualRangeCountsAfterMerging(randomRanges, 1, 1, 0); + assertEqualRangeCountsAfterMerging(randomRanges, 1, 0, 0); + assertEqualRangeCountsAfterMerging(randomRanges, 1, 100, 0); + } + + private void assertEqualRangeCountsAfterMerging(List inputRanges, + int chunkSize, + int minimumSeek, + int maxSize) { + List combinedFileRanges = VectoredReadUtils + .sortAndMergeRanges(inputRanges, chunkSize, minimumSeek, maxSize); + Assertions.assertThat(combinedFileRanges) + .describedAs("Mismatch in number of ranges post merging") + .hasSize(inputRanges.size()); + } + interface Stream extends PositionedReadable, ByteBufferPositionedReadable { // nothing } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MoreAsserts.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MoreAsserts.java index f83ef9e63d1..f6e6055d78e 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MoreAsserts.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MoreAsserts.java @@ -86,4 +86,16 @@ public class MoreAsserts { "completed exceptionally") .isTrue(); } + + /** + * Assert two same type of values. + * @param actual actual value. + * @param expected expected value. + * @param message error message to print in case of mismatch. + */ + public static void assertEqual(T actual, T expected, String message) { + Assertions.assertThat(actual) + .describedAs("Mismatch in %s", message) + .isEqualTo(expected); + } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java index 48d17aa402d..764a6adaca2 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java @@ -1177,4 +1177,30 @@ public final class Constants { */ public static final String FS_S3A_CREATE_HEADER = "fs.s3a.create.header"; + /** + * What is the smallest reasonable seek in bytes such + * that we group ranges together during vectored read operation. + * Value : {@value}. + */ + public static final String AWS_S3_VECTOR_READS_MIN_SEEK_SIZE = + "fs.s3a.vectored.read.min.seek.size"; + + /** + * What is the largest merged read size in bytes such + * that we group ranges together during vectored read. + * Setting this value to 0 will disable merging of ranges. + * Value : {@value}. + */ + public static final String AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE = + "fs.s3a.vectored.read.max.merged.size"; + + /** + * Default minimum seek in bytes during vectored reads : {@value}. + */ + public static final int DEFAULT_AWS_S3_VECTOR_READS_MIN_SEEK_SIZE = 4896; // 4K + + /** + * Default maximum read size in bytes during vectored reads : {@value}. + */ + public static final int DEFAULT_AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE = 1253376; //1M } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index 872f1ae818b..40671e0d334 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -313,6 +313,10 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities, * {@code openFile()}. */ private S3AInputPolicy inputPolicy; + /** Vectored IO context. */ + private VectoredIOContext vectoredIOContext; + + private long readAhead; private ChangeDetectionPolicy changeDetectionPolicy; private final AtomicBoolean closed = new AtomicBoolean(false); private volatile boolean isClosed = false; @@ -584,6 +588,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities, longBytesOption(conf, ASYNC_DRAIN_THRESHOLD, DEFAULT_ASYNC_DRAIN_THRESHOLD, 0), inputPolicy); + vectoredIOContext = populateVectoredIOContext(conf); } catch (AmazonClientException e) { // amazon client exception: stop all services then throw the translation cleanupWithLogger(LOG, span); @@ -597,6 +602,23 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities, } } + /** + * Populates the configurations related to vectored IO operation + * in the context which has to passed down to input streams. + * @param conf configuration object. + * @return VectoredIOContext. + */ + private VectoredIOContext populateVectoredIOContext(Configuration conf) { + final int minSeekVectored = (int) longBytesOption(conf, AWS_S3_VECTOR_READS_MIN_SEEK_SIZE, + DEFAULT_AWS_S3_VECTOR_READS_MIN_SEEK_SIZE, 0); + final int maxReadSizeVectored = (int) longBytesOption(conf, AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE, + DEFAULT_AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE, 0); + return new VectoredIOContext() + .setMinSeekForVectoredReads(minSeekVectored) + .setMaxReadSizeForVectoredReads(maxReadSizeVectored) + .build(); + } + /** * Set the client side encryption gauge to 0 or 1, indicating if CSE is * enabled through the gauge or not. @@ -1552,7 +1574,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities, invoker, statistics, statisticsContext, - fileStatus) + fileStatus, + vectoredIOContext) .withAuditSpan(auditSpan); openFileHelper.applyDefaultOptions(roc); return roc.build(); diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java index 05d9c7f9fe9..23f31df1645 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java @@ -145,6 +145,9 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead, private S3AInputPolicy inputPolicy; private long readahead = Constants.DEFAULT_READAHEAD_RANGE; + /** Vectored IO context. */ + private final VectoredIOContext vectoredIOContext; + /** * This is the actual position within the object, used by * lazy seek to decide whether to seek on the next read or not. @@ -212,6 +215,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead, setReadahead(ctx.getReadahead()); this.asyncDrainThreshold = ctx.getAsyncDrainThreshold(); this.unboundedThreadPool = unboundedThreadPool; + this.vectoredIOContext = context.getVectoredIOContext(); } /** @@ -859,6 +863,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead, sb.append(" remainingInCurrentRequest=") .append(remainingInCurrentRequest()); sb.append(" ").append(changeTracker); + sb.append(" ").append(vectoredIOContext); sb.append('\n').append(s); sb.append('}'); return sb.toString(); @@ -905,6 +910,22 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead, } } + /** + * {@inheritDoc}. + */ + @Override + public int minSeekForVectorReads() { + return vectoredIOContext.getMinSeekForVectorReads(); + } + + /** + * {@inheritDoc}. + */ + @Override + public int maxReadSizeForVectorReads() { + return vectoredIOContext.getMaxReadSizeForVectorReads(); + } + /** * {@inheritDoc} * Vectored read implementation for S3AInputStream. diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AReadOpContext.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AReadOpContext.java index f416cf9485d..29e3df1af12 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AReadOpContext.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AReadOpContext.java @@ -64,6 +64,12 @@ public class S3AReadOpContext extends S3AOpContext { */ private long asyncDrainThreshold; + /** + * Vectored IO context for vectored read api + * in {@code S3AInputStream#readVectored(List, IntFunction)}. + */ + private final VectoredIOContext vectoredIOContext; + /** * Instantiate. * @param path path of read @@ -71,17 +77,19 @@ public class S3AReadOpContext extends S3AOpContext { * @param stats Fileystem statistics (may be null) * @param instrumentation statistics context * @param dstFileStatus target file status + * @param vectoredIOContext context for vectored read operation. */ public S3AReadOpContext( final Path path, Invoker invoker, @Nullable FileSystem.Statistics stats, S3AStatisticsContext instrumentation, - FileStatus dstFileStatus) { - + FileStatus dstFileStatus, + VectoredIOContext vectoredIOContext) { super(invoker, stats, instrumentation, dstFileStatus); this.path = requireNonNull(path); + this.vectoredIOContext = requireNonNull(vectoredIOContext, "vectoredIOContext"); } /** @@ -145,6 +153,7 @@ public class S3AReadOpContext extends S3AOpContext { } /** +<<<<<<< HEAD * Set builder value. * @param value new value * @return the builder @@ -199,6 +208,14 @@ public class S3AReadOpContext extends S3AOpContext { return asyncDrainThreshold; } + /** + * Get Vectored IO context for this this read op. + * @return vectored IO context. + */ + public VectoredIOContext getVectoredIOContext() { + return vectoredIOContext; + } + @Override public String toString() { final StringBuilder sb = new StringBuilder( diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/VectoredIOContext.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/VectoredIOContext.java new file mode 100644 index 00000000000..31f0ae4cb55 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/VectoredIOContext.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import java.util.List; +import java.util.function.IntFunction; + +/** + * Context related to vectored IO operation. + * See {@link S3AInputStream#readVectored(List, IntFunction)}. + */ +public class VectoredIOContext { + + /** + * What is the smallest reasonable seek that we should group + * ranges together during vectored read operation. + */ + private int minSeekForVectorReads; + + /** + * What is the largest size that we should group ranges + * together during vectored read operation. + * Setting this value 0 will disable merging of ranges. + */ + private int maxReadSizeForVectorReads; + + /** + * Default no arg constructor. + */ + public VectoredIOContext() { + } + + public VectoredIOContext setMinSeekForVectoredReads(int minSeek) { + this.minSeekForVectorReads = minSeek; + return this; + } + + public VectoredIOContext setMaxReadSizeForVectoredReads(int maxSize) { + this.maxReadSizeForVectorReads = maxSize; + return this; + } + + public VectoredIOContext build() { + return this; + } + + public int getMinSeekForVectorReads() { + return minSeekForVectorReads; + } + + public int getMaxReadSizeForVectorReads() { + return maxReadSizeForVectorReads; + } + + @Override + public String toString() { + return "VectoredIOContext{" + + "minSeekForVectorReads=" + minSeekForVectorReads + + ", maxReadSizeForVectorReads=" + maxReadSizeForVectorReads + + '}'; + } +} diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/performance.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/performance.md index f398c4cbcbe..06eb137cd9b 100644 --- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/performance.md +++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/performance.md @@ -55,6 +55,36 @@ it isn't, and some attempts to preserve the metaphor are "aggressively suboptima To make most efficient use of S3, care is needed. +## Improving read performance using Vectored IO +The S3A FileSystem supports implementation of vectored read api using which +a client can provide a list of file ranges to read returning a future read +object associated with each range. For full api specification please see +[FSDataInputStream](../../hadoop-common-project/hadoop-common/filesystem/fsdatainputstream.html). + +The following properties can be configured to optimise vectored reads based +on the client requirements. + +```xml + + fs.s3a.vectored.read.min.seek.size + 4K + + What is the smallest reasonable seek in bytes such + that we group ranges together during vectored + read operation. + + + +fs.s3a.vectored.read.max.merged.size +1M + + What is the largest merged read size in bytes such + that we group ranges together during vectored read. + Setting this value to 0 will disable merging of ranges. + + +``` + ## Improving data input performance through fadvise The S3A Filesystem client supports the notion of input policies, similar diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java index 255cc6501c2..0752e75d247 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java @@ -18,15 +18,23 @@ package org.apache.hadoop.fs.contract.s3a; +import java.util.ArrayList; +import java.util.List; + +import org.junit.Test; + import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileRange; import org.apache.hadoop.fs.FileRangeImpl; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.contract.AbstractContractVectoredReadTest; import org.apache.hadoop.fs.contract.AbstractFSContract; +import org.apache.hadoop.fs.s3a.Constants; +import org.apache.hadoop.fs.s3a.S3AFileSystem; +import org.apache.hadoop.fs.s3a.S3ATestUtils; -import java.util.ArrayList; -import java.util.List; +import static org.apache.hadoop.test.MoreAsserts.assertEqual; public class ITestS3AContractVectoredRead extends AbstractContractVectoredReadTest { @@ -42,7 +50,6 @@ public class ITestS3AContractVectoredRead extends AbstractContractVectoredReadTe /** * Overriding in S3 vectored read api fails fast in case of EOF * requested range. - * @throws Exception */ @Override public void testEOFRanges() throws Exception { @@ -51,4 +58,45 @@ public class ITestS3AContractVectoredRead extends AbstractContractVectoredReadTe fileRanges.add(new FileRangeImpl(DATASET_LEN, 100)); testExceptionalVectoredRead(fs, fileRanges, "EOFException is expected"); } + + @Test + public void testMinSeekAndMaxSizeConfigsPropagation() throws Exception { + Configuration conf = getFileSystem().getConf(); + S3ATestUtils.removeBaseAndBucketOverrides(conf, + Constants.AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE, + Constants.AWS_S3_VECTOR_READS_MIN_SEEK_SIZE); + S3ATestUtils.disableFilesystemCaching(conf); + final int configuredMinSeek = 2 * 1024; + final int configuredMaxSize = 10 * 1024 * 1024; + conf.set(Constants.AWS_S3_VECTOR_READS_MIN_SEEK_SIZE, "2K"); + conf.set(Constants.AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE, "10M"); + try (S3AFileSystem fs = S3ATestUtils.createTestFileSystem(conf)) { + try (FSDataInputStream fis = fs.open(path(VECTORED_READ_FILE_NAME))) { + int newMinSeek = fis.minSeekForVectorReads(); + int newMaxSize = fis.maxReadSizeForVectorReads(); + assertEqual(newMinSeek, configuredMinSeek, + "configured s3a min seek for vectored reads"); + assertEqual(newMaxSize, configuredMaxSize, + "configured s3a max size for vectored reads"); + } + } + } + + @Test + public void testMinSeekAndMaxSizeDefaultValues() throws Exception { + Configuration conf = getFileSystem().getConf(); + S3ATestUtils.removeBaseAndBucketOverrides(conf, + Constants.AWS_S3_VECTOR_READS_MIN_SEEK_SIZE, + Constants.AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE); + try (S3AFileSystem fs = S3ATestUtils.createTestFileSystem(conf)) { + try (FSDataInputStream fis = fs.open(path(VECTORED_READ_FILE_NAME))) { + int minSeek = fis.minSeekForVectorReads(); + int maxSize = fis.maxReadSizeForVectorReads(); + assertEqual(minSeek, Constants.DEFAULT_AWS_S3_VECTOR_READS_MIN_SEEK_SIZE, + "default s3a min seek for vectored reads"); + assertEqual(maxSize, Constants.DEFAULT_AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE, + "default s3a max read size for vectored reads"); + } + } + } }