diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java index c925e50889d..93ed57ef830 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java @@ -84,7 +84,7 @@ public interface StreamCapabilities { * Support for vectored IO api. * See {@code PositionedReadable#readVectored(List, IntFunction)}. */ - String VECTOREDIO = "readvectored"; + String VECTOREDIO = "in:readvectored"; /** * Stream abort() capability implemented by {@link Abortable#abort()}. diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java index 764a6adaca2..306fd8e1c67 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java @@ -1203,4 +1203,23 @@ public final class Constants { * Default maximum read size in bytes during vectored reads : {@value}. */ public static final int DEFAULT_AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE = 1253376; //1M + + /** + * Maximum number of range reads a single input stream can have + * active (downloading, or queued) to the central FileSystem + * instance's pool of queued operations. + * This stops a single stream overloading the shared thread pool. + * {@value} + *
+ * Default is {@link #DEFAULT_AWS_S3_VECTOR_ACTIVE_RANGE_READS}
+ */
+ public static final String AWS_S3_VECTOR_ACTIVE_RANGE_READS =
+ "fs.s3a.vectored.active.ranged.reads";
+
+ /**
+ * Limit of queued range data download operations during vectored
+ * read. Value: {@value}
+ */
+ public static final int DEFAULT_AWS_S3_VECTOR_ACTIVE_RANGE_READS = 4;
+
}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index e36214dc1fe..aa6f69c2a2a 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -319,6 +319,12 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
/** Vectored IO context. */
private VectoredIOContext vectoredIOContext;
+ /**
+ * Maximum number of active range read operation a single
+ * input stream can have.
+ */
+ private int vectoredActiveRangeReads;
+
private long readAhead;
private ChangeDetectionPolicy changeDetectionPolicy;
private final AtomicBoolean closed = new AtomicBoolean(false);
@@ -597,6 +603,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
longBytesOption(conf, ASYNC_DRAIN_THRESHOLD,
DEFAULT_ASYNC_DRAIN_THRESHOLD, 0),
inputPolicy);
+ vectoredActiveRangeReads = intOption(conf,
+ AWS_S3_VECTOR_ACTIVE_RANGE_READS, DEFAULT_AWS_S3_VECTOR_ACTIVE_RANGE_READS, 1);
vectoredIOContext = populateVectoredIOContext(conf);
} catch (AmazonClientException e) {
// amazon client exception: stop all services then throw the translation
@@ -1504,7 +1512,10 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
createObjectAttributes(path, fileStatus),
createInputStreamCallbacks(auditSpan),
inputStreamStats,
- unboundedThreadPool));
+ new SemaphoredDelegatingExecutor(
+ boundedThreadPool,
+ vectoredActiveRangeReads,
+ true)));
}
/**
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
index b6ac8669a67..39d41f5ffd2 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
@@ -27,7 +27,7 @@ import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.IntFunction;
@@ -139,7 +139,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
/**
* Thread pool used for vectored IO operation.
*/
- private final ThreadPoolExecutor unboundedThreadPool;
+ private final ExecutorService boundedThreadPool;
private final String bucket;
private final String key;
private final String pathStr;
@@ -196,13 +196,13 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
* @param s3Attributes object attributes
* @param client S3 client to use
* @param streamStatistics stream io stats.
- * @param unboundedThreadPool thread pool to use.
+ * @param boundedThreadPool thread pool to use.
*/
public S3AInputStream(S3AReadOpContext ctx,
S3ObjectAttributes s3Attributes,
InputStreamCallbacks client,
S3AInputStreamStatistics streamStatistics,
- ThreadPoolExecutor unboundedThreadPool) {
+ ExecutorService boundedThreadPool) {
Preconditions.checkArgument(isNotEmpty(s3Attributes.getBucket()),
"No Bucket");
Preconditions.checkArgument(isNotEmpty(s3Attributes.getKey()), "No Key");
@@ -224,7 +224,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
setInputPolicy(ctx.getInputPolicy());
setReadahead(ctx.getReadahead());
this.asyncDrainThreshold = ctx.getAsyncDrainThreshold();
- this.unboundedThreadPool = unboundedThreadPool;
+ this.boundedThreadPool = boundedThreadPool;
this.vectoredIOContext = context.getVectoredIOContext();
this.threadIOStatistics = requireNonNull(ctx.getIOStatisticsAggregator());
}
@@ -882,7 +882,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
streamStatistics.readVectoredOperationStarted(sortedRanges.size(), sortedRanges.size());
for (FileRange range: sortedRanges) {
ByteBuffer buffer = allocate.apply(range.getLength());
- unboundedThreadPool.submit(() -> readSingleRange(range, buffer));
+ boundedThreadPool.submit(() -> readSingleRange(range, buffer));
}
} else {
LOG.debug("Trying to merge the ranges as they are not disjoint");
@@ -893,7 +893,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
LOG.debug("Number of original ranges size {} , Number of combined ranges {} ",
ranges.size(), combinedFileRanges.size());
for (CombinedFileRange combinedFileRange: combinedFileRanges) {
- unboundedThreadPool.submit(
+ boundedThreadPool.submit(
() -> readCombinedRangeAndUpdateChildren(combinedFileRange, allocate));
}
}
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/performance.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/performance.md
index 06eb137cd9b..88e6e8a0b21 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/performance.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/performance.md
@@ -75,13 +75,22 @@ on the client requirements.