HADOOP-18347. S3A Vectored IO to use bounded thread pool. (#4918)

part of HADOOP-18103.

Also introducing a config fs.s3a.vectored.active.ranged.reads
to configure the maximum number of range reads a
single input stream can have active (downloading, or queued)
to the central FileSystem instance's pool of queued operations.
This stops a single stream overloading the shared thread pool.

Contributed by: Mukund Thakur
 Conflicts:
	hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
	hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
This commit is contained in:
Mukund Thakur 2022-09-27 21:13:07 +05:30
parent e5a566c91f
commit bbe841e601
5 changed files with 55 additions and 16 deletions

View File

@@ -84,7 +84,7 @@ public interface StreamCapabilities {
* Support for vectored IO api. * Support for vectored IO api.
* See {@code PositionedReadable#readVectored(List, IntFunction)}. * See {@code PositionedReadable#readVectored(List, IntFunction)}.
*/ */
String VECTOREDIO = "readvectored"; String VECTOREDIO = "in:readvectored";
/** /**
* Stream abort() capability implemented by {@link Abortable#abort()}. * Stream abort() capability implemented by {@link Abortable#abort()}.

View File

@@ -1203,4 +1203,23 @@ public final class Constants {
* Default maximum read size in bytes during vectored reads : {@value}. * Default maximum read size in bytes during vectored reads : {@value}.
*/ */
public static final int DEFAULT_AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE = 1253376; //1M public static final int DEFAULT_AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE = 1253376; //1M
/**
* Maximum number of range reads a single input stream can have
* active (downloading, or queued) to the central FileSystem
* instance's pool of queued operations.
* This stops a single stream overloading the shared thread pool.
* {@value}
* <p>
* Default is {@link #DEFAULT_AWS_S3_VECTOR_ACTIVE_RANGE_READS}
*/
public static final String AWS_S3_VECTOR_ACTIVE_RANGE_READS =
"fs.s3a.vectored.active.ranged.reads";
/**
* Limit of queued range data download operations during vectored
* read. Value: {@value}
*/
public static final int DEFAULT_AWS_S3_VECTOR_ACTIVE_RANGE_READS = 4;
} }

View File

@@ -319,6 +319,12 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
/** Vectored IO context. */ /** Vectored IO context. */
private VectoredIOContext vectoredIOContext; private VectoredIOContext vectoredIOContext;
/**
* Maximum number of active range read operation a single
* input stream can have.
*/
private int vectoredActiveRangeReads;
private long readAhead; private long readAhead;
private ChangeDetectionPolicy changeDetectionPolicy; private ChangeDetectionPolicy changeDetectionPolicy;
private final AtomicBoolean closed = new AtomicBoolean(false); private final AtomicBoolean closed = new AtomicBoolean(false);
@@ -597,6 +603,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
longBytesOption(conf, ASYNC_DRAIN_THRESHOLD, longBytesOption(conf, ASYNC_DRAIN_THRESHOLD,
DEFAULT_ASYNC_DRAIN_THRESHOLD, 0), DEFAULT_ASYNC_DRAIN_THRESHOLD, 0),
inputPolicy); inputPolicy);
vectoredActiveRangeReads = intOption(conf,
AWS_S3_VECTOR_ACTIVE_RANGE_READS, DEFAULT_AWS_S3_VECTOR_ACTIVE_RANGE_READS, 1);
vectoredIOContext = populateVectoredIOContext(conf); vectoredIOContext = populateVectoredIOContext(conf);
} catch (AmazonClientException e) { } catch (AmazonClientException e) {
// amazon client exception: stop all services then throw the translation // amazon client exception: stop all services then throw the translation
@@ -1504,7 +1512,10 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
createObjectAttributes(path, fileStatus), createObjectAttributes(path, fileStatus),
createInputStreamCallbacks(auditSpan), createInputStreamCallbacks(auditSpan),
inputStreamStats, inputStreamStats,
unboundedThreadPool)); new SemaphoredDelegatingExecutor(
boundedThreadPool,
vectoredActiveRangeReads,
true)));
} }
/** /**

View File

@@ -27,7 +27,7 @@ import java.net.SocketTimeoutException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.List; import java.util.List;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.IntFunction; import java.util.function.IntFunction;
@@ -139,7 +139,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
/** /**
* Thread pool used for vectored IO operation. * Thread pool used for vectored IO operation.
*/ */
private final ThreadPoolExecutor unboundedThreadPool; private final ExecutorService boundedThreadPool;
private final String bucket; private final String bucket;
private final String key; private final String key;
private final String pathStr; private final String pathStr;
@@ -196,13 +196,13 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
* @param s3Attributes object attributes * @param s3Attributes object attributes
* @param client S3 client to use * @param client S3 client to use
* @param streamStatistics stream io stats. * @param streamStatistics stream io stats.
* @param unboundedThreadPool thread pool to use. * @param boundedThreadPool thread pool to use.
*/ */
public S3AInputStream(S3AReadOpContext ctx, public S3AInputStream(S3AReadOpContext ctx,
S3ObjectAttributes s3Attributes, S3ObjectAttributes s3Attributes,
InputStreamCallbacks client, InputStreamCallbacks client,
S3AInputStreamStatistics streamStatistics, S3AInputStreamStatistics streamStatistics,
ThreadPoolExecutor unboundedThreadPool) { ExecutorService boundedThreadPool) {
Preconditions.checkArgument(isNotEmpty(s3Attributes.getBucket()), Preconditions.checkArgument(isNotEmpty(s3Attributes.getBucket()),
"No Bucket"); "No Bucket");
Preconditions.checkArgument(isNotEmpty(s3Attributes.getKey()), "No Key"); Preconditions.checkArgument(isNotEmpty(s3Attributes.getKey()), "No Key");
@@ -224,7 +224,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
setInputPolicy(ctx.getInputPolicy()); setInputPolicy(ctx.getInputPolicy());
setReadahead(ctx.getReadahead()); setReadahead(ctx.getReadahead());
this.asyncDrainThreshold = ctx.getAsyncDrainThreshold(); this.asyncDrainThreshold = ctx.getAsyncDrainThreshold();
this.unboundedThreadPool = unboundedThreadPool; this.boundedThreadPool = boundedThreadPool;
this.vectoredIOContext = context.getVectoredIOContext(); this.vectoredIOContext = context.getVectoredIOContext();
this.threadIOStatistics = requireNonNull(ctx.getIOStatisticsAggregator()); this.threadIOStatistics = requireNonNull(ctx.getIOStatisticsAggregator());
} }
@@ -882,7 +882,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
streamStatistics.readVectoredOperationStarted(sortedRanges.size(), sortedRanges.size()); streamStatistics.readVectoredOperationStarted(sortedRanges.size(), sortedRanges.size());
for (FileRange range: sortedRanges) { for (FileRange range: sortedRanges) {
ByteBuffer buffer = allocate.apply(range.getLength()); ByteBuffer buffer = allocate.apply(range.getLength());
unboundedThreadPool.submit(() -> readSingleRange(range, buffer)); boundedThreadPool.submit(() -> readSingleRange(range, buffer));
} }
} else { } else {
LOG.debug("Trying to merge the ranges as they are not disjoint"); LOG.debug("Trying to merge the ranges as they are not disjoint");
@@ -893,7 +893,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
LOG.debug("Number of original ranges size {} , Number of combined ranges {} ", LOG.debug("Number of original ranges size {} , Number of combined ranges {} ",
ranges.size(), combinedFileRanges.size()); ranges.size(), combinedFileRanges.size());
for (CombinedFileRange combinedFileRange: combinedFileRanges) { for (CombinedFileRange combinedFileRange: combinedFileRanges) {
unboundedThreadPool.submit( boundedThreadPool.submit(
() -> readCombinedRangeAndUpdateChildren(combinedFileRange, allocate)); () -> readCombinedRangeAndUpdateChildren(combinedFileRange, allocate));
} }
} }

View File

@@ -75,13 +75,22 @@ on the client requirements.
</description> </description>
</property> </property>
<property> <property>
<name>fs.s3a.vectored.read.max.merged.size</name> <name>fs.s3a.vectored.read.max.merged.size</name>
<value>1M</value> <value>1M</value>
<description> <description>
What is the largest merged read size in bytes such What is the largest merged read size in bytes such
that we group ranges together during vectored read. that we group ranges together during vectored read.
Setting this value to 0 will disable merging of ranges. Setting this value to 0 will disable merging of ranges.
</description> </description>
<property>
<name>fs.s3a.vectored.active.ranged.reads</name>
<value>4</value>
<description>
Maximum number of range reads a single input stream can have
active (downloading, or queued) to the central FileSystem
instance's pool of queued operations.
This stops a single stream overloading the shared thread pool.
</description>
</property> </property>
``` ```