HADOOP-15027. AliyunOSS: Support multi-thread pre-read to improve sequential read from Hadoop to Aliyun OSS performance. (Contributed by Jinhu Wu)

This commit is contained in:
Sammi Chen 2018-01-17 15:55:59 +08:00
parent 41049ba5d1
commit 9195a6e302
8 changed files with 407 additions and 50 deletions

View File

@ -15,4 +15,12 @@
limitations under the License.
-->
<FindBugsFilter>
<!-- Disable FindBugs warning and return the buffer to caller directly.
It is convenient and efficient because we do not need to copy the buffer
-->
<Match>
<Class name="org.apache.hadoop.fs.aliyun.oss.ReadBuffer" />
<Method name="getBuffer" />
<Bug pattern="EI_EXPOSE_REP" />
</Match>
</FindBugsFilter>

View File

@ -0,0 +1,109 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.aliyun.oss;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* Used by {@link AliyunOSSInputStream} as an task that submitted
* to the thread pool.
* Each AliyunOSSFileReaderTask reads one part of the file so that
* we can accelerate the sequential read.
*/
public class AliyunOSSFileReaderTask implements Runnable {
public static final Logger LOG =
LoggerFactory.getLogger(AliyunOSSFileReaderTask.class);
private String key;
private AliyunOSSFileSystemStore store;
private ReadBuffer readBuffer;
private static final int MAX_RETRIES = 3;
private RetryPolicy retryPolicy;
public AliyunOSSFileReaderTask(String key, AliyunOSSFileSystemStore store,
ReadBuffer readBuffer) {
this.key = key;
this.store = store;
this.readBuffer = readBuffer;
RetryPolicy defaultPolicy =
RetryPolicies.retryUpToMaximumCountWithFixedSleep(
MAX_RETRIES, 3, TimeUnit.SECONDS);
Map<Class<? extends Exception>, RetryPolicy> policies = new HashMap<>();
policies.put(IOException.class, defaultPolicy);
policies.put(IndexOutOfBoundsException.class,
RetryPolicies.TRY_ONCE_THEN_FAIL);
policies.put(NullPointerException.class,
RetryPolicies.TRY_ONCE_THEN_FAIL);
this.retryPolicy = RetryPolicies.retryByException(defaultPolicy, policies);
}
@Override
public void run() {
int retries = 0;
readBuffer.lock();
try {
while (true) {
try (InputStream in = store.retrieve(
key, readBuffer.getByteStart(), readBuffer.getByteEnd())) {
IOUtils.readFully(in, readBuffer.getBuffer(),
0, readBuffer.getBuffer().length);
readBuffer.setStatus(ReadBuffer.STATUS.SUCCESS);
break;
} catch (Exception e) {
LOG.warn("Exception thrown when retrieve key: "
+ this.key + ", exception: " + e);
try {
RetryPolicy.RetryAction rc = retryPolicy.shouldRetry(
e, retries++, 0, true);
if (rc.action == RetryPolicy.RetryAction.RetryDecision.RETRY) {
Thread.sleep(rc.delayMillis);
} else {
//should not retry
break;
}
} catch (Exception ex) {
//FAIL
LOG.warn("Exception thrown when call shouldRetry, exception " + ex);
break;
}
}
}
if (readBuffer.getStatus() != ReadBuffer.STATUS.SUCCESS) {
readBuffer.setStatus(ReadBuffer.STATUS.ERROR);
}
//notify main thread which wait for this buffer
readBuffer.signalAll();
} finally {
readBuffer.unlock();
}
}
}

View File

@ -24,7 +24,9 @@
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.concurrent.TimeUnit;
import com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
@ -41,12 +43,14 @@
import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
import org.apache.hadoop.util.Progressable;
import com.aliyun.oss.model.OSSObjectSummary;
import com.aliyun.oss.model.ObjectListing;
import com.aliyun.oss.model.ObjectMetadata;
import org.apache.hadoop.util.SemaphoredDelegatingExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -65,6 +69,9 @@ public class AliyunOSSFileSystem extends FileSystem {
private Path workingDir;
private AliyunOSSFileSystemStore store;
private int maxKeys;
private int maxReadAheadPartNumber;
private ListeningExecutorService boundedThreadPool;
private static final PathFilter DEFAULT_FILTER = new PathFilter() {
@Override
public boolean accept(Path file) {
@ -82,6 +89,7 @@ public FSDataOutputStream append(Path path, int bufferSize,
public void close() throws IOException {
try {
store.close();
boundedThreadPool.shutdown();
} finally {
super.close();
}
@ -309,10 +317,24 @@ public void initialize(URI name, Configuration conf) throws IOException {
store = new AliyunOSSFileSystemStore();
store.initialize(name, conf, statistics);
maxKeys = conf.getInt(MAX_PAGING_KEYS_KEY, MAX_PAGING_KEYS_DEFAULT);
int threadNum = AliyunOSSUtils.intPositiveOption(conf,
Constants.MULTIPART_DOWNLOAD_THREAD_NUMBER_KEY,
Constants.MULTIPART_DOWNLOAD_THREAD_NUMBER_DEFAULT);
int totalTasks = AliyunOSSUtils.intPositiveOption(conf,
Constants.MAX_TOTAL_TASKS_KEY, Constants.MAX_TOTAL_TASKS_DEFAULT);
maxReadAheadPartNumber = AliyunOSSUtils.intPositiveOption(conf,
Constants.MULTIPART_DOWNLOAD_AHEAD_PART_MAX_NUM_KEY,
Constants.MULTIPART_DOWNLOAD_AHEAD_PART_MAX_NUM_DEFAULT);
this.boundedThreadPool = BlockingThreadPoolExecutorService.newInstance(
threadNum, totalTasks, 60L, TimeUnit.SECONDS, "oss-read-shared");
setConf(conf);
}
/**
/**
* Turn a path (relative or otherwise) into an OSS key.
*
* @param path the path of the file.
@ -523,8 +545,11 @@ public FSDataInputStream open(Path path, int bufferSize) throws IOException {
" because it is a directory");
}
return new FSDataInputStream(new AliyunOSSInputStream(getConf(), store,
pathToKey(path), fileStatus.getLen(), statistics));
return new FSDataInputStream(new AliyunOSSInputStream(getConf(),
new SemaphoredDelegatingExecutor(
boundedThreadPool, maxReadAheadPartNumber, true),
maxReadAheadPartNumber, store, pathToKey(path), fileStatus.getLen(),
statistics));
}
@Override

View File

@ -20,8 +20,11 @@
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@ -43,20 +46,33 @@ public class AliyunOSSInputStream extends FSInputStream {
private final String key;
private Statistics statistics;
private boolean closed;
private InputStream wrappedStream = null;
private long contentLength;
private long position;
private long partRemaining;
private byte[] buffer;
private int maxReadAheadPartNumber;
private long expectNextPos;
private long lastByteStart;
private ExecutorService readAheadExecutorService;
private Queue<ReadBuffer> readBufferQueue = new ArrayDeque<>();
public AliyunOSSInputStream(Configuration conf,
ExecutorService readAheadExecutorService, int maxReadAheadPartNumber,
AliyunOSSFileSystemStore store, String key, Long contentLength,
Statistics statistics) throws IOException {
this.readAheadExecutorService =
MoreExecutors.listeningDecorator(readAheadExecutorService);
this.store = store;
this.key = key;
this.statistics = statistics;
this.contentLength = contentLength;
downloadPartSize = conf.getLong(MULTIPART_DOWNLOAD_SIZE_KEY,
MULTIPART_DOWNLOAD_SIZE_DEFAULT);
this.maxReadAheadPartNumber = maxReadAheadPartNumber;
this.expectNextPos = 0;
this.lastByteStart = -1;
reopen(0);
closed = false;
}
@ -82,15 +98,81 @@ private synchronized void reopen(long pos) throws IOException {
partSize = downloadPartSize;
}
if (wrappedStream != null) {
if (this.buffer != null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Aborting old stream to open at pos " + pos);
}
wrappedStream.close();
this.buffer = null;
}
wrappedStream = store.retrieve(key, pos, pos + partSize -1);
if (wrappedStream == null) {
boolean isRandomIO = true;
if (pos == this.expectNextPos) {
isRandomIO = false;
} else {
//new seek, remove cache buffers if its byteStart is not equal to pos
while (readBufferQueue.size() != 0) {
if (readBufferQueue.element().getByteStart() != pos) {
readBufferQueue.poll();
} else {
break;
}
}
}
this.expectNextPos = pos + partSize;
int currentSize = readBufferQueue.size();
if (currentSize == 0) {
//init lastByteStart to pos - partSize, used by for loop below
lastByteStart = pos - partSize;
} else {
ReadBuffer[] readBuffers = readBufferQueue.toArray(
new ReadBuffer[currentSize]);
lastByteStart = readBuffers[currentSize - 1].getByteStart();
}
int maxLen = this.maxReadAheadPartNumber - currentSize;
for (int i = 0; i < maxLen && i < (currentSize + 1) * 2; i++) {
if (lastByteStart + partSize * (i + 1) > contentLength) {
break;
}
long byteStart = lastByteStart + partSize * (i + 1);
long byteEnd = byteStart + partSize -1;
if (byteEnd >= contentLength) {
byteEnd = contentLength - 1;
}
ReadBuffer readBuffer = new ReadBuffer(byteStart, byteEnd);
if (readBuffer.getBuffer().length == 0) {
//EOF
readBuffer.setStatus(ReadBuffer.STATUS.SUCCESS);
} else {
this.readAheadExecutorService.execute(
new AliyunOSSFileReaderTask(key, store, readBuffer));
}
readBufferQueue.add(readBuffer);
if (isRandomIO) {
break;
}
}
ReadBuffer readBuffer = readBufferQueue.poll();
readBuffer.lock();
try {
readBuffer.await(ReadBuffer.STATUS.INIT);
if (readBuffer.getStatus() == ReadBuffer.STATUS.ERROR) {
this.buffer = null;
} else {
this.buffer = readBuffer.getBuffer();
}
} catch (InterruptedException e) {
LOG.warn("interrupted when wait a read buffer");
} finally {
readBuffer.unlock();
}
if (this.buffer == null) {
throw new IOException("Null IO stream");
}
position = pos;
@ -105,18 +187,10 @@ public synchronized int read() throws IOException {
reopen(position);
}
int tries = MAX_RETRIES;
boolean retry;
int byteRead = -1;
do {
retry = false;
try {
byteRead = wrappedStream.read();
} catch (Exception e) {
handleReadException(e, --tries);
retry = true;
}
} while (retry);
if (partRemaining != 0) {
byteRead = this.buffer[this.buffer.length - (int)partRemaining] & 0xFF;
}
if (byteRead >= 0) {
position++;
partRemaining--;
@ -161,21 +235,18 @@ public synchronized int read(byte[] buf, int off, int len)
reopen(position);
}
int tries = MAX_RETRIES;
boolean retry;
int bytes = -1;
do {
retry = false;
try {
bytes = wrappedStream.read(buf, off + bytesRead, len - bytesRead);
} catch (Exception e) {
handleReadException(e, --tries);
retry = true;
int bytes = 0;
for (int i = this.buffer.length - (int)partRemaining;
i < this.buffer.length; i++) {
buf[off + bytesRead] = this.buffer[i];
bytes++;
bytesRead++;
if (off + bytesRead >= len) {
break;
}
} while (retry);
}
if (bytes > 0) {
bytesRead += bytes;
position += bytes;
partRemaining -= bytes;
} else if (partRemaining != 0) {
@ -202,9 +273,7 @@ public synchronized void close() throws IOException {
return;
}
closed = true;
if (wrappedStream != null) {
wrappedStream.close();
}
this.buffer = null;
}
@Override
@ -225,7 +294,6 @@ public synchronized void seek(long pos) throws IOException {
return;
} else if (pos > position && pos < position + partRemaining) {
long len = pos - position;
AliyunOSSUtils.skipFully(wrappedStream, len);
position = pos;
partRemaining -= len;
} else {
@ -245,18 +313,7 @@ public boolean seekToNewSource(long targetPos) throws IOException {
return false;
}
private void handleReadException(Exception e, int tries) throws IOException{
if (tries == 0) {
throw new IOException(e);
}
LOG.warn("Some exceptions occurred in oss connection, try to reopen oss" +
" connection at position '" + position + "', " + e.getMessage());
try {
Thread.sleep(100);
} catch (InterruptedException e2) {
LOG.warn(e2.getMessage());
}
reopen(position);
public long getExpectNextPos() {
return this.expectNextPos;
}
}

View File

@ -40,6 +40,18 @@ final public class AliyunOSSUtils {
private AliyunOSSUtils() {
}
public static int intPositiveOption(
Configuration conf, String key, int defVal) {
int v = conf.getInt(key, defVal);
if (v <= 0) {
LOG.warn(key + " is configured to " + v
+ ", will use default value: " + defVal);
v = defVal;
}
return v;
}
/**
* Used to get password from configuration.
*

View File

@ -97,7 +97,18 @@ private Constants() {
public static final String MULTIPART_DOWNLOAD_SIZE_KEY =
"fs.oss.multipart.download.size";
public static final long MULTIPART_DOWNLOAD_SIZE_DEFAULT = 100 * 1024;
public static final long MULTIPART_DOWNLOAD_SIZE_DEFAULT = 512 * 1024;
public static final String MULTIPART_DOWNLOAD_THREAD_NUMBER_KEY =
"fs.oss.multipart.download.threads";
public static final int MULTIPART_DOWNLOAD_THREAD_NUMBER_DEFAULT = 10;
public static final String MAX_TOTAL_TASKS_KEY = "fs.oss.max.total.tasks";
public static final int MAX_TOTAL_TASKS_DEFAULT = 128;
public static final String MULTIPART_DOWNLOAD_AHEAD_PART_MAX_NUM_KEY =
"fs.oss.multipart.download.ahead.part.max.number";
public static final int MULTIPART_DOWNLOAD_AHEAD_PART_MAX_NUM_DEFAULT = 4;
// Comma separated list of directories
public static final String BUFFER_DIR_KEY = "fs.oss.buffer.dir";

View File

@ -0,0 +1,86 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.aliyun.oss;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
* This class is used by {@link AliyunOSSInputStream}
* and {@link AliyunOSSFileReaderTask} to buffer data that read from oss.
*/
public class ReadBuffer {
enum STATUS {
INIT, SUCCESS, ERROR
}
private final ReentrantLock lock = new ReentrantLock();
private Condition readyCondition = lock.newCondition();
private byte[] buffer;
private STATUS status;
private long byteStart;
private long byteEnd;
public ReadBuffer(long byteStart, long byteEnd) {
this.buffer = new byte[(int)(byteEnd - byteStart) + 1];
this.status = STATUS.INIT;
this.byteStart = byteStart;
this.byteEnd = byteEnd;
}
public void lock() {
lock.lock();
}
public void unlock() {
lock.unlock();
}
public void await(STATUS waitStatus) throws InterruptedException {
while (this.status == waitStatus) {
readyCondition.await();
}
}
public void signalAll() {
readyCondition.signalAll();
}
public byte[] getBuffer() {
return buffer;
}
public STATUS getStatus() {
return status;
}
public void setStatus(STATUS status) {
this.status = status;
}
public long getByteStart() {
return byteStart;
}
public long getByteEnd() {
return byteEnd;
}
}

View File

@ -35,6 +35,7 @@
import java.util.Random;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/**
@ -107,6 +108,54 @@ public void testSeekFile() throws Exception {
IOUtils.closeStream(instream);
}
@Test
public void testSequentialAndRandomRead() throws Exception {
Path smallSeekFile = setPath("/test/smallSeekFile.txt");
long size = 5 * 1024 * 1024;
ContractTestUtils.generateTestFile(this.fs, smallSeekFile, size, 256, 255);
LOG.info("5MB file created: smallSeekFile.txt");
FSDataInputStream fsDataInputStream = this.fs.open(smallSeekFile);
AliyunOSSInputStream in =
(AliyunOSSInputStream)fsDataInputStream.getWrappedStream();
assertTrue("expected position at:" + 0 + ", but got:"
+ fsDataInputStream.getPos(), fsDataInputStream.getPos() == 0);
assertTrue("expected position at:"
+ Constants.MULTIPART_DOWNLOAD_SIZE_DEFAULT + ", but got:"
+ in.getExpectNextPos(),
in.getExpectNextPos() == Constants.MULTIPART_DOWNLOAD_SIZE_DEFAULT);
fsDataInputStream.seek(4 * 1024 * 1024);
assertTrue("expected position at:" + 4 * 1024 * 1024
+ Constants.MULTIPART_DOWNLOAD_SIZE_DEFAULT + ", but got:"
+ in.getExpectNextPos(),
in.getExpectNextPos() == 4 * 1024 * 1024
+ Constants.MULTIPART_DOWNLOAD_SIZE_DEFAULT);
IOUtils.closeStream(fsDataInputStream);
}
@Test
public void testOSSFileReaderTask() throws Exception {
Path smallSeekFile = setPath("/test/smallSeekFileOSSFileReader.txt");
long size = 5 * 1024 * 1024;
ContractTestUtils.generateTestFile(this.fs, smallSeekFile, size, 256, 255);
LOG.info("5MB file created: smallSeekFileOSSFileReader.txt");
ReadBuffer readBuffer = new ReadBuffer(12, 24);
AliyunOSSFileReaderTask task = new AliyunOSSFileReaderTask("1",
((AliyunOSSFileSystem)this.fs).getStore(), readBuffer);
//NullPointerException, fail
task.run();
assertEquals(readBuffer.getStatus(), ReadBuffer.STATUS.ERROR);
//OK
task = new AliyunOSSFileReaderTask(
"test/test/smallSeekFileOSSFileReader.txt",
((AliyunOSSFileSystem)this.fs).getStore(), readBuffer);
task.run();
assertEquals(readBuffer.getStatus(), ReadBuffer.STATUS.SUCCESS);
}
@Test
public void testReadFile() throws Exception {
final int bufLen = 256;