diff --git a/hadoop-tools/hadoop-aliyun/dev-support/findbugs-exclude.xml b/hadoop-tools/hadoop-aliyun/dev-support/findbugs-exclude.xml
index 40d78d0cd6c..c55f8e3e616 100644
--- a/hadoop-tools/hadoop-aliyun/dev-support/findbugs-exclude.xml
+++ b/hadoop-tools/hadoop-aliyun/dev-support/findbugs-exclude.xml
@@ -15,4 +15,12 @@
limitations under the License.
-->
+
+
+
+
+
+
diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileReaderTask.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileReaderTask.java
new file mode 100644
index 00000000000..e5bfc2c4d9e
--- /dev/null
+++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileReaderTask.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.aliyun.oss;
+
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Used by {@link AliyunOSSInputStream} as an task that submitted
+ * to the thread pool.
+ * Each AliyunOSSFileReaderTask reads one part of the file so that
+ * we can accelerate the sequential read.
+ */
+public class AliyunOSSFileReaderTask implements Runnable {
+ public static final Logger LOG =
+ LoggerFactory.getLogger(AliyunOSSFileReaderTask.class);
+
+ private String key;
+ private AliyunOSSFileSystemStore store;
+ private ReadBuffer readBuffer;
+ private static final int MAX_RETRIES = 3;
+ private RetryPolicy retryPolicy;
+
+ public AliyunOSSFileReaderTask(String key, AliyunOSSFileSystemStore store,
+ ReadBuffer readBuffer) {
+ this.key = key;
+ this.store = store;
+ this.readBuffer = readBuffer;
+ RetryPolicy defaultPolicy =
+ RetryPolicies.retryUpToMaximumCountWithFixedSleep(
+ MAX_RETRIES, 3, TimeUnit.SECONDS);
+ Map, RetryPolicy> policies = new HashMap<>();
+ policies.put(IOException.class, defaultPolicy);
+ policies.put(IndexOutOfBoundsException.class,
+ RetryPolicies.TRY_ONCE_THEN_FAIL);
+ policies.put(NullPointerException.class,
+ RetryPolicies.TRY_ONCE_THEN_FAIL);
+
+ this.retryPolicy = RetryPolicies.retryByException(defaultPolicy, policies);
+ }
+
+ @Override
+ public void run() {
+ int retries = 0;
+ readBuffer.lock();
+ try {
+ while (true) {
+ try (InputStream in = store.retrieve(
+ key, readBuffer.getByteStart(), readBuffer.getByteEnd())) {
+ IOUtils.readFully(in, readBuffer.getBuffer(),
+ 0, readBuffer.getBuffer().length);
+ readBuffer.setStatus(ReadBuffer.STATUS.SUCCESS);
+ break;
+ } catch (Exception e) {
+ LOG.warn("Exception thrown when retrieve key: "
+ + this.key + ", exception: " + e);
+ try {
+ RetryPolicy.RetryAction rc = retryPolicy.shouldRetry(
+ e, retries++, 0, true);
+ if (rc.action == RetryPolicy.RetryAction.RetryDecision.RETRY) {
+ Thread.sleep(rc.delayMillis);
+ } else {
+ //should not retry
+ break;
+ }
+ } catch (Exception ex) {
+ //FAIL
+ LOG.warn("Exception thrown when call shouldRetry, exception " + ex);
+ break;
+ }
+ }
+ }
+
+ if (readBuffer.getStatus() != ReadBuffer.STATUS.SUCCESS) {
+ readBuffer.setStatus(ReadBuffer.STATUS.ERROR);
+ }
+
+ //notify main thread which wait for this buffer
+ readBuffer.signalAll();
+ } finally {
+ readBuffer.unlock();
+ }
+ }
+}
diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java
index 21fdabfe894..06a13a8bfa4 100644
--- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java
+++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java
@@ -24,7 +24,9 @@ import java.net.URI;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
+import java.util.concurrent.TimeUnit;
+import com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
@@ -41,12 +43,14 @@ import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
import org.apache.hadoop.util.Progressable;
import com.aliyun.oss.model.OSSObjectSummary;
import com.aliyun.oss.model.ObjectListing;
import com.aliyun.oss.model.ObjectMetadata;
+import org.apache.hadoop.util.SemaphoredDelegatingExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -65,6 +69,9 @@ public class AliyunOSSFileSystem extends FileSystem {
private Path workingDir;
private AliyunOSSFileSystemStore store;
private int maxKeys;
+ private int maxReadAheadPartNumber;
+ private ListeningExecutorService boundedThreadPool;
+
private static final PathFilter DEFAULT_FILTER = new PathFilter() {
@Override
public boolean accept(Path file) {
@@ -82,6 +89,7 @@ public class AliyunOSSFileSystem extends FileSystem {
public void close() throws IOException {
try {
store.close();
+ boundedThreadPool.shutdown();
} finally {
super.close();
}
@@ -309,10 +317,24 @@ public class AliyunOSSFileSystem extends FileSystem {
store = new AliyunOSSFileSystemStore();
store.initialize(name, conf, statistics);
maxKeys = conf.getInt(MAX_PAGING_KEYS_KEY, MAX_PAGING_KEYS_DEFAULT);
+
+ int threadNum = AliyunOSSUtils.intPositiveOption(conf,
+ Constants.MULTIPART_DOWNLOAD_THREAD_NUMBER_KEY,
+ Constants.MULTIPART_DOWNLOAD_THREAD_NUMBER_DEFAULT);
+
+ int totalTasks = AliyunOSSUtils.intPositiveOption(conf,
+ Constants.MAX_TOTAL_TASKS_KEY, Constants.MAX_TOTAL_TASKS_DEFAULT);
+
+ maxReadAheadPartNumber = AliyunOSSUtils.intPositiveOption(conf,
+ Constants.MULTIPART_DOWNLOAD_AHEAD_PART_MAX_NUM_KEY,
+ Constants.MULTIPART_DOWNLOAD_AHEAD_PART_MAX_NUM_DEFAULT);
+
+ this.boundedThreadPool = BlockingThreadPoolExecutorService.newInstance(
+ threadNum, totalTasks, 60L, TimeUnit.SECONDS, "oss-read-shared");
setConf(conf);
}
- /**
+/**
* Turn a path (relative or otherwise) into an OSS key.
*
* @param path the path of the file.
@@ -523,8 +545,11 @@ public class AliyunOSSFileSystem extends FileSystem {
" because it is a directory");
}
- return new FSDataInputStream(new AliyunOSSInputStream(getConf(), store,
- pathToKey(path), fileStatus.getLen(), statistics));
+ return new FSDataInputStream(new AliyunOSSInputStream(getConf(),
+ new SemaphoredDelegatingExecutor(
+ boundedThreadPool, maxReadAheadPartNumber, true),
+ maxReadAheadPartNumber, store, pathToKey(path), fileStatus.getLen(),
+ statistics));
}
@Override
diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSInputStream.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSInputStream.java
index 3b2bc022e22..68f11c59d24 100644
--- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSInputStream.java
+++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSInputStream.java
@@ -22,8 +22,11 @@ import static org.apache.hadoop.fs.aliyun.oss.Constants.*;
import java.io.EOFException;
import java.io.IOException;
-import java.io.InputStream;
+import java.util.ArrayDeque;
+import java.util.Queue;
+import java.util.concurrent.ExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -43,20 +46,33 @@ public class AliyunOSSInputStream extends FSInputStream {
private final String key;
private Statistics statistics;
private boolean closed;
- private InputStream wrappedStream = null;
private long contentLength;
private long position;
private long partRemaining;
+ private byte[] buffer;
+ private int maxReadAheadPartNumber;
+ private long expectNextPos;
+ private long lastByteStart;
+
+ private ExecutorService readAheadExecutorService;
+ private Queue readBufferQueue = new ArrayDeque<>();
public AliyunOSSInputStream(Configuration conf,
+ ExecutorService readAheadExecutorService, int maxReadAheadPartNumber,
AliyunOSSFileSystemStore store, String key, Long contentLength,
Statistics statistics) throws IOException {
+ this.readAheadExecutorService =
+ MoreExecutors.listeningDecorator(readAheadExecutorService);
this.store = store;
this.key = key;
this.statistics = statistics;
this.contentLength = contentLength;
downloadPartSize = conf.getLong(MULTIPART_DOWNLOAD_SIZE_KEY,
MULTIPART_DOWNLOAD_SIZE_DEFAULT);
+ this.maxReadAheadPartNumber = maxReadAheadPartNumber;
+
+ this.expectNextPos = 0;
+ this.lastByteStart = -1;
reopen(0);
closed = false;
}
@@ -82,15 +98,81 @@ public class AliyunOSSInputStream extends FSInputStream {
partSize = downloadPartSize;
}
- if (wrappedStream != null) {
+ if (this.buffer != null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Aborting old stream to open at pos " + pos);
}
- wrappedStream.close();
+ this.buffer = null;
}
- wrappedStream = store.retrieve(key, pos, pos + partSize -1);
- if (wrappedStream == null) {
+ boolean isRandomIO = true;
+ if (pos == this.expectNextPos) {
+ isRandomIO = false;
+ } else {
+ //new seek, remove cache buffers if its byteStart is not equal to pos
+ while (readBufferQueue.size() != 0) {
+ if (readBufferQueue.element().getByteStart() != pos) {
+ readBufferQueue.poll();
+ } else {
+ break;
+ }
+ }
+ }
+
+ this.expectNextPos = pos + partSize;
+
+ int currentSize = readBufferQueue.size();
+ if (currentSize == 0) {
+ //init lastByteStart to pos - partSize, used by for loop below
+ lastByteStart = pos - partSize;
+ } else {
+ ReadBuffer[] readBuffers = readBufferQueue.toArray(
+ new ReadBuffer[currentSize]);
+ lastByteStart = readBuffers[currentSize - 1].getByteStart();
+ }
+
+ int maxLen = this.maxReadAheadPartNumber - currentSize;
+ for (int i = 0; i < maxLen && i < (currentSize + 1) * 2; i++) {
+ if (lastByteStart + partSize * (i + 1) > contentLength) {
+ break;
+ }
+
+ long byteStart = lastByteStart + partSize * (i + 1);
+ long byteEnd = byteStart + partSize -1;
+ if (byteEnd >= contentLength) {
+ byteEnd = contentLength - 1;
+ }
+
+ ReadBuffer readBuffer = new ReadBuffer(byteStart, byteEnd);
+ if (readBuffer.getBuffer().length == 0) {
+ //EOF
+ readBuffer.setStatus(ReadBuffer.STATUS.SUCCESS);
+ } else {
+ this.readAheadExecutorService.execute(
+ new AliyunOSSFileReaderTask(key, store, readBuffer));
+ }
+ readBufferQueue.add(readBuffer);
+ if (isRandomIO) {
+ break;
+ }
+ }
+
+ ReadBuffer readBuffer = readBufferQueue.poll();
+ readBuffer.lock();
+ try {
+ readBuffer.await(ReadBuffer.STATUS.INIT);
+ if (readBuffer.getStatus() == ReadBuffer.STATUS.ERROR) {
+ this.buffer = null;
+ } else {
+ this.buffer = readBuffer.getBuffer();
+ }
+ } catch (InterruptedException e) {
+ LOG.warn("interrupted when wait a read buffer");
+ } finally {
+ readBuffer.unlock();
+ }
+
+ if (this.buffer == null) {
throw new IOException("Null IO stream");
}
position = pos;
@@ -105,18 +187,10 @@ public class AliyunOSSInputStream extends FSInputStream {
reopen(position);
}
- int tries = MAX_RETRIES;
- boolean retry;
int byteRead = -1;
- do {
- retry = false;
- try {
- byteRead = wrappedStream.read();
- } catch (Exception e) {
- handleReadException(e, --tries);
- retry = true;
- }
- } while (retry);
+ if (partRemaining != 0) {
+ byteRead = this.buffer[this.buffer.length - (int)partRemaining] & 0xFF;
+ }
if (byteRead >= 0) {
position++;
partRemaining--;
@@ -161,21 +235,18 @@ public class AliyunOSSInputStream extends FSInputStream {
reopen(position);
}
- int tries = MAX_RETRIES;
- boolean retry;
- int bytes = -1;
- do {
- retry = false;
- try {
- bytes = wrappedStream.read(buf, off + bytesRead, len - bytesRead);
- } catch (Exception e) {
- handleReadException(e, --tries);
- retry = true;
+ int bytes = 0;
+ for (int i = this.buffer.length - (int)partRemaining;
+ i < this.buffer.length; i++) {
+ buf[off + bytesRead] = this.buffer[i];
+ bytes++;
+ bytesRead++;
+ if (off + bytesRead >= len) {
+ break;
}
- } while (retry);
+ }
if (bytes > 0) {
- bytesRead += bytes;
position += bytes;
partRemaining -= bytes;
} else if (partRemaining != 0) {
@@ -202,9 +273,7 @@ public class AliyunOSSInputStream extends FSInputStream {
return;
}
closed = true;
- if (wrappedStream != null) {
- wrappedStream.close();
- }
+ this.buffer = null;
}
@Override
@@ -225,7 +294,6 @@ public class AliyunOSSInputStream extends FSInputStream {
return;
} else if (pos > position && pos < position + partRemaining) {
long len = pos - position;
- AliyunOSSUtils.skipFully(wrappedStream, len);
position = pos;
partRemaining -= len;
} else {
@@ -245,18 +313,7 @@ public class AliyunOSSInputStream extends FSInputStream {
return false;
}
- private void handleReadException(Exception e, int tries) throws IOException{
- if (tries == 0) {
- throw new IOException(e);
- }
-
- LOG.warn("Some exceptions occurred in oss connection, try to reopen oss" +
- " connection at position '" + position + "', " + e.getMessage());
- try {
- Thread.sleep(100);
- } catch (InterruptedException e2) {
- LOG.warn(e2.getMessage());
- }
- reopen(position);
+ public long getExpectNextPos() {
+ return this.expectNextPos;
}
}
diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSUtils.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSUtils.java
index fdf72e48c09..1a2160889a1 100644
--- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSUtils.java
+++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSUtils.java
@@ -40,6 +40,18 @@ final public class AliyunOSSUtils {
private AliyunOSSUtils() {
}
+ public static int intPositiveOption(
+ Configuration conf, String key, int defVal) {
+ int v = conf.getInt(key, defVal);
+ if (v <= 0) {
+ LOG.warn(key + " is configured to " + v
+ + ", will use default value: " + defVal);
+ v = defVal;
+ }
+
+ return v;
+ }
+
/**
* Used to get password from configuration.
*
diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/Constants.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/Constants.java
index dd71842fb87..410adc90373 100644
--- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/Constants.java
+++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/Constants.java
@@ -97,7 +97,18 @@ public final class Constants {
public static final String MULTIPART_DOWNLOAD_SIZE_KEY =
"fs.oss.multipart.download.size";
- public static final long MULTIPART_DOWNLOAD_SIZE_DEFAULT = 100 * 1024;
+ public static final long MULTIPART_DOWNLOAD_SIZE_DEFAULT = 512 * 1024;
+
+ public static final String MULTIPART_DOWNLOAD_THREAD_NUMBER_KEY =
+ "fs.oss.multipart.download.threads";
+ public static final int MULTIPART_DOWNLOAD_THREAD_NUMBER_DEFAULT = 10;
+
+ public static final String MAX_TOTAL_TASKS_KEY = "fs.oss.max.total.tasks";
+ public static final int MAX_TOTAL_TASKS_DEFAULT = 128;
+
+ public static final String MULTIPART_DOWNLOAD_AHEAD_PART_MAX_NUM_KEY =
+ "fs.oss.multipart.download.ahead.part.max.number";
+ public static final int MULTIPART_DOWNLOAD_AHEAD_PART_MAX_NUM_DEFAULT = 4;
// Comma separated list of directories
public static final String BUFFER_DIR_KEY = "fs.oss.buffer.dir";
diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/ReadBuffer.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/ReadBuffer.java
new file mode 100644
index 00000000000..46bb5bf0795
--- /dev/null
+++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/ReadBuffer.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.aliyun.oss;
+
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * This class is used by {@link AliyunOSSInputStream}
+ * and {@link AliyunOSSFileReaderTask} to buffer data that read from oss.
+ */
+public class ReadBuffer {
+ enum STATUS {
+ INIT, SUCCESS, ERROR
+ }
+ private final ReentrantLock lock = new ReentrantLock();
+
+ private Condition readyCondition = lock.newCondition();
+
+ private byte[] buffer;
+ private STATUS status;
+ private long byteStart;
+ private long byteEnd;
+
+ public ReadBuffer(long byteStart, long byteEnd) {
+ this.buffer = new byte[(int)(byteEnd - byteStart) + 1];
+
+ this.status = STATUS.INIT;
+ this.byteStart = byteStart;
+ this.byteEnd = byteEnd;
+ }
+
+ public void lock() {
+ lock.lock();
+ }
+
+ public void unlock() {
+ lock.unlock();
+ }
+
+ public void await(STATUS waitStatus) throws InterruptedException {
+ while (this.status == waitStatus) {
+ readyCondition.await();
+ }
+ }
+
+ public void signalAll() {
+ readyCondition.signalAll();
+ }
+
+ public byte[] getBuffer() {
+ return buffer;
+ }
+
+ public STATUS getStatus() {
+ return status;
+ }
+
+ public void setStatus(STATUS status) {
+ this.status = status;
+ }
+
+ public long getByteStart() {
+ return byteStart;
+ }
+
+ public long getByteEnd() {
+ return byteEnd;
+ }
+}
diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSInputStream.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSInputStream.java
index d798cafb206..f0bb26d8e7d 100644
--- a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSInputStream.java
+++ b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSInputStream.java
@@ -35,6 +35,7 @@ import org.slf4j.LoggerFactory;
import java.util.Random;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/**
@@ -107,6 +108,54 @@ public class TestAliyunOSSInputStream {
IOUtils.closeStream(instream);
}
+ @Test
+ public void testSequentialAndRandomRead() throws Exception {
+ Path smallSeekFile = setPath("/test/smallSeekFile.txt");
+ long size = 5 * 1024 * 1024;
+
+ ContractTestUtils.generateTestFile(this.fs, smallSeekFile, size, 256, 255);
+ LOG.info("5MB file created: smallSeekFile.txt");
+
+ FSDataInputStream fsDataInputStream = this.fs.open(smallSeekFile);
+ AliyunOSSInputStream in =
+ (AliyunOSSInputStream)fsDataInputStream.getWrappedStream();
+ assertTrue("expected position at:" + 0 + ", but got:"
+ + fsDataInputStream.getPos(), fsDataInputStream.getPos() == 0);
+
+ assertTrue("expected position at:"
+ + Constants.MULTIPART_DOWNLOAD_SIZE_DEFAULT + ", but got:"
+ + in.getExpectNextPos(),
+ in.getExpectNextPos() == Constants.MULTIPART_DOWNLOAD_SIZE_DEFAULT);
+ fsDataInputStream.seek(4 * 1024 * 1024);
+ assertTrue("expected position at:" + 4 * 1024 * 1024
+ + Constants.MULTIPART_DOWNLOAD_SIZE_DEFAULT + ", but got:"
+ + in.getExpectNextPos(),
+ in.getExpectNextPos() == 4 * 1024 * 1024
+ + Constants.MULTIPART_DOWNLOAD_SIZE_DEFAULT);
+ IOUtils.closeStream(fsDataInputStream);
+ }
+
+ @Test
+ public void testOSSFileReaderTask() throws Exception {
+ Path smallSeekFile = setPath("/test/smallSeekFileOSSFileReader.txt");
+ long size = 5 * 1024 * 1024;
+
+ ContractTestUtils.generateTestFile(this.fs, smallSeekFile, size, 256, 255);
+ LOG.info("5MB file created: smallSeekFileOSSFileReader.txt");
+ ReadBuffer readBuffer = new ReadBuffer(12, 24);
+ AliyunOSSFileReaderTask task = new AliyunOSSFileReaderTask("1",
+ ((AliyunOSSFileSystem)this.fs).getStore(), readBuffer);
+ //NullPointerException, fail
+ task.run();
+ assertEquals(readBuffer.getStatus(), ReadBuffer.STATUS.ERROR);
+ //OK
+ task = new AliyunOSSFileReaderTask(
+ "test/test/smallSeekFileOSSFileReader.txt",
+ ((AliyunOSSFileSystem)this.fs).getStore(), readBuffer);
+ task.run();
+ assertEquals(readBuffer.getStatus(), ReadBuffer.STATUS.SUCCESS);
+ }
+
@Test
public void testReadFile() throws Exception {
final int bufLen = 256;