HADOOP-15027. AliyunOSS: Support multi-thread pre-read to improve sequential read from Hadoop to Aliyun OSS performance. (Contributed by Jinhu Wu)

(cherry picked from commit 9195a6e302)
(cherry picked from commit 55142849db)
(cherry picked from commit 082a707bae)
This commit is contained in:
Sammi Chen 2018-01-17 15:55:59 +08:00
parent 8e7ce0eb4c
commit 896dc7c780
8 changed files with 407 additions and 50 deletions

View File

@ -15,4 +15,12 @@
limitations under the License.
-->
<FindBugsFilter>
<!-- Disable FindBugs warning and return the buffer to caller directly.
It is convenient and efficient because we do not need to copy the buffer
-->
<Match>
<Class name="org.apache.hadoop.fs.aliyun.oss.ReadBuffer" />
<Method name="getBuffer" />
<Bug pattern="EI_EXPOSE_REP" />
</Match>
</FindBugsFilter>

View File

@ -0,0 +1,109 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.aliyun.oss;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* Used by {@link AliyunOSSInputStream} as an task that submitted
* to the thread pool.
* Each AliyunOSSFileReaderTask reads one part of the file so that
* we can accelerate the sequential read.
*/
public class AliyunOSSFileReaderTask implements Runnable {
public static final Logger LOG =
LoggerFactory.getLogger(AliyunOSSFileReaderTask.class);
private String key;
private AliyunOSSFileSystemStore store;
private ReadBuffer readBuffer;
private static final int MAX_RETRIES = 3;
private RetryPolicy retryPolicy;
public AliyunOSSFileReaderTask(String key, AliyunOSSFileSystemStore store,
ReadBuffer readBuffer) {
this.key = key;
this.store = store;
this.readBuffer = readBuffer;
RetryPolicy defaultPolicy =
RetryPolicies.retryUpToMaximumCountWithFixedSleep(
MAX_RETRIES, 3, TimeUnit.SECONDS);
Map<Class<? extends Exception>, RetryPolicy> policies = new HashMap<>();
policies.put(IOException.class, defaultPolicy);
policies.put(IndexOutOfBoundsException.class,
RetryPolicies.TRY_ONCE_THEN_FAIL);
policies.put(NullPointerException.class,
RetryPolicies.TRY_ONCE_THEN_FAIL);
this.retryPolicy = RetryPolicies.retryByException(defaultPolicy, policies);
}
@Override
public void run() {
int retries = 0;
readBuffer.lock();
try {
while (true) {
try (InputStream in = store.retrieve(
key, readBuffer.getByteStart(), readBuffer.getByteEnd())) {
IOUtils.readFully(in, readBuffer.getBuffer(),
0, readBuffer.getBuffer().length);
readBuffer.setStatus(ReadBuffer.STATUS.SUCCESS);
break;
} catch (Exception e) {
LOG.warn("Exception thrown when retrieve key: "
+ this.key + ", exception: " + e);
try {
RetryPolicy.RetryAction rc = retryPolicy.shouldRetry(
e, retries++, 0, true);
if (rc.action == RetryPolicy.RetryAction.RetryDecision.RETRY) {
Thread.sleep(rc.delayMillis);
} else {
//should not retry
break;
}
} catch (Exception ex) {
//FAIL
LOG.warn("Exception thrown when call shouldRetry, exception " + ex);
break;
}
}
}
if (readBuffer.getStatus() != ReadBuffer.STATUS.SUCCESS) {
readBuffer.setStatus(ReadBuffer.STATUS.ERROR);
}
//notify main thread which wait for this buffer
readBuffer.signalAll();
} finally {
readBuffer.unlock();
}
}
}

View File

@ -24,7 +24,9 @@ import java.net.URI;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.concurrent.TimeUnit;
import com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
@ -41,12 +43,14 @@ import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
import org.apache.hadoop.util.Progressable;
import com.aliyun.oss.model.OSSObjectSummary;
import com.aliyun.oss.model.ObjectListing;
import com.aliyun.oss.model.ObjectMetadata;
import org.apache.hadoop.util.SemaphoredDelegatingExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -65,6 +69,9 @@ public class AliyunOSSFileSystem extends FileSystem {
private Path workingDir;
private AliyunOSSFileSystemStore store;
private int maxKeys;
private int maxReadAheadPartNumber;
private ListeningExecutorService boundedThreadPool;
private static final PathFilter DEFAULT_FILTER = new PathFilter() {
@Override
public boolean accept(Path file) {
@ -82,6 +89,7 @@ public class AliyunOSSFileSystem extends FileSystem {
public void close() throws IOException {
try {
store.close();
boundedThreadPool.shutdown();
} finally {
super.close();
}
@ -309,10 +317,24 @@ public class AliyunOSSFileSystem extends FileSystem {
store = new AliyunOSSFileSystemStore();
store.initialize(name, conf, statistics);
maxKeys = conf.getInt(MAX_PAGING_KEYS_KEY, MAX_PAGING_KEYS_DEFAULT);
int threadNum = AliyunOSSUtils.intPositiveOption(conf,
Constants.MULTIPART_DOWNLOAD_THREAD_NUMBER_KEY,
Constants.MULTIPART_DOWNLOAD_THREAD_NUMBER_DEFAULT);
int totalTasks = AliyunOSSUtils.intPositiveOption(conf,
Constants.MAX_TOTAL_TASKS_KEY, Constants.MAX_TOTAL_TASKS_DEFAULT);
maxReadAheadPartNumber = AliyunOSSUtils.intPositiveOption(conf,
Constants.MULTIPART_DOWNLOAD_AHEAD_PART_MAX_NUM_KEY,
Constants.MULTIPART_DOWNLOAD_AHEAD_PART_MAX_NUM_DEFAULT);
this.boundedThreadPool = BlockingThreadPoolExecutorService.newInstance(
threadNum, totalTasks, 60L, TimeUnit.SECONDS, "oss-read-shared");
setConf(conf);
}
/**
/**
* Turn a path (relative or otherwise) into an OSS key.
*
* @param path the path of the file.
@ -523,8 +545,11 @@ public class AliyunOSSFileSystem extends FileSystem {
" because it is a directory");
}
return new FSDataInputStream(new AliyunOSSInputStream(getConf(), store,
pathToKey(path), fileStatus.getLen(), statistics));
return new FSDataInputStream(new AliyunOSSInputStream(getConf(),
new SemaphoredDelegatingExecutor(
boundedThreadPool, maxReadAheadPartNumber, true),
maxReadAheadPartNumber, store, pathToKey(path), fileStatus.getLen(),
statistics));
}
@Override

View File

@ -22,8 +22,11 @@ import static org.apache.hadoop.fs.aliyun.oss.Constants.*;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@ -43,20 +46,33 @@ public class AliyunOSSInputStream extends FSInputStream {
private final String key;
private Statistics statistics;
private boolean closed;
private InputStream wrappedStream = null;
private long contentLength;
private long position;
private long partRemaining;
private byte[] buffer;
private int maxReadAheadPartNumber;
private long expectNextPos;
private long lastByteStart;
private ExecutorService readAheadExecutorService;
private Queue<ReadBuffer> readBufferQueue = new ArrayDeque<>();
public AliyunOSSInputStream(Configuration conf,
ExecutorService readAheadExecutorService, int maxReadAheadPartNumber,
AliyunOSSFileSystemStore store, String key, Long contentLength,
Statistics statistics) throws IOException {
this.readAheadExecutorService =
MoreExecutors.listeningDecorator(readAheadExecutorService);
this.store = store;
this.key = key;
this.statistics = statistics;
this.contentLength = contentLength;
downloadPartSize = conf.getLong(MULTIPART_DOWNLOAD_SIZE_KEY,
MULTIPART_DOWNLOAD_SIZE_DEFAULT);
this.maxReadAheadPartNumber = maxReadAheadPartNumber;
this.expectNextPos = 0;
this.lastByteStart = -1;
reopen(0);
closed = false;
}
@ -82,15 +98,81 @@ public class AliyunOSSInputStream extends FSInputStream {
partSize = downloadPartSize;
}
if (wrappedStream != null) {
if (this.buffer != null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Aborting old stream to open at pos " + pos);
}
wrappedStream.close();
this.buffer = null;
}
wrappedStream = store.retrieve(key, pos, pos + partSize -1);
if (wrappedStream == null) {
boolean isRandomIO = true;
if (pos == this.expectNextPos) {
isRandomIO = false;
} else {
//new seek, remove cache buffers if its byteStart is not equal to pos
while (readBufferQueue.size() != 0) {
if (readBufferQueue.element().getByteStart() != pos) {
readBufferQueue.poll();
} else {
break;
}
}
}
this.expectNextPos = pos + partSize;
int currentSize = readBufferQueue.size();
if (currentSize == 0) {
//init lastByteStart to pos - partSize, used by for loop below
lastByteStart = pos - partSize;
} else {
ReadBuffer[] readBuffers = readBufferQueue.toArray(
new ReadBuffer[currentSize]);
lastByteStart = readBuffers[currentSize - 1].getByteStart();
}
int maxLen = this.maxReadAheadPartNumber - currentSize;
for (int i = 0; i < maxLen && i < (currentSize + 1) * 2; i++) {
if (lastByteStart + partSize * (i + 1) > contentLength) {
break;
}
long byteStart = lastByteStart + partSize * (i + 1);
long byteEnd = byteStart + partSize -1;
if (byteEnd >= contentLength) {
byteEnd = contentLength - 1;
}
ReadBuffer readBuffer = new ReadBuffer(byteStart, byteEnd);
if (readBuffer.getBuffer().length == 0) {
//EOF
readBuffer.setStatus(ReadBuffer.STATUS.SUCCESS);
} else {
this.readAheadExecutorService.execute(
new AliyunOSSFileReaderTask(key, store, readBuffer));
}
readBufferQueue.add(readBuffer);
if (isRandomIO) {
break;
}
}
ReadBuffer readBuffer = readBufferQueue.poll();
readBuffer.lock();
try {
readBuffer.await(ReadBuffer.STATUS.INIT);
if (readBuffer.getStatus() == ReadBuffer.STATUS.ERROR) {
this.buffer = null;
} else {
this.buffer = readBuffer.getBuffer();
}
} catch (InterruptedException e) {
LOG.warn("interrupted when wait a read buffer");
} finally {
readBuffer.unlock();
}
if (this.buffer == null) {
throw new IOException("Null IO stream");
}
position = pos;
@ -105,18 +187,10 @@ public class AliyunOSSInputStream extends FSInputStream {
reopen(position);
}
int tries = MAX_RETRIES;
boolean retry;
int byteRead = -1;
do {
retry = false;
try {
byteRead = wrappedStream.read();
} catch (Exception e) {
handleReadException(e, --tries);
retry = true;
}
} while (retry);
if (partRemaining != 0) {
byteRead = this.buffer[this.buffer.length - (int)partRemaining] & 0xFF;
}
if (byteRead >= 0) {
position++;
partRemaining--;
@ -161,21 +235,18 @@ public class AliyunOSSInputStream extends FSInputStream {
reopen(position);
}
int tries = MAX_RETRIES;
boolean retry;
int bytes = -1;
do {
retry = false;
try {
bytes = wrappedStream.read(buf, off + bytesRead, len - bytesRead);
} catch (Exception e) {
handleReadException(e, --tries);
retry = true;
int bytes = 0;
for (int i = this.buffer.length - (int)partRemaining;
i < this.buffer.length; i++) {
buf[off + bytesRead] = this.buffer[i];
bytes++;
bytesRead++;
if (off + bytesRead >= len) {
break;
}
} while (retry);
}
if (bytes > 0) {
bytesRead += bytes;
position += bytes;
partRemaining -= bytes;
} else if (partRemaining != 0) {
@ -202,9 +273,7 @@ public class AliyunOSSInputStream extends FSInputStream {
return;
}
closed = true;
if (wrappedStream != null) {
wrappedStream.close();
}
this.buffer = null;
}
@Override
@ -225,7 +294,6 @@ public class AliyunOSSInputStream extends FSInputStream {
return;
} else if (pos > position && pos < position + partRemaining) {
long len = pos - position;
AliyunOSSUtils.skipFully(wrappedStream, len);
position = pos;
partRemaining -= len;
} else {
@ -245,18 +313,7 @@ public class AliyunOSSInputStream extends FSInputStream {
return false;
}
private void handleReadException(Exception e, int tries) throws IOException{
if (tries == 0) {
throw new IOException(e);
}
LOG.warn("Some exceptions occurred in oss connection, try to reopen oss" +
" connection at position '" + position + "', " + e.getMessage());
try {
Thread.sleep(100);
} catch (InterruptedException e2) {
LOG.warn(e2.getMessage());
}
reopen(position);
public long getExpectNextPos() {
return this.expectNextPos;
}
}

View File

@ -40,6 +40,18 @@ final public class AliyunOSSUtils {
private AliyunOSSUtils() {
}
public static int intPositiveOption(
Configuration conf, String key, int defVal) {
int v = conf.getInt(key, defVal);
if (v <= 0) {
LOG.warn(key + " is configured to " + v
+ ", will use default value: " + defVal);
v = defVal;
}
return v;
}
/**
* Used to get password from configuration.
*

View File

@ -97,7 +97,18 @@ public final class Constants {
public static final String MULTIPART_DOWNLOAD_SIZE_KEY =
"fs.oss.multipart.download.size";
public static final long MULTIPART_DOWNLOAD_SIZE_DEFAULT = 100 * 1024;
public static final long MULTIPART_DOWNLOAD_SIZE_DEFAULT = 512 * 1024;
public static final String MULTIPART_DOWNLOAD_THREAD_NUMBER_KEY =
"fs.oss.multipart.download.threads";
public static final int MULTIPART_DOWNLOAD_THREAD_NUMBER_DEFAULT = 10;
public static final String MAX_TOTAL_TASKS_KEY = "fs.oss.max.total.tasks";
public static final int MAX_TOTAL_TASKS_DEFAULT = 128;
public static final String MULTIPART_DOWNLOAD_AHEAD_PART_MAX_NUM_KEY =
"fs.oss.multipart.download.ahead.part.max.number";
public static final int MULTIPART_DOWNLOAD_AHEAD_PART_MAX_NUM_DEFAULT = 4;
// Comma separated list of directories
public static final String BUFFER_DIR_KEY = "fs.oss.buffer.dir";

View File

@ -0,0 +1,86 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.aliyun.oss;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
* This class is used by {@link AliyunOSSInputStream}
* and {@link AliyunOSSFileReaderTask} to buffer data that read from oss.
*/
public class ReadBuffer {
enum STATUS {
INIT, SUCCESS, ERROR
}
private final ReentrantLock lock = new ReentrantLock();
private Condition readyCondition = lock.newCondition();
private byte[] buffer;
private STATUS status;
private long byteStart;
private long byteEnd;
public ReadBuffer(long byteStart, long byteEnd) {
this.buffer = new byte[(int)(byteEnd - byteStart) + 1];
this.status = STATUS.INIT;
this.byteStart = byteStart;
this.byteEnd = byteEnd;
}
public void lock() {
lock.lock();
}
public void unlock() {
lock.unlock();
}
public void await(STATUS waitStatus) throws InterruptedException {
while (this.status == waitStatus) {
readyCondition.await();
}
}
public void signalAll() {
readyCondition.signalAll();
}
public byte[] getBuffer() {
return buffer;
}
public STATUS getStatus() {
return status;
}
public void setStatus(STATUS status) {
this.status = status;
}
public long getByteStart() {
return byteStart;
}
public long getByteEnd() {
return byteEnd;
}
}

View File

@ -35,6 +35,7 @@ import org.slf4j.LoggerFactory;
import java.util.Random;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/**
@ -107,6 +108,54 @@ public class TestAliyunOSSInputStream {
IOUtils.closeStream(instream);
}
@Test
public void testSequentialAndRandomRead() throws Exception {
Path smallSeekFile = setPath("/test/smallSeekFile.txt");
long size = 5 * 1024 * 1024;
ContractTestUtils.generateTestFile(this.fs, smallSeekFile, size, 256, 255);
LOG.info("5MB file created: smallSeekFile.txt");
FSDataInputStream fsDataInputStream = this.fs.open(smallSeekFile);
AliyunOSSInputStream in =
(AliyunOSSInputStream)fsDataInputStream.getWrappedStream();
assertTrue("expected position at:" + 0 + ", but got:"
+ fsDataInputStream.getPos(), fsDataInputStream.getPos() == 0);
assertTrue("expected position at:"
+ Constants.MULTIPART_DOWNLOAD_SIZE_DEFAULT + ", but got:"
+ in.getExpectNextPos(),
in.getExpectNextPos() == Constants.MULTIPART_DOWNLOAD_SIZE_DEFAULT);
fsDataInputStream.seek(4 * 1024 * 1024);
assertTrue("expected position at:" + 4 * 1024 * 1024
+ Constants.MULTIPART_DOWNLOAD_SIZE_DEFAULT + ", but got:"
+ in.getExpectNextPos(),
in.getExpectNextPos() == 4 * 1024 * 1024
+ Constants.MULTIPART_DOWNLOAD_SIZE_DEFAULT);
IOUtils.closeStream(fsDataInputStream);
}
@Test
public void testOSSFileReaderTask() throws Exception {
Path smallSeekFile = setPath("/test/smallSeekFileOSSFileReader.txt");
long size = 5 * 1024 * 1024;
ContractTestUtils.generateTestFile(this.fs, smallSeekFile, size, 256, 255);
LOG.info("5MB file created: smallSeekFileOSSFileReader.txt");
ReadBuffer readBuffer = new ReadBuffer(12, 24);
AliyunOSSFileReaderTask task = new AliyunOSSFileReaderTask("1",
((AliyunOSSFileSystem)this.fs).getStore(), readBuffer);
//NullPointerException, fail
task.run();
assertEquals(readBuffer.getStatus(), ReadBuffer.STATUS.ERROR);
//OK
task = new AliyunOSSFileReaderTask(
"test/test/smallSeekFileOSSFileReader.txt",
((AliyunOSSFileSystem)this.fs).getStore(), readBuffer);
task.run();
assertEquals(readBuffer.getStatus(), ReadBuffer.STATUS.SUCCESS);
}
@Test
public void testReadFile() throws Exception {
final int bufLen = 256;