HADOOP-15027. AliyunOSS: Support multi-thread pre-read to improve sequential read from Hadoop to Aliyun OSS performance. (Contributed by Jinhu Wu)
(cherry picked from commit9195a6e302
) (cherry picked from commit91184299c5
) (cherry picked from commit2816bd1f43
)
This commit is contained in:
parent
a15df67f65
commit
e30abdaaae
|
@ -15,4 +15,12 @@
|
||||||
limitations under the License.
|
limitations under the License.
|
||||||
-->
|
-->
|
||||||
<FindBugsFilter>
|
<FindBugsFilter>
|
||||||
|
<!-- Disable FindBugs warning and return the buffer to caller directly.
|
||||||
|
It is convenient and efficient because we do not need to copy the buffer
|
||||||
|
-->
|
||||||
|
<Match>
|
||||||
|
<Class name="org.apache.hadoop.fs.aliyun.oss.ReadBuffer" />
|
||||||
|
<Method name="getBuffer" />
|
||||||
|
<Bug pattern="EI_EXPOSE_REP" />
|
||||||
|
</Match>
|
||||||
</FindBugsFilter>
|
</FindBugsFilter>
|
||||||
|
|
|
@ -0,0 +1,109 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
* <p/>
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* <p/>
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.fs.aliyun.oss;
|
||||||
|
|
||||||
|
import org.apache.hadoop.io.IOUtils;
|
||||||
|
import org.apache.hadoop.io.retry.RetryPolicies;
|
||||||
|
import org.apache.hadoop.io.retry.RetryPolicy;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.InputStream;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Used by {@link AliyunOSSInputStream} as an task that submitted
|
||||||
|
* to the thread pool.
|
||||||
|
* Each AliyunOSSFileReaderTask reads one part of the file so that
|
||||||
|
* we can accelerate the sequential read.
|
||||||
|
*/
|
||||||
|
public class AliyunOSSFileReaderTask implements Runnable {
|
||||||
|
public static final Logger LOG =
|
||||||
|
LoggerFactory.getLogger(AliyunOSSFileReaderTask.class);
|
||||||
|
|
||||||
|
private String key;
|
||||||
|
private AliyunOSSFileSystemStore store;
|
||||||
|
private ReadBuffer readBuffer;
|
||||||
|
private static final int MAX_RETRIES = 3;
|
||||||
|
private RetryPolicy retryPolicy;
|
||||||
|
|
||||||
|
public AliyunOSSFileReaderTask(String key, AliyunOSSFileSystemStore store,
|
||||||
|
ReadBuffer readBuffer) {
|
||||||
|
this.key = key;
|
||||||
|
this.store = store;
|
||||||
|
this.readBuffer = readBuffer;
|
||||||
|
RetryPolicy defaultPolicy =
|
||||||
|
RetryPolicies.retryUpToMaximumCountWithFixedSleep(
|
||||||
|
MAX_RETRIES, 3, TimeUnit.SECONDS);
|
||||||
|
Map<Class<? extends Exception>, RetryPolicy> policies = new HashMap<>();
|
||||||
|
policies.put(IOException.class, defaultPolicy);
|
||||||
|
policies.put(IndexOutOfBoundsException.class,
|
||||||
|
RetryPolicies.TRY_ONCE_THEN_FAIL);
|
||||||
|
policies.put(NullPointerException.class,
|
||||||
|
RetryPolicies.TRY_ONCE_THEN_FAIL);
|
||||||
|
|
||||||
|
this.retryPolicy = RetryPolicies.retryByException(defaultPolicy, policies);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
int retries = 0;
|
||||||
|
readBuffer.lock();
|
||||||
|
try {
|
||||||
|
while (true) {
|
||||||
|
try (InputStream in = store.retrieve(
|
||||||
|
key, readBuffer.getByteStart(), readBuffer.getByteEnd())) {
|
||||||
|
IOUtils.readFully(in, readBuffer.getBuffer(),
|
||||||
|
0, readBuffer.getBuffer().length);
|
||||||
|
readBuffer.setStatus(ReadBuffer.STATUS.SUCCESS);
|
||||||
|
break;
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.warn("Exception thrown when retrieve key: "
|
||||||
|
+ this.key + ", exception: " + e);
|
||||||
|
try {
|
||||||
|
RetryPolicy.RetryAction rc = retryPolicy.shouldRetry(
|
||||||
|
e, retries++, 0, true);
|
||||||
|
if (rc.action == RetryPolicy.RetryAction.RetryDecision.RETRY) {
|
||||||
|
Thread.sleep(rc.delayMillis);
|
||||||
|
} else {
|
||||||
|
//should not retry
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
} catch (Exception ex) {
|
||||||
|
//FAIL
|
||||||
|
LOG.warn("Exception thrown when call shouldRetry, exception " + ex);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (readBuffer.getStatus() != ReadBuffer.STATUS.SUCCESS) {
|
||||||
|
readBuffer.setStatus(ReadBuffer.STATUS.ERROR);
|
||||||
|
}
|
||||||
|
|
||||||
|
//notify main thread which wait for this buffer
|
||||||
|
readBuffer.signalAll();
|
||||||
|
} finally {
|
||||||
|
readBuffer.unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -24,7 +24,9 @@ import java.net.URI;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.EnumSet;
|
import java.util.EnumSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import com.google.common.util.concurrent.ListeningExecutorService;
|
||||||
import org.apache.commons.collections.CollectionUtils;
|
import org.apache.commons.collections.CollectionUtils;
|
||||||
import org.apache.commons.lang.StringUtils;
|
import org.apache.commons.lang.StringUtils;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
@ -41,12 +43,14 @@ import org.apache.hadoop.fs.PathFilter;
|
||||||
import org.apache.hadoop.fs.PathIOException;
|
import org.apache.hadoop.fs.PathIOException;
|
||||||
import org.apache.hadoop.fs.RemoteIterator;
|
import org.apache.hadoop.fs.RemoteIterator;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
|
import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
|
||||||
import org.apache.hadoop.util.Progressable;
|
import org.apache.hadoop.util.Progressable;
|
||||||
|
|
||||||
import com.aliyun.oss.model.OSSObjectSummary;
|
import com.aliyun.oss.model.OSSObjectSummary;
|
||||||
import com.aliyun.oss.model.ObjectListing;
|
import com.aliyun.oss.model.ObjectListing;
|
||||||
import com.aliyun.oss.model.ObjectMetadata;
|
import com.aliyun.oss.model.ObjectMetadata;
|
||||||
|
|
||||||
|
import org.apache.hadoop.util.SemaphoredDelegatingExecutor;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
@ -65,6 +69,9 @@ public class AliyunOSSFileSystem extends FileSystem {
|
||||||
private Path workingDir;
|
private Path workingDir;
|
||||||
private AliyunOSSFileSystemStore store;
|
private AliyunOSSFileSystemStore store;
|
||||||
private int maxKeys;
|
private int maxKeys;
|
||||||
|
private int maxReadAheadPartNumber;
|
||||||
|
private ListeningExecutorService boundedThreadPool;
|
||||||
|
|
||||||
private static final PathFilter DEFAULT_FILTER = new PathFilter() {
|
private static final PathFilter DEFAULT_FILTER = new PathFilter() {
|
||||||
@Override
|
@Override
|
||||||
public boolean accept(Path file) {
|
public boolean accept(Path file) {
|
||||||
|
@ -82,6 +89,7 @@ public class AliyunOSSFileSystem extends FileSystem {
|
||||||
public void close() throws IOException {
|
public void close() throws IOException {
|
||||||
try {
|
try {
|
||||||
store.close();
|
store.close();
|
||||||
|
boundedThreadPool.shutdown();
|
||||||
} finally {
|
} finally {
|
||||||
super.close();
|
super.close();
|
||||||
}
|
}
|
||||||
|
@ -309,10 +317,24 @@ public class AliyunOSSFileSystem extends FileSystem {
|
||||||
store = new AliyunOSSFileSystemStore();
|
store = new AliyunOSSFileSystemStore();
|
||||||
store.initialize(name, conf, statistics);
|
store.initialize(name, conf, statistics);
|
||||||
maxKeys = conf.getInt(MAX_PAGING_KEYS_KEY, MAX_PAGING_KEYS_DEFAULT);
|
maxKeys = conf.getInt(MAX_PAGING_KEYS_KEY, MAX_PAGING_KEYS_DEFAULT);
|
||||||
|
|
||||||
|
int threadNum = AliyunOSSUtils.intPositiveOption(conf,
|
||||||
|
Constants.MULTIPART_DOWNLOAD_THREAD_NUMBER_KEY,
|
||||||
|
Constants.MULTIPART_DOWNLOAD_THREAD_NUMBER_DEFAULT);
|
||||||
|
|
||||||
|
int totalTasks = AliyunOSSUtils.intPositiveOption(conf,
|
||||||
|
Constants.MAX_TOTAL_TASKS_KEY, Constants.MAX_TOTAL_TASKS_DEFAULT);
|
||||||
|
|
||||||
|
maxReadAheadPartNumber = AliyunOSSUtils.intPositiveOption(conf,
|
||||||
|
Constants.MULTIPART_DOWNLOAD_AHEAD_PART_MAX_NUM_KEY,
|
||||||
|
Constants.MULTIPART_DOWNLOAD_AHEAD_PART_MAX_NUM_DEFAULT);
|
||||||
|
|
||||||
|
this.boundedThreadPool = BlockingThreadPoolExecutorService.newInstance(
|
||||||
|
threadNum, totalTasks, 60L, TimeUnit.SECONDS, "oss-read-shared");
|
||||||
setConf(conf);
|
setConf(conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Turn a path (relative or otherwise) into an OSS key.
|
* Turn a path (relative or otherwise) into an OSS key.
|
||||||
*
|
*
|
||||||
* @param path the path of the file.
|
* @param path the path of the file.
|
||||||
|
@ -523,8 +545,11 @@ public class AliyunOSSFileSystem extends FileSystem {
|
||||||
" because it is a directory");
|
" because it is a directory");
|
||||||
}
|
}
|
||||||
|
|
||||||
return new FSDataInputStream(new AliyunOSSInputStream(getConf(), store,
|
return new FSDataInputStream(new AliyunOSSInputStream(getConf(),
|
||||||
pathToKey(path), fileStatus.getLen(), statistics));
|
new SemaphoredDelegatingExecutor(
|
||||||
|
boundedThreadPool, maxReadAheadPartNumber, true),
|
||||||
|
maxReadAheadPartNumber, store, pathToKey(path), fileStatus.getLen(),
|
||||||
|
statistics));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -22,8 +22,11 @@ import static org.apache.hadoop.fs.aliyun.oss.Constants.*;
|
||||||
|
|
||||||
import java.io.EOFException;
|
import java.io.EOFException;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.util.ArrayDeque;
|
||||||
|
import java.util.Queue;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
|
||||||
|
import com.google.common.util.concurrent.MoreExecutors;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
@ -43,20 +46,33 @@ public class AliyunOSSInputStream extends FSInputStream {
|
||||||
private final String key;
|
private final String key;
|
||||||
private Statistics statistics;
|
private Statistics statistics;
|
||||||
private boolean closed;
|
private boolean closed;
|
||||||
private InputStream wrappedStream = null;
|
|
||||||
private long contentLength;
|
private long contentLength;
|
||||||
private long position;
|
private long position;
|
||||||
private long partRemaining;
|
private long partRemaining;
|
||||||
|
private byte[] buffer;
|
||||||
|
private int maxReadAheadPartNumber;
|
||||||
|
private long expectNextPos;
|
||||||
|
private long lastByteStart;
|
||||||
|
|
||||||
|
private ExecutorService readAheadExecutorService;
|
||||||
|
private Queue<ReadBuffer> readBufferQueue = new ArrayDeque<>();
|
||||||
|
|
||||||
public AliyunOSSInputStream(Configuration conf,
|
public AliyunOSSInputStream(Configuration conf,
|
||||||
|
ExecutorService readAheadExecutorService, int maxReadAheadPartNumber,
|
||||||
AliyunOSSFileSystemStore store, String key, Long contentLength,
|
AliyunOSSFileSystemStore store, String key, Long contentLength,
|
||||||
Statistics statistics) throws IOException {
|
Statistics statistics) throws IOException {
|
||||||
|
this.readAheadExecutorService =
|
||||||
|
MoreExecutors.listeningDecorator(readAheadExecutorService);
|
||||||
this.store = store;
|
this.store = store;
|
||||||
this.key = key;
|
this.key = key;
|
||||||
this.statistics = statistics;
|
this.statistics = statistics;
|
||||||
this.contentLength = contentLength;
|
this.contentLength = contentLength;
|
||||||
downloadPartSize = conf.getLong(MULTIPART_DOWNLOAD_SIZE_KEY,
|
downloadPartSize = conf.getLong(MULTIPART_DOWNLOAD_SIZE_KEY,
|
||||||
MULTIPART_DOWNLOAD_SIZE_DEFAULT);
|
MULTIPART_DOWNLOAD_SIZE_DEFAULT);
|
||||||
|
this.maxReadAheadPartNumber = maxReadAheadPartNumber;
|
||||||
|
|
||||||
|
this.expectNextPos = 0;
|
||||||
|
this.lastByteStart = -1;
|
||||||
reopen(0);
|
reopen(0);
|
||||||
closed = false;
|
closed = false;
|
||||||
}
|
}
|
||||||
|
@ -82,15 +98,81 @@ public class AliyunOSSInputStream extends FSInputStream {
|
||||||
partSize = downloadPartSize;
|
partSize = downloadPartSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (wrappedStream != null) {
|
if (this.buffer != null) {
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("Aborting old stream to open at pos " + pos);
|
LOG.debug("Aborting old stream to open at pos " + pos);
|
||||||
}
|
}
|
||||||
wrappedStream.close();
|
this.buffer = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
wrappedStream = store.retrieve(key, pos, pos + partSize -1);
|
boolean isRandomIO = true;
|
||||||
if (wrappedStream == null) {
|
if (pos == this.expectNextPos) {
|
||||||
|
isRandomIO = false;
|
||||||
|
} else {
|
||||||
|
//new seek, remove cache buffers if its byteStart is not equal to pos
|
||||||
|
while (readBufferQueue.size() != 0) {
|
||||||
|
if (readBufferQueue.element().getByteStart() != pos) {
|
||||||
|
readBufferQueue.poll();
|
||||||
|
} else {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
this.expectNextPos = pos + partSize;
|
||||||
|
|
||||||
|
int currentSize = readBufferQueue.size();
|
||||||
|
if (currentSize == 0) {
|
||||||
|
//init lastByteStart to pos - partSize, used by for loop below
|
||||||
|
lastByteStart = pos - partSize;
|
||||||
|
} else {
|
||||||
|
ReadBuffer[] readBuffers = readBufferQueue.toArray(
|
||||||
|
new ReadBuffer[currentSize]);
|
||||||
|
lastByteStart = readBuffers[currentSize - 1].getByteStart();
|
||||||
|
}
|
||||||
|
|
||||||
|
int maxLen = this.maxReadAheadPartNumber - currentSize;
|
||||||
|
for (int i = 0; i < maxLen && i < (currentSize + 1) * 2; i++) {
|
||||||
|
if (lastByteStart + partSize * (i + 1) > contentLength) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
long byteStart = lastByteStart + partSize * (i + 1);
|
||||||
|
long byteEnd = byteStart + partSize -1;
|
||||||
|
if (byteEnd >= contentLength) {
|
||||||
|
byteEnd = contentLength - 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
ReadBuffer readBuffer = new ReadBuffer(byteStart, byteEnd);
|
||||||
|
if (readBuffer.getBuffer().length == 0) {
|
||||||
|
//EOF
|
||||||
|
readBuffer.setStatus(ReadBuffer.STATUS.SUCCESS);
|
||||||
|
} else {
|
||||||
|
this.readAheadExecutorService.execute(
|
||||||
|
new AliyunOSSFileReaderTask(key, store, readBuffer));
|
||||||
|
}
|
||||||
|
readBufferQueue.add(readBuffer);
|
||||||
|
if (isRandomIO) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
ReadBuffer readBuffer = readBufferQueue.poll();
|
||||||
|
readBuffer.lock();
|
||||||
|
try {
|
||||||
|
readBuffer.await(ReadBuffer.STATUS.INIT);
|
||||||
|
if (readBuffer.getStatus() == ReadBuffer.STATUS.ERROR) {
|
||||||
|
this.buffer = null;
|
||||||
|
} else {
|
||||||
|
this.buffer = readBuffer.getBuffer();
|
||||||
|
}
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
LOG.warn("interrupted when wait a read buffer");
|
||||||
|
} finally {
|
||||||
|
readBuffer.unlock();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (this.buffer == null) {
|
||||||
throw new IOException("Null IO stream");
|
throw new IOException("Null IO stream");
|
||||||
}
|
}
|
||||||
position = pos;
|
position = pos;
|
||||||
|
@ -105,18 +187,10 @@ public class AliyunOSSInputStream extends FSInputStream {
|
||||||
reopen(position);
|
reopen(position);
|
||||||
}
|
}
|
||||||
|
|
||||||
int tries = MAX_RETRIES;
|
|
||||||
boolean retry;
|
|
||||||
int byteRead = -1;
|
int byteRead = -1;
|
||||||
do {
|
if (partRemaining != 0) {
|
||||||
retry = false;
|
byteRead = this.buffer[this.buffer.length - (int)partRemaining] & 0xFF;
|
||||||
try {
|
|
||||||
byteRead = wrappedStream.read();
|
|
||||||
} catch (Exception e) {
|
|
||||||
handleReadException(e, --tries);
|
|
||||||
retry = true;
|
|
||||||
}
|
}
|
||||||
} while (retry);
|
|
||||||
if (byteRead >= 0) {
|
if (byteRead >= 0) {
|
||||||
position++;
|
position++;
|
||||||
partRemaining--;
|
partRemaining--;
|
||||||
|
@ -161,21 +235,18 @@ public class AliyunOSSInputStream extends FSInputStream {
|
||||||
reopen(position);
|
reopen(position);
|
||||||
}
|
}
|
||||||
|
|
||||||
int tries = MAX_RETRIES;
|
int bytes = 0;
|
||||||
boolean retry;
|
for (int i = this.buffer.length - (int)partRemaining;
|
||||||
int bytes = -1;
|
i < this.buffer.length; i++) {
|
||||||
do {
|
buf[off + bytesRead] = this.buffer[i];
|
||||||
retry = false;
|
bytes++;
|
||||||
try {
|
bytesRead++;
|
||||||
bytes = wrappedStream.read(buf, off + bytesRead, len - bytesRead);
|
if (off + bytesRead >= len) {
|
||||||
} catch (Exception e) {
|
break;
|
||||||
handleReadException(e, --tries);
|
}
|
||||||
retry = true;
|
|
||||||
}
|
}
|
||||||
} while (retry);
|
|
||||||
|
|
||||||
if (bytes > 0) {
|
if (bytes > 0) {
|
||||||
bytesRead += bytes;
|
|
||||||
position += bytes;
|
position += bytes;
|
||||||
partRemaining -= bytes;
|
partRemaining -= bytes;
|
||||||
} else if (partRemaining != 0) {
|
} else if (partRemaining != 0) {
|
||||||
|
@ -202,9 +273,7 @@ public class AliyunOSSInputStream extends FSInputStream {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
closed = true;
|
closed = true;
|
||||||
if (wrappedStream != null) {
|
this.buffer = null;
|
||||||
wrappedStream.close();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -225,7 +294,6 @@ public class AliyunOSSInputStream extends FSInputStream {
|
||||||
return;
|
return;
|
||||||
} else if (pos > position && pos < position + partRemaining) {
|
} else if (pos > position && pos < position + partRemaining) {
|
||||||
long len = pos - position;
|
long len = pos - position;
|
||||||
AliyunOSSUtils.skipFully(wrappedStream, len);
|
|
||||||
position = pos;
|
position = pos;
|
||||||
partRemaining -= len;
|
partRemaining -= len;
|
||||||
} else {
|
} else {
|
||||||
|
@ -245,18 +313,7 @@ public class AliyunOSSInputStream extends FSInputStream {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void handleReadException(Exception e, int tries) throws IOException{
|
public long getExpectNextPos() {
|
||||||
if (tries == 0) {
|
return this.expectNextPos;
|
||||||
throw new IOException(e);
|
|
||||||
}
|
|
||||||
|
|
||||||
LOG.warn("Some exceptions occurred in oss connection, try to reopen oss" +
|
|
||||||
" connection at position '" + position + "', " + e.getMessage());
|
|
||||||
try {
|
|
||||||
Thread.sleep(100);
|
|
||||||
} catch (InterruptedException e2) {
|
|
||||||
LOG.warn(e2.getMessage());
|
|
||||||
}
|
|
||||||
reopen(position);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -40,6 +40,18 @@ public final class AliyunOSSUtils {
|
||||||
private AliyunOSSUtils() {
|
private AliyunOSSUtils() {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static int intPositiveOption(
|
||||||
|
Configuration conf, String key, int defVal) {
|
||||||
|
int v = conf.getInt(key, defVal);
|
||||||
|
if (v <= 0) {
|
||||||
|
LOG.warn(key + " is configured to " + v
|
||||||
|
+ ", will use default value: " + defVal);
|
||||||
|
v = defVal;
|
||||||
|
}
|
||||||
|
|
||||||
|
return v;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Used to get password from configuration.
|
* Used to get password from configuration.
|
||||||
*
|
*
|
||||||
|
|
|
@ -97,7 +97,18 @@ public final class Constants {
|
||||||
public static final String MULTIPART_DOWNLOAD_SIZE_KEY =
|
public static final String MULTIPART_DOWNLOAD_SIZE_KEY =
|
||||||
"fs.oss.multipart.download.size";
|
"fs.oss.multipart.download.size";
|
||||||
|
|
||||||
public static final long MULTIPART_DOWNLOAD_SIZE_DEFAULT = 100 * 1024;
|
public static final long MULTIPART_DOWNLOAD_SIZE_DEFAULT = 512 * 1024;
|
||||||
|
|
||||||
|
public static final String MULTIPART_DOWNLOAD_THREAD_NUMBER_KEY =
|
||||||
|
"fs.oss.multipart.download.threads";
|
||||||
|
public static final int MULTIPART_DOWNLOAD_THREAD_NUMBER_DEFAULT = 10;
|
||||||
|
|
||||||
|
public static final String MAX_TOTAL_TASKS_KEY = "fs.oss.max.total.tasks";
|
||||||
|
public static final int MAX_TOTAL_TASKS_DEFAULT = 128;
|
||||||
|
|
||||||
|
public static final String MULTIPART_DOWNLOAD_AHEAD_PART_MAX_NUM_KEY =
|
||||||
|
"fs.oss.multipart.download.ahead.part.max.number";
|
||||||
|
public static final int MULTIPART_DOWNLOAD_AHEAD_PART_MAX_NUM_DEFAULT = 4;
|
||||||
|
|
||||||
// Comma separated list of directories
|
// Comma separated list of directories
|
||||||
public static final String BUFFER_DIR_KEY = "fs.oss.buffer.dir";
|
public static final String BUFFER_DIR_KEY = "fs.oss.buffer.dir";
|
||||||
|
|
|
@ -0,0 +1,86 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.fs.aliyun.oss;
|
||||||
|
|
||||||
|
import java.util.concurrent.locks.Condition;
|
||||||
|
import java.util.concurrent.locks.ReentrantLock;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This class is used by {@link AliyunOSSInputStream}
|
||||||
|
* and {@link AliyunOSSFileReaderTask} to buffer data that read from oss.
|
||||||
|
*/
|
||||||
|
public class ReadBuffer {
|
||||||
|
enum STATUS {
|
||||||
|
INIT, SUCCESS, ERROR
|
||||||
|
}
|
||||||
|
private final ReentrantLock lock = new ReentrantLock();
|
||||||
|
|
||||||
|
private Condition readyCondition = lock.newCondition();
|
||||||
|
|
||||||
|
private byte[] buffer;
|
||||||
|
private STATUS status;
|
||||||
|
private long byteStart;
|
||||||
|
private long byteEnd;
|
||||||
|
|
||||||
|
public ReadBuffer(long byteStart, long byteEnd) {
|
||||||
|
this.buffer = new byte[(int)(byteEnd - byteStart) + 1];
|
||||||
|
|
||||||
|
this.status = STATUS.INIT;
|
||||||
|
this.byteStart = byteStart;
|
||||||
|
this.byteEnd = byteEnd;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void lock() {
|
||||||
|
lock.lock();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void unlock() {
|
||||||
|
lock.unlock();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void await(STATUS waitStatus) throws InterruptedException {
|
||||||
|
while (this.status == waitStatus) {
|
||||||
|
readyCondition.await();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void signalAll() {
|
||||||
|
readyCondition.signalAll();
|
||||||
|
}
|
||||||
|
|
||||||
|
public byte[] getBuffer() {
|
||||||
|
return buffer;
|
||||||
|
}
|
||||||
|
|
||||||
|
public STATUS getStatus() {
|
||||||
|
return status;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setStatus(STATUS status) {
|
||||||
|
this.status = status;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getByteStart() {
|
||||||
|
return byteStart;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getByteEnd() {
|
||||||
|
return byteEnd;
|
||||||
|
}
|
||||||
|
}
|
|
@ -35,6 +35,7 @@ import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -107,6 +108,54 @@ public class TestAliyunOSSInputStream {
|
||||||
IOUtils.closeStream(instream);
|
IOUtils.closeStream(instream);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSequentialAndRandomRead() throws Exception {
|
||||||
|
Path smallSeekFile = setPath("/test/smallSeekFile.txt");
|
||||||
|
long size = 5 * 1024 * 1024;
|
||||||
|
|
||||||
|
ContractTestUtils.generateTestFile(this.fs, smallSeekFile, size, 256, 255);
|
||||||
|
LOG.info("5MB file created: smallSeekFile.txt");
|
||||||
|
|
||||||
|
FSDataInputStream fsDataInputStream = this.fs.open(smallSeekFile);
|
||||||
|
AliyunOSSInputStream in =
|
||||||
|
(AliyunOSSInputStream)fsDataInputStream.getWrappedStream();
|
||||||
|
assertTrue("expected position at:" + 0 + ", but got:"
|
||||||
|
+ fsDataInputStream.getPos(), fsDataInputStream.getPos() == 0);
|
||||||
|
|
||||||
|
assertTrue("expected position at:"
|
||||||
|
+ Constants.MULTIPART_DOWNLOAD_SIZE_DEFAULT + ", but got:"
|
||||||
|
+ in.getExpectNextPos(),
|
||||||
|
in.getExpectNextPos() == Constants.MULTIPART_DOWNLOAD_SIZE_DEFAULT);
|
||||||
|
fsDataInputStream.seek(4 * 1024 * 1024);
|
||||||
|
assertTrue("expected position at:" + 4 * 1024 * 1024
|
||||||
|
+ Constants.MULTIPART_DOWNLOAD_SIZE_DEFAULT + ", but got:"
|
||||||
|
+ in.getExpectNextPos(),
|
||||||
|
in.getExpectNextPos() == 4 * 1024 * 1024
|
||||||
|
+ Constants.MULTIPART_DOWNLOAD_SIZE_DEFAULT);
|
||||||
|
IOUtils.closeStream(fsDataInputStream);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testOSSFileReaderTask() throws Exception {
|
||||||
|
Path smallSeekFile = setPath("/test/smallSeekFileOSSFileReader.txt");
|
||||||
|
long size = 5 * 1024 * 1024;
|
||||||
|
|
||||||
|
ContractTestUtils.generateTestFile(this.fs, smallSeekFile, size, 256, 255);
|
||||||
|
LOG.info("5MB file created: smallSeekFileOSSFileReader.txt");
|
||||||
|
ReadBuffer readBuffer = new ReadBuffer(12, 24);
|
||||||
|
AliyunOSSFileReaderTask task = new AliyunOSSFileReaderTask("1",
|
||||||
|
((AliyunOSSFileSystem)this.fs).getStore(), readBuffer);
|
||||||
|
//NullPointerException, fail
|
||||||
|
task.run();
|
||||||
|
assertEquals(readBuffer.getStatus(), ReadBuffer.STATUS.ERROR);
|
||||||
|
//OK
|
||||||
|
task = new AliyunOSSFileReaderTask(
|
||||||
|
"test/test/smallSeekFileOSSFileReader.txt",
|
||||||
|
((AliyunOSSFileSystem)this.fs).getStore(), readBuffer);
|
||||||
|
task.run();
|
||||||
|
assertEquals(readBuffer.getStatus(), ReadBuffer.STATUS.SUCCESS);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testReadFile() throws Exception {
|
public void testReadFile() throws Exception {
|
||||||
final int bufLen = 256;
|
final int bufLen = 256;
|
||||||
|
|
Loading…
Reference in New Issue