HADOOP-18543. AliyunOSSFileSystem#open(Path path, int bufferSize) use buffer size as its downloadPartSize

This commit is contained in:
Hangxiang Yu 2022-11-28 23:04:00 +08:00
parent e09e81abe4
commit 76ce3cb5dc
3 changed files with 68 additions and 8 deletions

View File

@ -585,6 +585,21 @@ public class AliyunOSSFileSystem extends FileSystem {
} while (fPart != null);
}
@Override
public FSDataInputStream open(Path path) throws IOException {
final FileStatus fileStatus = getFileStatus(path);
if (fileStatus.isDirectory()) {
throw new FileNotFoundException("Can't open " + path +
" because it is a directory");
}
return new FSDataInputStream(new AliyunOSSInputStream(getConf(),
new SemaphoredDelegatingExecutor(
boundedThreadPool, maxReadAheadPartNumber, true),
maxReadAheadPartNumber, store, pathToKey(path), fileStatus.getLen(),
statistics));
}
@Override
public FSDataInputStream open(Path path, int bufferSize) throws IOException {
final FileStatus fileStatus = getFileStatus(path);
@ -593,7 +608,7 @@ public class AliyunOSSFileSystem extends FileSystem {
" because it is a directory");
}
return new FSDataInputStream(new AliyunOSSInputStream(getConf(),
return new FSDataInputStream(new AliyunOSSInputStream(bufferSize,
new SemaphoredDelegatingExecutor(
boundedThreadPool, maxReadAheadPartNumber, true),
maxReadAheadPartNumber, store, pathToKey(path), fileStatus.getLen(),

View File

@ -32,6 +32,7 @@ import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileSystem.Statistics;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
import static org.apache.hadoop.fs.aliyun.oss.Constants.*;
/**
@ -57,18 +58,21 @@ public class AliyunOSSInputStream extends FSInputStream {
private ExecutorService readAheadExecutorService;
private Queue<ReadBuffer> readBufferQueue = new ArrayDeque<>();
public AliyunOSSInputStream(Configuration conf,
ExecutorService readAheadExecutorService, int maxReadAheadPartNumber,
AliyunOSSFileSystemStore store, String key, Long contentLength,
Statistics statistics) throws IOException {
public AliyunOSSInputStream(
long downloadPartSize,
ExecutorService readAheadExecutorService,
int maxReadAheadPartNumber,
AliyunOSSFileSystemStore store,
String key,
Long contentLength,
Statistics statistics) throws IOException {
this.readAheadExecutorService =
MoreExecutors.listeningDecorator(readAheadExecutorService);
MoreExecutors.listeningDecorator(readAheadExecutorService);
this.store = store;
this.key = key;
this.statistics = statistics;
this.contentLength = contentLength;
downloadPartSize = conf.getLong(MULTIPART_DOWNLOAD_SIZE_KEY,
MULTIPART_DOWNLOAD_SIZE_DEFAULT);
this.downloadPartSize = Math.max(downloadPartSize, IO_FILE_BUFFER_SIZE_DEFAULT);
this.maxReadAheadPartNumber = maxReadAheadPartNumber;
this.expectNextPos = 0;
@ -77,6 +81,18 @@ public class AliyunOSSInputStream extends FSInputStream {
closed = false;
}
public AliyunOSSInputStream(Configuration conf,
ExecutorService readAheadExecutorService, int maxReadAheadPartNumber,
AliyunOSSFileSystemStore store, String key, Long contentLength,
Statistics statistics) throws IOException {
this(conf.getLong(MULTIPART_DOWNLOAD_SIZE_KEY, MULTIPART_DOWNLOAD_SIZE_DEFAULT),
readAheadExecutorService, maxReadAheadPartNumber, store, key, contentLength, statistics);
}
long getDownloadPartSize() {
return downloadPartSize;
}
/**
* Reopen the wrapped stream at give position, by seeking for
* data of a part length from object content stream.

View File

@ -33,8 +33,12 @@ import org.junit.rules.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStream;
import java.util.Random;
import static org.apache.hadoop.fs.aliyun.oss.Constants.MULTIPART_DOWNLOAD_SIZE_DEFAULT;
import static org.apache.hadoop.fs.aliyun.oss.Constants.MULTIPART_DOWNLOAD_SIZE_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@ -108,6 +112,31 @@ public class TestAliyunOSSInputStream {
IOUtils.closeStream(instream);
}
@Test
public void testConfiguration() throws IOException {
Path configurationFile = setPath("/test/configurationFile.txt");
long size = 5 * 1024 * 1024;
ContractTestUtils.generateTestFile(this.fs, configurationFile, size, 256, 255);
LOG.info("5MB file created: configurationFile.txt");
FSDataInputStream instream = this.fs.open(configurationFile);
assertTrue(instream.getWrappedStream() instanceof AliyunOSSInputStream);
AliyunOSSInputStream wrappedStream = (AliyunOSSInputStream) instream.getWrappedStream();
assertEquals(
fs.getConf().getLong(MULTIPART_DOWNLOAD_SIZE_KEY, MULTIPART_DOWNLOAD_SIZE_DEFAULT),
wrappedStream.getDownloadPartSize());
IOUtils.closeStream(instream);
instream = this.fs.open(configurationFile, 1024);
assertTrue(instream.getWrappedStream() instanceof AliyunOSSInputStream);
wrappedStream = (AliyunOSSInputStream) instream.getWrappedStream();
assertEquals(
1024,
wrappedStream.getDownloadPartSize());
IOUtils.closeStream(instream);
}
@Test
public void testSequentialAndRandomRead() throws Exception {
Path smallSeekFile = setPath("/test/smallSeekFile.txt");