HADOOP-18104: S3A: Add configs to configure minSeekForVectorReads and maxReadSizeForVectorReads (#3964)
Part of HADOOP-18103. Introduces fs.s3a.vectored.read.min.seek.size and fs.s3a.vectored.read.max.merged.size to configure the minimum seek and the maximum merged read size during a vectored IO operation in the S3A connector. These properties define how ranges are merged. To disable merging completely, set fs.s3a.vectored.read.max.merged.size to 0. Contributed by: Mukund Thakur
This commit is contained in:
parent 5c348c41ab
commit 9f03f87963
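As an illustration (not part of the commit), a minimal sketch of setting these options on a client Configuration. The class name and bucket URI are hypothetical; the "K"/"M" suffixes rely on the longBytesOption() parsing visible in the S3AFileSystem change below:

```java
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class VectoredReadConfigExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Merge ranges whose inter-range gap is at most 2K...
    conf.set("fs.s3a.vectored.read.min.seek.size", "2K");
    // ...provided the merged read stays within 10M.
    conf.set("fs.s3a.vectored.read.max.merged.size", "10M");
    // Setting the max merged size to 0 disables merging entirely:
    // conf.set("fs.s3a.vectored.read.max.merged.size", "0");
    FileSystem fs = FileSystem.get(new URI("s3a://example-bucket/"), conf);
    // ... use fs ...
  }
}
```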
@@ -474,6 +474,7 @@ end of first and start of next range is more than this value.

Maximum number of bytes which can be read in one go after merging the ranges.
Two ranges won't be merged if the combined data to be read is more than this value.
Essentially setting this to 0 will disable the merging of ranges.

## Consistency

@@ -18,15 +18,6 @@

package org.apache.hadoop.fs.contract;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.FileRangeImpl;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.impl.FutureIOSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;

@@ -42,6 +33,15 @@ import org.junit.Assert;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.FileRangeImpl;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.impl.FutureIOSupport;

import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;

@@ -52,7 +52,7 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContractTestBase

  public static final int DATASET_LEN = 64 * 1024;
  private static final byte[] DATASET = ContractTestUtils.dataset(DATASET_LEN, 'a', 32);
-  private static final String VECTORED_READ_FILE_NAME = "vectored_file.txt";
+  protected static final String VECTORED_READ_FILE_NAME = "vectored_file.txt";
  private static final String VECTORED_READ_FILE_1MB_NAME = "vectored_file_1M.txt";
  private static final byte[] DATASET_MB = ContractTestUtils.dataset(1024 * 1024, 'a', 256);

@@ -172,6 +172,7 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContractTestBase
    }
  }

  @Test
  public void testSameRanges() throws Exception {
    FileSystem fs = getFileSystem();
    List<FileRange> fileRanges = new ArrayList<>();

@@ -207,6 +207,30 @@ public class TestVectoredReadUtils extends HadoopTestBase {
        VectoredReadUtils.isOrderedDisjoint(outputList, 1, 800));
  }

  @Test
  public void testMaxSizeZeroDisablesMerging() throws Exception {
    List<FileRange> randomRanges = Arrays.asList(
        new FileRangeImpl(3000, 110),
        new FileRangeImpl(3000, 100),
        new FileRangeImpl(2100, 100)
        );
    assertEqualRangeCountsAfterMerging(randomRanges, 1, 1, 0);
    assertEqualRangeCountsAfterMerging(randomRanges, 1, 0, 0);
    assertEqualRangeCountsAfterMerging(randomRanges, 1, 100, 0);
  }

  private void assertEqualRangeCountsAfterMerging(List<FileRange> inputRanges,
                                                  int chunkSize,
                                                  int minimumSeek,
                                                  int maxSize) {
    List<CombinedFileRange> combinedFileRanges = VectoredReadUtils
        .sortAndMergeRanges(inputRanges, chunkSize, minimumSeek, maxSize);
    Assertions.assertThat(combinedFileRanges)
        .describedAs("Mismatch in number of ranges post merging")
        .hasSize(inputRanges.size());
  }

  interface Stream extends PositionedReadable, ByteBufferPositionedReadable {
    // nothing
  }

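To make the merge rules concrete, here is a hypothetical companion test in the style of TestVectoredReadUtils above. It assumes, as the config descriptions state, that two ranges merge when the gap between them is within minimumSeek and the combined length stays within maxSize:

```java
// Hypothetical illustration of when two ranges merge.
@Test
public void testMergeAcrossSmallGap() throws Exception {
  // Gap between (2100, 100) and (3000, 110): 3000 - 2200 = 800 bytes.
  List<FileRange> ranges = Arrays.asList(
      new FileRangeImpl(3000, 110),
      new FileRangeImpl(2100, 100));

  // minimumSeek 100 < gap 800: nothing merges, both ranges survive.
  Assertions.assertThat(
      VectoredReadUtils.sortAndMergeRanges(ranges, 1, 100, 10000))
      .hasSize(2);

  // minimumSeek 1000 > gap 800 and merged length 1010 <= maxSize 10000:
  // the two ranges collapse into one combined read spanning 2100..3110.
  Assertions.assertThat(
      VectoredReadUtils.sortAndMergeRanges(ranges, 1, 1000, 10000))
      .hasSize(1);
}
```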
@@ -86,4 +86,16 @@ public class MoreAsserts {
        "completed exceptionally")
        .isTrue();
  }

  /**
   * Assert that two values of the same type are equal.
   * @param actual actual value.
   * @param expected expected value.
   * @param message error message to print in case of mismatch.
   */
  public static <T> void assertEqual(T actual, T expected, String message) {
    Assertions.assertThat(actual)
        .describedAs("Mismatch in %s", message)
        .isEqualTo(expected);
  }
}

@@ -1177,4 +1177,30 @@ public final class Constants {
   */
  public static final String FS_S3A_CREATE_HEADER = "fs.s3a.create.header";

  /**
   * The smallest reasonable seek in bytes below which
   * ranges are grouped together during a vectored read operation.
   * Value : {@value}.
   */
  public static final String AWS_S3_VECTOR_READS_MIN_SEEK_SIZE =
      "fs.s3a.vectored.read.min.seek.size";

  /**
   * The largest merged read size in bytes up to which
   * ranges are grouped together during a vectored read.
   * Setting this value to 0 will disable merging of ranges.
   * Value : {@value}.
   */
  public static final String AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE =
      "fs.s3a.vectored.read.max.merged.size";

  /**
   * Default minimum seek in bytes during vectored reads : {@value}.
   */
  public static final int DEFAULT_AWS_S3_VECTOR_READS_MIN_SEEK_SIZE = 4096; // 4K

  /**
   * Default maximum read size in bytes during vectored reads : {@value}.
   */
  public static final int DEFAULT_AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE = 1048576; // 1M
}

@@ -313,6 +313,10 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
   * {@code openFile()}.
   */
  private S3AInputPolicy inputPolicy;
  /** Vectored IO context. */
  private VectoredIOContext vectoredIOContext;

  private long readAhead;
  private ChangeDetectionPolicy changeDetectionPolicy;
  private final AtomicBoolean closed = new AtomicBoolean(false);
  private volatile boolean isClosed = false;

@@ -584,6 +588,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
          longBytesOption(conf, ASYNC_DRAIN_THRESHOLD,
              DEFAULT_ASYNC_DRAIN_THRESHOLD, 0),
          inputPolicy);
      vectoredIOContext = populateVectoredIOContext(conf);
    } catch (AmazonClientException e) {
      // amazon client exception: stop all services then throw the translation
      cleanupWithLogger(LOG, span);

@@ -597,6 +602,23 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
    }
  }

  /**
   * Populates the configurations related to vectored IO operation
   * in the context which has to be passed down to input streams.
   * @param conf configuration object.
   * @return VectoredIOContext.
   */
  private VectoredIOContext populateVectoredIOContext(Configuration conf) {
    final int minSeekVectored = (int) longBytesOption(conf, AWS_S3_VECTOR_READS_MIN_SEEK_SIZE,
        DEFAULT_AWS_S3_VECTOR_READS_MIN_SEEK_SIZE, 0);
    final int maxReadSizeVectored = (int) longBytesOption(conf, AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE,
        DEFAULT_AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE, 0);
    return new VectoredIOContext()
        .setMinSeekForVectoredReads(minSeekVectored)
        .setMaxReadSizeForVectoredReads(maxReadSizeVectored)
        .build();
  }

  /**
   * Set the client side encryption gauge to 0 or 1, indicating
   * whether CSE is enabled.

@@ -1552,7 +1574,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
        invoker,
        statistics,
        statisticsContext,
-        fileStatus)
+        fileStatus,
+        vectoredIOContext)
        .withAuditSpan(auditSpan);
    openFileHelper.applyDefaultOptions(roc);
    return roc.build();

@@ -145,6 +145,9 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
  private S3AInputPolicy inputPolicy;
  private long readahead = Constants.DEFAULT_READAHEAD_RANGE;

  /** Vectored IO context. */
  private final VectoredIOContext vectoredIOContext;

  /**
   * This is the actual position within the object, used by
   * lazy seek to decide whether to seek on the next read or not.

@@ -212,6 +215,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
    setReadahead(ctx.getReadahead());
    this.asyncDrainThreshold = ctx.getAsyncDrainThreshold();
    this.unboundedThreadPool = unboundedThreadPool;
    this.vectoredIOContext = ctx.getVectoredIOContext();
  }

  /**

@@ -859,6 +863,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
    sb.append(" remainingInCurrentRequest=")
        .append(remainingInCurrentRequest());
    sb.append(" ").append(changeTracker);
    sb.append(" ").append(vectoredIOContext);
    sb.append('\n').append(s);
    sb.append('}');
    return sb.toString();

@@ -905,6 +910,22 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
    }
  }

  /**
   * {@inheritDoc}.
   */
  @Override
  public int minSeekForVectorReads() {
    return vectoredIOContext.getMinSeekForVectorReads();
  }

  /**
   * {@inheritDoc}.
   */
  @Override
  public int maxReadSizeForVectorReads() {
    return vectoredIOContext.getMaxReadSizeForVectorReads();
  }

  /**
   * {@inheritDoc}
   * Vectored read implementation for S3AInputStream.

@@ -64,6 +64,12 @@ public class S3AReadOpContext extends S3AOpContext {
   */
  private long asyncDrainThreshold;

  /**
   * Vectored IO context for the vectored read API
   * in {@code S3AInputStream#readVectored(List, IntFunction)}.
   */
  private final VectoredIOContext vectoredIOContext;

  /**
   * Instantiate.
   * @param path path of read

@@ -71,17 +77,19 @@ public class S3AReadOpContext extends S3AOpContext {
   * @param stats Filesystem statistics (may be null)
   * @param instrumentation statistics context
   * @param dstFileStatus target file status
   * @param vectoredIOContext context for vectored read operation.
   */
  public S3AReadOpContext(
      final Path path,
      Invoker invoker,
      @Nullable FileSystem.Statistics stats,
      S3AStatisticsContext instrumentation,
-      FileStatus dstFileStatus) {
+      FileStatus dstFileStatus,
+      VectoredIOContext vectoredIOContext) {
    super(invoker, stats, instrumentation,
        dstFileStatus);
    this.path = requireNonNull(path);
    this.vectoredIOContext = requireNonNull(vectoredIOContext, "vectoredIOContext");
  }

  /**

@@ -145,6 +153,7 @@ public class S3AReadOpContext extends S3AOpContext {
  }

  /**
   * Set builder value.
   * @param value new value
   * @return the builder

@@ -199,6 +208,14 @@ public class S3AReadOpContext extends S3AOpContext {
    return asyncDrainThreshold;
  }

  /**
   * Get the vectored IO context for this read op.
   * @return vectored IO context.
   */
  public VectoredIOContext getVectoredIOContext() {
    return vectoredIOContext;
  }

  @Override
  public String toString() {
    final StringBuilder sb = new StringBuilder(

@@ -0,0 +1,78 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.s3a;

import java.util.List;
import java.util.function.IntFunction;

/**
 * Context related to vectored IO operation.
 * See {@link S3AInputStream#readVectored(List, IntFunction)}.
 */
public class VectoredIOContext {

  /**
   * The smallest reasonable seek below which we group
   * ranges together during a vectored read operation.
   */
  private int minSeekForVectorReads;

  /**
   * The largest size up to which we group ranges
   * together during a vectored read operation.
   * Setting this value to 0 will disable merging of ranges.
   */
  private int maxReadSizeForVectorReads;

  /**
   * Default no arg constructor.
   */
  public VectoredIOContext() {
  }

  public VectoredIOContext setMinSeekForVectoredReads(int minSeek) {
    this.minSeekForVectorReads = minSeek;
    return this;
  }

  public VectoredIOContext setMaxReadSizeForVectoredReads(int maxSize) {
    this.maxReadSizeForVectorReads = maxSize;
    return this;
  }

  public VectoredIOContext build() {
    return this;
  }

  public int getMinSeekForVectorReads() {
    return minSeekForVectorReads;
  }

  public int getMaxReadSizeForVectorReads() {
    return maxReadSizeForVectorReads;
  }

  @Override
  public String toString() {
    return "VectoredIOContext{" +
        "minSeekForVectorReads=" + minSeekForVectorReads +
        ", maxReadSizeForVectorReads=" + maxReadSizeForVectorReads +
        '}';
  }
}

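For reference, a small sketch (values arbitrary) of how the fluent setters above compose, mirroring populateVectoredIOContext in the S3AFileSystem change:

```java
// Build a context with explicit thresholds; build() currently just
// returns the configured instance.
VectoredIOContext ctx = new VectoredIOContext()
    .setMinSeekForVectoredReads(4096)        // 4K
    .setMaxReadSizeForVectoredReads(1048576) // 1M
    .build();
```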
@ -55,6 +55,36 @@ it isn't, and some attempts to preserve the metaphor are "aggressively suboptima
|
|||
|
||||
To make most efficient use of S3, care is needed.
|
||||
|
||||
## <a name="vectoredIO"></a> Improving read performance using Vectored IO
|
||||
The S3A FileSystem supports implementation of vectored read api using which
|
||||
a client can provide a list of file ranges to read returning a future read
|
||||
object associated with each range. For full api specification please see
|
||||
[FSDataInputStream](../../hadoop-common-project/hadoop-common/filesystem/fsdatainputstream.html).
|
||||
|
||||
The following properties can be configured to optimise vectored reads based
|
||||
on the client requirements.
|
||||
|
||||
```xml
|
||||
<property>
|
||||
<name>fs.s3a.vectored.read.min.seek.size</name>
|
||||
<value>4K</value>
|
||||
<description>
|
||||
What is the smallest reasonable seek in bytes such
|
||||
that we group ranges together during vectored
|
||||
read operation.
|
||||
</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>fs.s3a.vectored.read.max.merged.size</name>
|
||||
<value>1M</value>
|
||||
<description>
|
||||
What is the largest merged read size in bytes such
|
||||
that we group ranges together during vectored read.
|
||||
Setting this value to 0 will disable merging of ranges.
|
||||
</description>
|
||||
</property>
|
||||
```
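
As a usage illustration (not part of this change), a minimal sketch of issuing a vectored read through the `readVectored(List, IntFunction)` API referenced above. The path is hypothetical; `FileRangeImpl` and `FutureIOSupport.awaitFuture` are the same classes used by the contract tests in this commit:

```java
// Requires org.apache.hadoop.fs.{FileRange, FileRangeImpl, Path,
// FSDataInputStream} and org.apache.hadoop.fs.impl.FutureIOSupport.
Path path = new Path("s3a://example-bucket/data.bin");
FileSystem fs = path.getFileSystem(conf);
List<FileRange> ranges = new ArrayList<>();
ranges.add(new FileRangeImpl(0, 8192));        // first 8KB
ranges.add(new FileRangeImpl(1048576, 8192));  // 8KB at the 1MB mark
try (FSDataInputStream in = fs.open(path)) {
  // Ranges closer together than fs.s3a.vectored.read.min.seek.size may be
  // coalesced into one GET, subject to fs.s3a.vectored.read.max.merged.size.
  in.readVectored(ranges, ByteBuffer::allocate);
  for (FileRange range : ranges) {
    ByteBuffer data = FutureIOSupport.awaitFuture(range.getData());
    // consume data ...
  }
}
```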

## <a name="fadvise"></a> Improving data input performance through fadvise

The S3A Filesystem client supports the notion of input policies, similar

@@ -18,15 +18,23 @@

package org.apache.hadoop.fs.contract.s3a;

import java.util.ArrayList;
import java.util.List;

import org.junit.Test;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.FileRangeImpl;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.contract.AbstractContractVectoredReadTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.apache.hadoop.fs.s3a.Constants;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.S3ATestUtils;

import static org.apache.hadoop.test.MoreAsserts.assertEqual;

public class ITestS3AContractVectoredRead extends AbstractContractVectoredReadTest {

@@ -42,7 +50,6 @@ public class ITestS3AContractVectoredRead extends AbstractContractVectoredReadTest
  /**
   * Overridden because the S3A vectored read API fails fast when a
   * range beyond EOF is requested.
   * @throws Exception
   */
  @Override
  public void testEOFRanges() throws Exception {

@@ -51,4 +58,45 @@ public class ITestS3AContractVectoredRead extends AbstractContractVectoredReadTest
    fileRanges.add(new FileRangeImpl(DATASET_LEN, 100));
    testExceptionalVectoredRead(fs, fileRanges, "EOFException is expected");
  }

  @Test
  public void testMinSeekAndMaxSizeConfigsPropagation() throws Exception {
    Configuration conf = getFileSystem().getConf();
    S3ATestUtils.removeBaseAndBucketOverrides(conf,
        Constants.AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE,
        Constants.AWS_S3_VECTOR_READS_MIN_SEEK_SIZE);
    S3ATestUtils.disableFilesystemCaching(conf);
    final int configuredMinSeek = 2 * 1024;
    final int configuredMaxSize = 10 * 1024 * 1024;
    conf.set(Constants.AWS_S3_VECTOR_READS_MIN_SEEK_SIZE, "2K");
    conf.set(Constants.AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE, "10M");
    try (S3AFileSystem fs = S3ATestUtils.createTestFileSystem(conf)) {
      try (FSDataInputStream fis = fs.open(path(VECTORED_READ_FILE_NAME))) {
        int newMinSeek = fis.minSeekForVectorReads();
        int newMaxSize = fis.maxReadSizeForVectorReads();
        assertEqual(newMinSeek, configuredMinSeek,
            "configured s3a min seek for vectored reads");
        assertEqual(newMaxSize, configuredMaxSize,
            "configured s3a max size for vectored reads");
      }
    }
  }

  @Test
  public void testMinSeekAndMaxSizeDefaultValues() throws Exception {
    Configuration conf = getFileSystem().getConf();
    S3ATestUtils.removeBaseAndBucketOverrides(conf,
        Constants.AWS_S3_VECTOR_READS_MIN_SEEK_SIZE,
        Constants.AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE);
    try (S3AFileSystem fs = S3ATestUtils.createTestFileSystem(conf)) {
      try (FSDataInputStream fis = fs.open(path(VECTORED_READ_FILE_NAME))) {
        int minSeek = fis.minSeekForVectorReads();
        int maxSize = fis.maxReadSizeForVectorReads();
        assertEqual(minSeek, Constants.DEFAULT_AWS_S3_VECTOR_READS_MIN_SEEK_SIZE,
            "default s3a min seek for vectored reads");
        assertEqual(maxSize, Constants.DEFAULT_AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE,
            "default s3a max read size for vectored reads");
      }
    }
  }
}