From 76ce3cb5dcd36949f916098aa8a58e29c6f7664a Mon Sep 17 00:00:00 2001
From: Hangxiang Yu
Date: Mon, 28 Nov 2022 23:04:00 +0800
Subject: [PATCH] HADOOP-18543. AliyunOSSFileSystem#open(Path path, int bufferSize) use buffer size as its downloadPartSize

---
 .../fs/aliyun/oss/AliyunOSSFileSystem.java    | 34 +++++++++++++++-
 .../fs/aliyun/oss/AliyunOSSInputStream.java   | 63 +++++++++++++++++++++++---
 .../aliyun/oss/TestAliyunOSSInputStream.java  | 47 ++++++++++++++++++++++
 3 files changed, 137 insertions(+), 7 deletions(-)

diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java
index 5f40488bfd6..5ddd3cfccf1 100644
--- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java
+++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java
@@ -585,6 +585,38 @@ public class AliyunOSSFileSystem extends FileSystem {
     } while (fPart != null);
   }
 
+  /**
+   * Opens a file for reading. The download part size of the returned stream
+   * is read from the configuration ({@code MULTIPART_DOWNLOAD_SIZE_KEY}).
+   *
+   * @param path the file to open
+   * @return an input stream over the object's content
+   * @throws IOException if {@code path} is a directory or cannot be opened
+   */
+  @Override
+  public FSDataInputStream open(Path path) throws IOException {
+    final FileStatus fileStatus = getFileStatus(path);
+    if (fileStatus.isDirectory()) {
+      throw new FileNotFoundException("Can't open " + path +
+          " because it is a directory");
+    }
+
+    return new FSDataInputStream(new AliyunOSSInputStream(getConf(),
+        new SemaphoredDelegatingExecutor(
+            boundedThreadPool, maxReadAheadPartNumber, true),
+        maxReadAheadPartNumber, store, pathToKey(path), fileStatus.getLen(),
+        statistics));
+  }
+
+  /**
+   * Opens a file for reading, passing {@code bufferSize} to the returned
+   * stream as its requested download part size.
+   *
+   * @param path the file to open
+   * @param bufferSize the requested download part size, in bytes
+   * @return an input stream over the object's content
+   * @throws IOException if {@code path} is a directory or cannot be opened
+   */
   @Override
   public FSDataInputStream open(Path path, int bufferSize) throws IOException {
     final FileStatus fileStatus = getFileStatus(path);
@@ -593,7 +625,7 @@ public class AliyunOSSFileSystem extends FileSystem {
           " because it is a directory");
     }
 
-    return new FSDataInputStream(new AliyunOSSInputStream(getConf(),
+    return new FSDataInputStream(new AliyunOSSInputStream(bufferSize,
         new SemaphoredDelegatingExecutor(
             boundedThreadPool, maxReadAheadPartNumber, true),
         maxReadAheadPartNumber, store, pathToKey(path), fileStatus.getLen(),
diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSInputStream.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSInputStream.java
index 9a89a086df0..295f817ac36 100644
--- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSInputStream.java
+++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSInputStream.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.fs.FSExceptionMessages;
 import org.apache.hadoop.fs.FSInputStream;
 import org.apache.hadoop.fs.FileSystem.Statistics;
 
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
 import static org.apache.hadoop.fs.aliyun.oss.Constants.*;
 
 /**
@@ -57,18 +58,37 @@ public class AliyunOSSInputStream extends FSInputStream {
   private ExecutorService readAheadExecutorService;
   private Queue<ReadBuffer> readBufferQueue = new ArrayDeque<>();
 
-  public AliyunOSSInputStream(Configuration conf,
-      ExecutorService readAheadExecutorService, int maxReadAheadPartNumber,
-      AliyunOSSFileSystemStore store, String key, Long contentLength,
-      Statistics statistics) throws IOException {
+  /**
+   * Creates a stream that downloads the object in parts of the requested
+   * size.
+   *
+   * @param downloadPartSize requested size of each part, in bytes; a value
+   *     below {@code IO_FILE_BUFFER_SIZE_DEFAULT} is raised to that floor
+   * @param readAheadExecutorService executor used for read-ahead, wrapped in
+   *     a listening decorator
+   * @param maxReadAheadPartNumber read-ahead part limit for this stream
+   * @param store the store the object is read from
+   * @param key the key of the object to read
+   * @param contentLength the length of the object
+   * @param statistics the file system statistics instance
+   * @throws IOException if the stream cannot be set up
+   */
+  public AliyunOSSInputStream(
+      long downloadPartSize,
+      ExecutorService readAheadExecutorService,
+      int maxReadAheadPartNumber,
+      AliyunOSSFileSystemStore store,
+      String key,
+      Long contentLength,
+      Statistics statistics) throws IOException {
     this.readAheadExecutorService =
         MoreExecutors.listeningDecorator(readAheadExecutorService);
     this.store = store;
     this.key = key;
     this.statistics = statistics;
     this.contentLength = contentLength;
-    downloadPartSize = conf.getLong(MULTIPART_DOWNLOAD_SIZE_KEY,
-        MULTIPART_DOWNLOAD_SIZE_DEFAULT);
+    // Apply a floor: a requested size below the default I/O buffer size is raised to it.
+    this.downloadPartSize = Math.max(downloadPartSize, IO_FILE_BUFFER_SIZE_DEFAULT);
     this.maxReadAheadPartNumber = maxReadAheadPartNumber;
 
     this.expectNextPos = 0;
@@ -77,6 +97,37 @@ public class AliyunOSSInputStream extends FSInputStream {
     closed = false;
   }
 
+  /**
+   * Creates a stream whose download part size is read from the
+   * configuration ({@code MULTIPART_DOWNLOAD_SIZE_KEY}); the remaining
+   * arguments are forwarded unchanged.
+   *
+   * @param conf the configuration supplying the download part size
+   * @param readAheadExecutorService executor used for read-ahead
+   * @param maxReadAheadPartNumber read-ahead part limit for this stream
+   * @param store the store the object is read from
+   * @param key the key of the object to read
+   * @param contentLength the length of the object
+   * @param statistics the file system statistics instance
+   * @throws IOException if the stream cannot be set up
+   */
+  public AliyunOSSInputStream(Configuration conf,
+      ExecutorService readAheadExecutorService, int maxReadAheadPartNumber,
+      AliyunOSSFileSystemStore store, String key, Long contentLength,
+      Statistics statistics) throws IOException {
+    this(conf.getLong(MULTIPART_DOWNLOAD_SIZE_KEY, MULTIPART_DOWNLOAD_SIZE_DEFAULT),
+        readAheadExecutorService, maxReadAheadPartNumber, store, key, contentLength, statistics);
+  }
+
+  /**
+   * Returns the download part size in effect for this stream.
+   *
+   * @return the part size in bytes, after the minimum has been applied
+   */
+  long getDownloadPartSize() {
+    return downloadPartSize;
+  }
+
   /**
    * Reopen the wrapped stream at give position, by seeking for
    * data of a part length from object content stream.
diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSInputStream.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSInputStream.java
index 32d0e464807..211014157b1 100644
--- a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSInputStream.java
+++ b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSInputStream.java
@@ -33,8 +33,12 @@ import org.junit.rules.Timeout;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.Random;
 
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
+import static org.apache.hadoop.fs.aliyun.oss.Constants.MULTIPART_DOWNLOAD_SIZE_DEFAULT;
+import static org.apache.hadoop.fs.aliyun.oss.Constants.MULTIPART_DOWNLOAD_SIZE_KEY;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -108,6 +112,49 @@ public class TestAliyunOSSInputStream {
     IOUtils.closeStream(instream);
   }
 
+  /**
+   * Verifies which download part size each open() overload hands to the
+   * stream: the configured multipart download size for open(Path), and the
+   * requested buffer size, never below IO_FILE_BUFFER_SIZE_DEFAULT, for
+   * open(Path, int).
+   */
+  @Test
+  public void testConfiguration() throws IOException {
+    Path configurationFile = setPath("/test/configurationFile.txt");
+    long size = 5 * 1024 * 1024;
+
+    ContractTestUtils.generateTestFile(this.fs, configurationFile, size, 256, 255);
+    LOG.info("5MB file created: configurationFile.txt");
+
+    FSDataInputStream instream = this.fs.open(configurationFile);
+    assertTrue(instream.getWrappedStream() instanceof AliyunOSSInputStream);
+    AliyunOSSInputStream wrappedStream = (AliyunOSSInputStream) instream.getWrappedStream();
+    assertEquals(
+        fs.getConf().getLong(MULTIPART_DOWNLOAD_SIZE_KEY, MULTIPART_DOWNLOAD_SIZE_DEFAULT),
+        wrappedStream.getDownloadPartSize());
+    IOUtils.closeStream(instream);
+
+    // A request below the floor is raised to IO_FILE_BUFFER_SIZE_DEFAULT.
+    int smallBufferSize = IO_FILE_BUFFER_SIZE_DEFAULT / 2;
+    instream = this.fs.open(configurationFile, smallBufferSize);
+    assertTrue(instream.getWrappedStream() instanceof AliyunOSSInputStream);
+    wrappedStream = (AliyunOSSInputStream) instream.getWrappedStream();
+    assertEquals(
+        IO_FILE_BUFFER_SIZE_DEFAULT,
+        wrappedStream.getDownloadPartSize());
+    IOUtils.closeStream(instream);
+
+    // A request above the floor is used as given.
+    int largeBufferSize = 2 * IO_FILE_BUFFER_SIZE_DEFAULT;
+    instream = this.fs.open(configurationFile, largeBufferSize);
+    assertTrue(instream.getWrappedStream() instanceof AliyunOSSInputStream);
+    wrappedStream = (AliyunOSSInputStream) instream.getWrappedStream();
+    assertEquals(
+        largeBufferSize,
+        wrappedStream.getDownloadPartSize());
+    IOUtils.closeStream(instream);
+  }
+
   @Test
   public void testSequentialAndRandomRead() throws Exception {
     Path smallSeekFile = setPath("/test/smallSeekFile.txt");