HADOOP-18104: S3A: Add configs to configure minSeekForVectorReads and maxReadSizeForVectorReads (#3964)

Part of HADOOP-18103.
Introducing fs.s3a.vectored.read.min.seek.size and fs.s3a.vectored.read.max.merged.size
to configure min seek and max read during a vectored IO operation in S3A connector.
These properties actually define how the ranges will be merged. To completely
disable merging set fs.s3a.max.readsize.vectored.read to 0.

Contributed By: Mukund Thakur
This commit is contained in:
Mukund Thakur 2022-04-30 04:17:33 +05:30
parent f3f71434b5
commit 307bd14e1e
11 changed files with 297 additions and 16 deletions

View File

@ -474,6 +474,7 @@ end of first and start of next range is more than this value.
Maximum number of bytes which can be read in one go after merging the ranges.
Two ranges won't be merged if the combined data to be read is more than this value.
Essentially setting this to 0 will disable the merging of ranges.
## Consistency

View File

@ -18,15 +18,6 @@
package org.apache.hadoop.fs.contract;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.FileRangeImpl;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.impl.FutureIOSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
@ -42,6 +33,15 @@ import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.FileRangeImpl;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.impl.FutureIOSupport;
import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
@ -52,7 +52,7 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
public static final int DATASET_LEN = 64 * 1024;
private static final byte[] DATASET = ContractTestUtils.dataset(DATASET_LEN, 'a', 32);
private static final String VECTORED_READ_FILE_NAME = "vectored_file.txt";
protected static final String VECTORED_READ_FILE_NAME = "vectored_file.txt";
private static final String VECTORED_READ_FILE_1MB_NAME = "vectored_file_1M.txt";
private static final byte[] DATASET_MB = ContractTestUtils.dataset(1024 * 1024, 'a', 256);
@ -172,6 +172,7 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
}
}
@Test
public void testSameRanges() throws Exception {
FileSystem fs = getFileSystem();
List<FileRange> fileRanges = new ArrayList<>();

View File

@ -207,6 +207,30 @@ public class TestVectoredReadUtils extends HadoopTestBase {
VectoredReadUtils.isOrderedDisjoint(outputList, 1, 800));
}
@Test
public void testMaxSizeZeroDisablesMering() throws Exception {
List<FileRange> randomRanges = Arrays.asList(
new FileRangeImpl(3000, 110),
new FileRangeImpl(3000, 100),
new FileRangeImpl(2100, 100)
);
assertEqualRangeCountsAfterMerging(randomRanges, 1, 1, 0);
assertEqualRangeCountsAfterMerging(randomRanges, 1, 0, 0);
assertEqualRangeCountsAfterMerging(randomRanges, 1, 100, 0);
}
private void assertEqualRangeCountsAfterMerging(List<FileRange> inputRanges,
int chunkSize,
int minimumSeek,
int maxSize) {
List<CombinedFileRange> combinedFileRanges = VectoredReadUtils
.sortAndMergeRanges(inputRanges, chunkSize, minimumSeek, maxSize);
Assertions.assertThat(combinedFileRanges)
.describedAs("Mismatch in number of ranges post merging")
.hasSize(inputRanges.size());
}
interface Stream extends PositionedReadable, ByteBufferPositionedReadable {
// nothing
}

View File

@ -86,4 +86,16 @@ public class MoreAsserts {
"completed exceptionally")
.isTrue();
}
/**
* Assert two same type of values.
* @param actual actual value.
* @param expected expected value.
* @param message error message to print in case of mismatch.
*/
public static <T> void assertEqual(T actual, T expected, String message) {
Assertions.assertThat(actual)
.describedAs("Mismatch in %s", message)
.isEqualTo(expected);
}
}

View File

@ -1177,4 +1177,30 @@ public final class Constants {
*/
public static final String FS_S3A_CREATE_HEADER = "fs.s3a.create.header";
/**
* What is the smallest reasonable seek in bytes such
* that we group ranges together during vectored read operation.
* Value : {@value}.
*/
public static final String AWS_S3_VECTOR_READS_MIN_SEEK_SIZE =
"fs.s3a.vectored.read.min.seek.size";
/**
* What is the largest merged read size in bytes such
* that we group ranges together during vectored read.
* Setting this value to 0 will disable merging of ranges.
* Value : {@value}.
*/
public static final String AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE =
"fs.s3a.vectored.read.max.merged.size";
/**
* Default minimum seek in bytes during vectored reads : {@value}.
*/
public static final int DEFAULT_AWS_S3_VECTOR_READS_MIN_SEEK_SIZE = 4896; // 4K
/**
* Default maximum read size in bytes during vectored reads : {@value}.
*/
public static final int DEFAULT_AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE = 1253376; //1M
}

View File

@ -313,6 +313,10 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
* {@code openFile()}.
*/
private S3AInputPolicy inputPolicy;
/** Vectored IO context. */
private VectoredIOContext vectoredIOContext;
private long readAhead;
private ChangeDetectionPolicy changeDetectionPolicy;
private final AtomicBoolean closed = new AtomicBoolean(false);
private volatile boolean isClosed = false;
@ -584,6 +588,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
longBytesOption(conf, ASYNC_DRAIN_THRESHOLD,
DEFAULT_ASYNC_DRAIN_THRESHOLD, 0),
inputPolicy);
vectoredIOContext = populateVectoredIOContext(conf);
} catch (AmazonClientException e) {
// amazon client exception: stop all services then throw the translation
cleanupWithLogger(LOG, span);
@ -597,6 +602,23 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
}
}
/**
* Populates the configurations related to vectored IO operation
* in the context which has to passed down to input streams.
* @param conf configuration object.
* @return VectoredIOContext.
*/
private VectoredIOContext populateVectoredIOContext(Configuration conf) {
final int minSeekVectored = (int) longBytesOption(conf, AWS_S3_VECTOR_READS_MIN_SEEK_SIZE,
DEFAULT_AWS_S3_VECTOR_READS_MIN_SEEK_SIZE, 0);
final int maxReadSizeVectored = (int) longBytesOption(conf, AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE,
DEFAULT_AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE, 0);
return new VectoredIOContext()
.setMinSeekForVectoredReads(minSeekVectored)
.setMaxReadSizeForVectoredReads(maxReadSizeVectored)
.build();
}
/**
* Set the client side encryption gauge to 0 or 1, indicating if CSE is
* enabled through the gauge or not.
@ -1552,7 +1574,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
invoker,
statistics,
statisticsContext,
fileStatus)
fileStatus,
vectoredIOContext)
.withAuditSpan(auditSpan);
openFileHelper.applyDefaultOptions(roc);
return roc.build();

View File

@ -145,6 +145,9 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
private S3AInputPolicy inputPolicy;
private long readahead = Constants.DEFAULT_READAHEAD_RANGE;
/** Vectored IO context. */
private final VectoredIOContext vectoredIOContext;
/**
* This is the actual position within the object, used by
* lazy seek to decide whether to seek on the next read or not.
@ -212,6 +215,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
setReadahead(ctx.getReadahead());
this.asyncDrainThreshold = ctx.getAsyncDrainThreshold();
this.unboundedThreadPool = unboundedThreadPool;
this.vectoredIOContext = context.getVectoredIOContext();
}
/**
@ -859,6 +863,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
sb.append(" remainingInCurrentRequest=")
.append(remainingInCurrentRequest());
sb.append(" ").append(changeTracker);
sb.append(" ").append(vectoredIOContext);
sb.append('\n').append(s);
sb.append('}');
return sb.toString();
@ -905,6 +910,22 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
}
}
/**
* {@inheritDoc}.
*/
@Override
public int minSeekForVectorReads() {
return vectoredIOContext.getMinSeekForVectorReads();
}
/**
* {@inheritDoc}.
*/
@Override
public int maxReadSizeForVectorReads() {
return vectoredIOContext.getMaxReadSizeForVectorReads();
}
/**
* {@inheritDoc}
* Vectored read implementation for S3AInputStream.

View File

@ -64,6 +64,12 @@ public class S3AReadOpContext extends S3AOpContext {
*/
private long asyncDrainThreshold;
/**
* Vectored IO context for vectored read api
* in {@code S3AInputStream#readVectored(List, IntFunction)}.
*/
private final VectoredIOContext vectoredIOContext;
/**
* Instantiate.
* @param path path of read
@ -71,17 +77,19 @@ public class S3AReadOpContext extends S3AOpContext {
* @param stats Fileystem statistics (may be null)
* @param instrumentation statistics context
* @param dstFileStatus target file status
* @param vectoredIOContext context for vectored read operation.
*/
public S3AReadOpContext(
final Path path,
Invoker invoker,
@Nullable FileSystem.Statistics stats,
S3AStatisticsContext instrumentation,
FileStatus dstFileStatus) {
FileStatus dstFileStatus,
VectoredIOContext vectoredIOContext) {
super(invoker, stats, instrumentation,
dstFileStatus);
this.path = requireNonNull(path);
this.vectoredIOContext = requireNonNull(vectoredIOContext, "vectoredIOContext");
}
/**
@ -145,6 +153,7 @@ public class S3AReadOpContext extends S3AOpContext {
}
/**
<<<<<<< HEAD
* Set builder value.
* @param value new value
* @return the builder
@ -199,6 +208,14 @@ public class S3AReadOpContext extends S3AOpContext {
return asyncDrainThreshold;
}
/**
* Get Vectored IO context for this this read op.
* @return vectored IO context.
*/
public VectoredIOContext getVectoredIOContext() {
return vectoredIOContext;
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder(

View File

@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;
import java.util.List;
import java.util.function.IntFunction;
/**
* Context related to vectored IO operation.
* See {@link S3AInputStream#readVectored(List, IntFunction)}.
*/
public class VectoredIOContext {
/**
* What is the smallest reasonable seek that we should group
* ranges together during vectored read operation.
*/
private int minSeekForVectorReads;
/**
* What is the largest size that we should group ranges
* together during vectored read operation.
* Setting this value 0 will disable merging of ranges.
*/
private int maxReadSizeForVectorReads;
/**
* Default no arg constructor.
*/
public VectoredIOContext() {
}
public VectoredIOContext setMinSeekForVectoredReads(int minSeek) {
this.minSeekForVectorReads = minSeek;
return this;
}
public VectoredIOContext setMaxReadSizeForVectoredReads(int maxSize) {
this.maxReadSizeForVectorReads = maxSize;
return this;
}
public VectoredIOContext build() {
return this;
}
public int getMinSeekForVectorReads() {
return minSeekForVectorReads;
}
public int getMaxReadSizeForVectorReads() {
return maxReadSizeForVectorReads;
}
@Override
public String toString() {
return "VectoredIOContext{" +
"minSeekForVectorReads=" + minSeekForVectorReads +
", maxReadSizeForVectorReads=" + maxReadSizeForVectorReads +
'}';
}
}

View File

@ -55,6 +55,36 @@ it isn't, and some attempts to preserve the metaphor are "aggressively suboptima
To make most efficient use of S3, care is needed.
## <a name="vectoredIO"></a> Improving read performance using Vectored IO
The S3A FileSystem supports implementation of vectored read api using which
a client can provide a list of file ranges to read returning a future read
object associated with each range. For full api specification please see
[FSDataInputStream](../../hadoop-common-project/hadoop-common/filesystem/fsdatainputstream.html).
The following properties can be configured to optimise vectored reads based
on the client requirements.
```xml
<property>
<name>fs.s3a.vectored.read.min.seek.size</name>
<value>4K</value>
<description>
What is the smallest reasonable seek in bytes such
that we group ranges together during vectored
read operation.
</description>
</property>
<property>
<name>fs.s3a.vectored.read.max.merged.size</name>
<value>1M</value>
<description>
What is the largest merged read size in bytes such
that we group ranges together during vectored read.
Setting this value to 0 will disable merging of ranges.
</description>
</property>
```
## <a name="fadvise"></a> Improving data input performance through fadvise
The S3A Filesystem client supports the notion of input policies, similar

View File

@ -18,15 +18,23 @@
package org.apache.hadoop.fs.contract.s3a;
import java.util.ArrayList;
import java.util.List;
import org.junit.Test;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.FileRangeImpl;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.contract.AbstractContractVectoredReadTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.apache.hadoop.fs.s3a.Constants;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.S3ATestUtils;
import java.util.ArrayList;
import java.util.List;
import static org.apache.hadoop.test.MoreAsserts.assertEqual;
public class ITestS3AContractVectoredRead extends AbstractContractVectoredReadTest {
@ -42,7 +50,6 @@ public class ITestS3AContractVectoredRead extends AbstractContractVectoredReadTe
/**
* Overriding in S3 vectored read api fails fast in case of EOF
* requested range.
* @throws Exception
*/
@Override
public void testEOFRanges() throws Exception {
@ -51,4 +58,45 @@ public class ITestS3AContractVectoredRead extends AbstractContractVectoredReadTe
fileRanges.add(new FileRangeImpl(DATASET_LEN, 100));
testExceptionalVectoredRead(fs, fileRanges, "EOFException is expected");
}
@Test
public void testMinSeekAndMaxSizeConfigsPropagation() throws Exception {
Configuration conf = getFileSystem().getConf();
S3ATestUtils.removeBaseAndBucketOverrides(conf,
Constants.AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE,
Constants.AWS_S3_VECTOR_READS_MIN_SEEK_SIZE);
S3ATestUtils.disableFilesystemCaching(conf);
final int configuredMinSeek = 2 * 1024;
final int configuredMaxSize = 10 * 1024 * 1024;
conf.set(Constants.AWS_S3_VECTOR_READS_MIN_SEEK_SIZE, "2K");
conf.set(Constants.AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE, "10M");
try (S3AFileSystem fs = S3ATestUtils.createTestFileSystem(conf)) {
try (FSDataInputStream fis = fs.open(path(VECTORED_READ_FILE_NAME))) {
int newMinSeek = fis.minSeekForVectorReads();
int newMaxSize = fis.maxReadSizeForVectorReads();
assertEqual(newMinSeek, configuredMinSeek,
"configured s3a min seek for vectored reads");
assertEqual(newMaxSize, configuredMaxSize,
"configured s3a max size for vectored reads");
}
}
}
@Test
public void testMinSeekAndMaxSizeDefaultValues() throws Exception {
Configuration conf = getFileSystem().getConf();
S3ATestUtils.removeBaseAndBucketOverrides(conf,
Constants.AWS_S3_VECTOR_READS_MIN_SEEK_SIZE,
Constants.AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE);
try (S3AFileSystem fs = S3ATestUtils.createTestFileSystem(conf)) {
try (FSDataInputStream fis = fs.open(path(VECTORED_READ_FILE_NAME))) {
int minSeek = fis.minSeekForVectorReads();
int maxSize = fis.maxReadSizeForVectorReads();
assertEqual(minSeek, Constants.DEFAULT_AWS_S3_VECTOR_READS_MIN_SEEK_SIZE,
"default s3a min seek for vectored reads");
assertEqual(maxSize, Constants.DEFAULT_AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE,
"default s3a max read size for vectored reads");
}
}
}
}