HADOOP-18379 rebase feature/HADOOP-18028-s3a-prefetch to trunk

Fixes the build and a test failure (ITestS3ARequesterPays) which
was always there if you tested without prefetching enabled.

Change-Id: I4503c64856cfeb453b558808065b38455e1fce23
This commit is contained in:
Steve Loughran 2022-07-28 14:58:03 +01:00
parent a9dbd7d62f
commit e23f70a03c
No known key found for this signature in database
GPG Key ID: D22CF846DBB162A0
4 changed files with 16 additions and 2 deletions

View File

@ -1209,6 +1209,11 @@ public final class Constants {
*/ */
public static final String PREFETCH_ENABLED_KEY = "fs.s3a.prefetch.enabled"; public static final String PREFETCH_ENABLED_KEY = "fs.s3a.prefetch.enabled";
/**
* Default option as to whether the prefetching input stream is enabled.
*/
public static final boolean PREFETCH_ENABLED_DEFAULT = false;
// If the default values are used, each file opened for reading will consume // If the default values are used, each file opened for reading will consume
// 64 MB of heap space (8 blocks x 8 MB each). // 64 MB of heap space (8 blocks x 8 MB each).

View File

@ -504,7 +504,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
longBytesOption(conf, FS_S3A_BLOCK_SIZE, DEFAULT_BLOCKSIZE, 1); longBytesOption(conf, FS_S3A_BLOCK_SIZE, DEFAULT_BLOCKSIZE, 1);
enableMultiObjectsDelete = conf.getBoolean(ENABLE_MULTI_DELETE, true); enableMultiObjectsDelete = conf.getBoolean(ENABLE_MULTI_DELETE, true);
this.prefetchEnabled = conf.getBoolean(PREFETCH_ENABLED_KEY, false); this.prefetchEnabled = conf.getBoolean(PREFETCH_ENABLED_KEY, PREFETCH_ENABLED_DEFAULT);
this.prefetchBlockSize = intOption( this.prefetchBlockSize = intOption(
conf, PREFETCH_BLOCK_SIZE_KEY, PREFETCH_BLOCK_DEFAULT_SIZE, PREFETCH_BLOCK_DEFAULT_SIZE); conf, PREFETCH_BLOCK_SIZE_KEY, PREFETCH_BLOCK_DEFAULT_SIZE, PREFETCH_BLOCK_DEFAULT_SIZE);
this.prefetchBlockCount = this.prefetchBlockCount =

View File

@ -31,6 +31,7 @@ import org.apache.hadoop.fs.statistics.IOStatisticAssertions;
import org.apache.hadoop.fs.statistics.StreamStatisticNames; import org.apache.hadoop.fs.statistics.StreamStatisticNames;
import static org.apache.hadoop.fs.s3a.Constants.ALLOW_REQUESTER_PAYS; import static org.apache.hadoop.fs.s3a.Constants.ALLOW_REQUESTER_PAYS;
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_DEFAULT;
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY; import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
import static org.apache.hadoop.fs.s3a.Constants.S3A_BUCKET_PROBE; import static org.apache.hadoop.fs.s3a.Constants.S3A_BUCKET_PROBE;
import static org.apache.hadoop.test.LambdaTestUtils.intercept; import static org.apache.hadoop.test.LambdaTestUtils.intercept;
@ -79,7 +80,7 @@ public class ITestS3ARequesterPays extends AbstractS3ATestBase {
inputStream.seek(0); inputStream.seek(0);
inputStream.readByte(); inputStream.readByte();
if (conf.getBoolean(PREFETCH_ENABLED_KEY, true)) { if (conf.getBoolean(PREFETCH_ENABLED_KEY, PREFETCH_ENABLED_DEFAULT)) {
// For S3PrefetchingInputStream, verify a call was made // For S3PrefetchingInputStream, verify a call was made
IOStatisticAssertions.assertThatStatisticCounter(inputStream.getIOStatistics(), IOStatisticAssertions.assertThatStatisticCounter(inputStream.getIOStatistics(),
StreamStatisticNames.STREAM_READ_OPENED).isEqualTo(1); StreamStatisticNames.STREAM_READ_OPENED).isEqualTo(1);

View File

@ -49,6 +49,7 @@ import org.apache.hadoop.fs.s3a.S3AInputPolicy;
import org.apache.hadoop.fs.s3a.S3AInputStream; import org.apache.hadoop.fs.s3a.S3AInputStream;
import org.apache.hadoop.fs.s3a.S3AReadOpContext; import org.apache.hadoop.fs.s3a.S3AReadOpContext;
import org.apache.hadoop.fs.s3a.S3ObjectAttributes; import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
import org.apache.hadoop.fs.s3a.VectoredIOContext;
import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy; import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy;
import org.apache.hadoop.fs.s3a.impl.ChangeTracker; import org.apache.hadoop.fs.s3a.impl.ChangeTracker;
import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics; import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
@ -59,6 +60,8 @@ import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.util.functional.CallableRaisingIOE; import org.apache.hadoop.util.functional.CallableRaisingIOE;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.emptyStatisticsStore;
/** /**
* Provides 'fake' implementations of S3InputStream variants. * Provides 'fake' implementations of S3InputStream variants.
* *
@ -130,6 +133,11 @@ public final class Fakes {
statistics, statistics,
statisticsContext, statisticsContext,
fileStatus, fileStatus,
new VectoredIOContext()
.setMinSeekForVectoredReads(1)
.setMaxReadSizeForVectoredReads(1)
.build(),
emptyStatisticsStore(),
futurePool, futurePool,
prefetchBlockSize, prefetchBlockSize,
prefetchBlockCount) prefetchBlockCount)