diff --git a/hadoop-tools/hadoop-aliyun/dev-support/findbugs-exclude.xml b/hadoop-tools/hadoop-aliyun/dev-support/findbugs-exclude.xml
index c55f8e3e616..40d78d0cd6c 100644
--- a/hadoop-tools/hadoop-aliyun/dev-support/findbugs-exclude.xml
+++ b/hadoop-tools/hadoop-aliyun/dev-support/findbugs-exclude.xml
@@ -15,12 +15,4 @@
limitations under the License.
-->
-
-
-
-
-
-
diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileReaderTask.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileReaderTask.java
deleted file mode 100644
index e5bfc2c4d9e..00000000000
--- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileReaderTask.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.aliyun.oss;
-
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.retry.RetryPolicies;
-import org.apache.hadoop.io.retry.RetryPolicy;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Used by {@link AliyunOSSInputStream} as an task that submitted
- * to the thread pool.
- * Each AliyunOSSFileReaderTask reads one part of the file so that
- * we can accelerate the sequential read.
- */
-public class AliyunOSSFileReaderTask implements Runnable {
- public static final Logger LOG =
- LoggerFactory.getLogger(AliyunOSSFileReaderTask.class);
-
- private String key;
- private AliyunOSSFileSystemStore store;
- private ReadBuffer readBuffer;
- private static final int MAX_RETRIES = 3;
- private RetryPolicy retryPolicy;
-
- public AliyunOSSFileReaderTask(String key, AliyunOSSFileSystemStore store,
- ReadBuffer readBuffer) {
- this.key = key;
- this.store = store;
- this.readBuffer = readBuffer;
- RetryPolicy defaultPolicy =
- RetryPolicies.retryUpToMaximumCountWithFixedSleep(
- MAX_RETRIES, 3, TimeUnit.SECONDS);
- Map, RetryPolicy> policies = new HashMap<>();
- policies.put(IOException.class, defaultPolicy);
- policies.put(IndexOutOfBoundsException.class,
- RetryPolicies.TRY_ONCE_THEN_FAIL);
- policies.put(NullPointerException.class,
- RetryPolicies.TRY_ONCE_THEN_FAIL);
-
- this.retryPolicy = RetryPolicies.retryByException(defaultPolicy, policies);
- }
-
- @Override
- public void run() {
- int retries = 0;
- readBuffer.lock();
- try {
- while (true) {
- try (InputStream in = store.retrieve(
- key, readBuffer.getByteStart(), readBuffer.getByteEnd())) {
- IOUtils.readFully(in, readBuffer.getBuffer(),
- 0, readBuffer.getBuffer().length);
- readBuffer.setStatus(ReadBuffer.STATUS.SUCCESS);
- break;
- } catch (Exception e) {
- LOG.warn("Exception thrown when retrieve key: "
- + this.key + ", exception: " + e);
- try {
- RetryPolicy.RetryAction rc = retryPolicy.shouldRetry(
- e, retries++, 0, true);
- if (rc.action == RetryPolicy.RetryAction.RetryDecision.RETRY) {
- Thread.sleep(rc.delayMillis);
- } else {
- //should not retry
- break;
- }
- } catch (Exception ex) {
- //FAIL
- LOG.warn("Exception thrown when call shouldRetry, exception " + ex);
- break;
- }
- }
- }
-
- if (readBuffer.getStatus() != ReadBuffer.STATUS.SUCCESS) {
- readBuffer.setStatus(ReadBuffer.STATUS.ERROR);
- }
-
- //notify main thread which wait for this buffer
- readBuffer.signalAll();
- } finally {
- readBuffer.unlock();
- }
- }
-}
diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java
index 06a13a8bfa4..21fdabfe894 100644
--- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java
+++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java
@@ -24,9 +24,7 @@ import java.net.URI;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
-import java.util.concurrent.TimeUnit;
-import com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
@@ -43,14 +41,12 @@ import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
import org.apache.hadoop.util.Progressable;
import com.aliyun.oss.model.OSSObjectSummary;
import com.aliyun.oss.model.ObjectListing;
import com.aliyun.oss.model.ObjectMetadata;
-import org.apache.hadoop.util.SemaphoredDelegatingExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -69,9 +65,6 @@ public class AliyunOSSFileSystem extends FileSystem {
private Path workingDir;
private AliyunOSSFileSystemStore store;
private int maxKeys;
- private int maxReadAheadPartNumber;
- private ListeningExecutorService boundedThreadPool;
-
private static final PathFilter DEFAULT_FILTER = new PathFilter() {
@Override
public boolean accept(Path file) {
@@ -89,7 +82,6 @@ public class AliyunOSSFileSystem extends FileSystem {
public void close() throws IOException {
try {
store.close();
- boundedThreadPool.shutdown();
} finally {
super.close();
}
@@ -317,24 +309,10 @@ public class AliyunOSSFileSystem extends FileSystem {
store = new AliyunOSSFileSystemStore();
store.initialize(name, conf, statistics);
maxKeys = conf.getInt(MAX_PAGING_KEYS_KEY, MAX_PAGING_KEYS_DEFAULT);
-
- int threadNum = AliyunOSSUtils.intPositiveOption(conf,
- Constants.MULTIPART_DOWNLOAD_THREAD_NUMBER_KEY,
- Constants.MULTIPART_DOWNLOAD_THREAD_NUMBER_DEFAULT);
-
- int totalTasks = AliyunOSSUtils.intPositiveOption(conf,
- Constants.MAX_TOTAL_TASKS_KEY, Constants.MAX_TOTAL_TASKS_DEFAULT);
-
- maxReadAheadPartNumber = AliyunOSSUtils.intPositiveOption(conf,
- Constants.MULTIPART_DOWNLOAD_AHEAD_PART_MAX_NUM_KEY,
- Constants.MULTIPART_DOWNLOAD_AHEAD_PART_MAX_NUM_DEFAULT);
-
- this.boundedThreadPool = BlockingThreadPoolExecutorService.newInstance(
- threadNum, totalTasks, 60L, TimeUnit.SECONDS, "oss-read-shared");
setConf(conf);
}
-/**
+ /**
* Turn a path (relative or otherwise) into an OSS key.
*
* @param path the path of the file.
@@ -545,11 +523,8 @@ public class AliyunOSSFileSystem extends FileSystem {
" because it is a directory");
}
- return new FSDataInputStream(new AliyunOSSInputStream(getConf(),
- new SemaphoredDelegatingExecutor(
- boundedThreadPool, maxReadAheadPartNumber, true),
- maxReadAheadPartNumber, store, pathToKey(path), fileStatus.getLen(),
- statistics));
+ return new FSDataInputStream(new AliyunOSSInputStream(getConf(), store,
+ pathToKey(path), fileStatus.getLen(), statistics));
}
@Override
diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSInputStream.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSInputStream.java
index 68f11c59d24..3b2bc022e22 100644
--- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSInputStream.java
+++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSInputStream.java
@@ -22,11 +22,8 @@ import static org.apache.hadoop.fs.aliyun.oss.Constants.*;
import java.io.EOFException;
import java.io.IOException;
-import java.util.ArrayDeque;
-import java.util.Queue;
-import java.util.concurrent.ExecutorService;
+import java.io.InputStream;
-import com.google.common.util.concurrent.MoreExecutors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -46,33 +43,20 @@ public class AliyunOSSInputStream extends FSInputStream {
private final String key;
private Statistics statistics;
private boolean closed;
+ private InputStream wrappedStream = null;
private long contentLength;
private long position;
private long partRemaining;
- private byte[] buffer;
- private int maxReadAheadPartNumber;
- private long expectNextPos;
- private long lastByteStart;
-
- private ExecutorService readAheadExecutorService;
- private Queue readBufferQueue = new ArrayDeque<>();
public AliyunOSSInputStream(Configuration conf,
- ExecutorService readAheadExecutorService, int maxReadAheadPartNumber,
AliyunOSSFileSystemStore store, String key, Long contentLength,
Statistics statistics) throws IOException {
- this.readAheadExecutorService =
- MoreExecutors.listeningDecorator(readAheadExecutorService);
this.store = store;
this.key = key;
this.statistics = statistics;
this.contentLength = contentLength;
downloadPartSize = conf.getLong(MULTIPART_DOWNLOAD_SIZE_KEY,
MULTIPART_DOWNLOAD_SIZE_DEFAULT);
- this.maxReadAheadPartNumber = maxReadAheadPartNumber;
-
- this.expectNextPos = 0;
- this.lastByteStart = -1;
reopen(0);
closed = false;
}
@@ -98,81 +82,15 @@ public class AliyunOSSInputStream extends FSInputStream {
partSize = downloadPartSize;
}
- if (this.buffer != null) {
+ if (wrappedStream != null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Aborting old stream to open at pos " + pos);
}
- this.buffer = null;
+ wrappedStream.close();
}
- boolean isRandomIO = true;
- if (pos == this.expectNextPos) {
- isRandomIO = false;
- } else {
- //new seek, remove cache buffers if its byteStart is not equal to pos
- while (readBufferQueue.size() != 0) {
- if (readBufferQueue.element().getByteStart() != pos) {
- readBufferQueue.poll();
- } else {
- break;
- }
- }
- }
-
- this.expectNextPos = pos + partSize;
-
- int currentSize = readBufferQueue.size();
- if (currentSize == 0) {
- //init lastByteStart to pos - partSize, used by for loop below
- lastByteStart = pos - partSize;
- } else {
- ReadBuffer[] readBuffers = readBufferQueue.toArray(
- new ReadBuffer[currentSize]);
- lastByteStart = readBuffers[currentSize - 1].getByteStart();
- }
-
- int maxLen = this.maxReadAheadPartNumber - currentSize;
- for (int i = 0; i < maxLen && i < (currentSize + 1) * 2; i++) {
- if (lastByteStart + partSize * (i + 1) > contentLength) {
- break;
- }
-
- long byteStart = lastByteStart + partSize * (i + 1);
- long byteEnd = byteStart + partSize -1;
- if (byteEnd >= contentLength) {
- byteEnd = contentLength - 1;
- }
-
- ReadBuffer readBuffer = new ReadBuffer(byteStart, byteEnd);
- if (readBuffer.getBuffer().length == 0) {
- //EOF
- readBuffer.setStatus(ReadBuffer.STATUS.SUCCESS);
- } else {
- this.readAheadExecutorService.execute(
- new AliyunOSSFileReaderTask(key, store, readBuffer));
- }
- readBufferQueue.add(readBuffer);
- if (isRandomIO) {
- break;
- }
- }
-
- ReadBuffer readBuffer = readBufferQueue.poll();
- readBuffer.lock();
- try {
- readBuffer.await(ReadBuffer.STATUS.INIT);
- if (readBuffer.getStatus() == ReadBuffer.STATUS.ERROR) {
- this.buffer = null;
- } else {
- this.buffer = readBuffer.getBuffer();
- }
- } catch (InterruptedException e) {
- LOG.warn("interrupted when wait a read buffer");
- } finally {
- readBuffer.unlock();
- }
-
- if (this.buffer == null) {
+ wrappedStream = store.retrieve(key, pos, pos + partSize -1);
+ if (wrappedStream == null) {
throw new IOException("Null IO stream");
}
position = pos;
@@ -187,10 +105,18 @@ public class AliyunOSSInputStream extends FSInputStream {
reopen(position);
}
+ int tries = MAX_RETRIES;
+ boolean retry;
int byteRead = -1;
- if (partRemaining != 0) {
- byteRead = this.buffer[this.buffer.length - (int)partRemaining] & 0xFF;
- }
+ do {
+ retry = false;
+ try {
+ byteRead = wrappedStream.read();
+ } catch (Exception e) {
+ handleReadException(e, --tries);
+ retry = true;
+ }
+ } while (retry);
if (byteRead >= 0) {
position++;
partRemaining--;
@@ -235,18 +161,21 @@ public class AliyunOSSInputStream extends FSInputStream {
reopen(position);
}
- int bytes = 0;
- for (int i = this.buffer.length - (int)partRemaining;
- i < this.buffer.length; i++) {
- buf[off + bytesRead] = this.buffer[i];
- bytes++;
- bytesRead++;
- if (off + bytesRead >= len) {
- break;
+ int tries = MAX_RETRIES;
+ boolean retry;
+ int bytes = -1;
+ do {
+ retry = false;
+ try {
+ bytes = wrappedStream.read(buf, off + bytesRead, len - bytesRead);
+ } catch (Exception e) {
+ handleReadException(e, --tries);
+ retry = true;
}
- }
+ } while (retry);
if (bytes > 0) {
+ bytesRead += bytes;
position += bytes;
partRemaining -= bytes;
} else if (partRemaining != 0) {
@@ -273,7 +202,9 @@ public class AliyunOSSInputStream extends FSInputStream {
return;
}
closed = true;
- this.buffer = null;
+ if (wrappedStream != null) {
+ wrappedStream.close();
+ }
}
@Override
@@ -294,6 +225,7 @@ public class AliyunOSSInputStream extends FSInputStream {
return;
} else if (pos > position && pos < position + partRemaining) {
long len = pos - position;
+ AliyunOSSUtils.skipFully(wrappedStream, len);
position = pos;
partRemaining -= len;
} else {
@@ -313,7 +245,18 @@ public class AliyunOSSInputStream extends FSInputStream {
return false;
}
- public long getExpectNextPos() {
- return this.expectNextPos;
+ private void handleReadException(Exception e, int tries) throws IOException{
+ if (tries == 0) {
+ throw new IOException(e);
+ }
+
+ LOG.warn("Some exceptions occurred in oss connection, try to reopen oss" +
+ " connection at position '" + position + "', " + e.getMessage());
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e2) {
+ LOG.warn(e2.getMessage());
+ }
+ reopen(position);
}
}
diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSUtils.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSUtils.java
index 10dc30a218f..e891037b3bb 100644
--- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSUtils.java
+++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSUtils.java
@@ -40,18 +40,6 @@ public final class AliyunOSSUtils {
private AliyunOSSUtils() {
}
- public static int intPositiveOption(
- Configuration conf, String key, int defVal) {
- int v = conf.getInt(key, defVal);
- if (v <= 0) {
- LOG.warn(key + " is configured to " + v
- + ", will use default value: " + defVal);
- v = defVal;
- }
-
- return v;
- }
-
/**
* Used to get password from configuration.
*
diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/Constants.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/Constants.java
index 410adc90373..dd71842fb87 100644
--- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/Constants.java
+++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/Constants.java
@@ -97,18 +97,7 @@ public final class Constants {
public static final String MULTIPART_DOWNLOAD_SIZE_KEY =
"fs.oss.multipart.download.size";
- public static final long MULTIPART_DOWNLOAD_SIZE_DEFAULT = 512 * 1024;
-
- public static final String MULTIPART_DOWNLOAD_THREAD_NUMBER_KEY =
- "fs.oss.multipart.download.threads";
- public static final int MULTIPART_DOWNLOAD_THREAD_NUMBER_DEFAULT = 10;
-
- public static final String MAX_TOTAL_TASKS_KEY = "fs.oss.max.total.tasks";
- public static final int MAX_TOTAL_TASKS_DEFAULT = 128;
-
- public static final String MULTIPART_DOWNLOAD_AHEAD_PART_MAX_NUM_KEY =
- "fs.oss.multipart.download.ahead.part.max.number";
- public static final int MULTIPART_DOWNLOAD_AHEAD_PART_MAX_NUM_DEFAULT = 4;
+ public static final long MULTIPART_DOWNLOAD_SIZE_DEFAULT = 100 * 1024;
// Comma separated list of directories
public static final String BUFFER_DIR_KEY = "fs.oss.buffer.dir";
diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/ReadBuffer.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/ReadBuffer.java
deleted file mode 100644
index 46bb5bf0795..00000000000
--- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/ReadBuffer.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.aliyun.oss;
-
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
-
-/**
- * This class is used by {@link AliyunOSSInputStream}
- * and {@link AliyunOSSFileReaderTask} to buffer data that read from oss.
- */
-public class ReadBuffer {
- enum STATUS {
- INIT, SUCCESS, ERROR
- }
- private final ReentrantLock lock = new ReentrantLock();
-
- private Condition readyCondition = lock.newCondition();
-
- private byte[] buffer;
- private STATUS status;
- private long byteStart;
- private long byteEnd;
-
- public ReadBuffer(long byteStart, long byteEnd) {
- this.buffer = new byte[(int)(byteEnd - byteStart) + 1];
-
- this.status = STATUS.INIT;
- this.byteStart = byteStart;
- this.byteEnd = byteEnd;
- }
-
- public void lock() {
- lock.lock();
- }
-
- public void unlock() {
- lock.unlock();
- }
-
- public void await(STATUS waitStatus) throws InterruptedException {
- while (this.status == waitStatus) {
- readyCondition.await();
- }
- }
-
- public void signalAll() {
- readyCondition.signalAll();
- }
-
- public byte[] getBuffer() {
- return buffer;
- }
-
- public STATUS getStatus() {
- return status;
- }
-
- public void setStatus(STATUS status) {
- this.status = status;
- }
-
- public long getByteStart() {
- return byteStart;
- }
-
- public long getByteEnd() {
- return byteEnd;
- }
-}
diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSInputStream.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSInputStream.java
index f0bb26d8e7d..d798cafb206 100644
--- a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSInputStream.java
+++ b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSInputStream.java
@@ -35,7 +35,6 @@ import org.slf4j.LoggerFactory;
import java.util.Random;
-import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/**
@@ -108,54 +107,6 @@ public class TestAliyunOSSInputStream {
IOUtils.closeStream(instream);
}
- @Test
- public void testSequentialAndRandomRead() throws Exception {
- Path smallSeekFile = setPath("/test/smallSeekFile.txt");
- long size = 5 * 1024 * 1024;
-
- ContractTestUtils.generateTestFile(this.fs, smallSeekFile, size, 256, 255);
- LOG.info("5MB file created: smallSeekFile.txt");
-
- FSDataInputStream fsDataInputStream = this.fs.open(smallSeekFile);
- AliyunOSSInputStream in =
- (AliyunOSSInputStream)fsDataInputStream.getWrappedStream();
- assertTrue("expected position at:" + 0 + ", but got:"
- + fsDataInputStream.getPos(), fsDataInputStream.getPos() == 0);
-
- assertTrue("expected position at:"
- + Constants.MULTIPART_DOWNLOAD_SIZE_DEFAULT + ", but got:"
- + in.getExpectNextPos(),
- in.getExpectNextPos() == Constants.MULTIPART_DOWNLOAD_SIZE_DEFAULT);
- fsDataInputStream.seek(4 * 1024 * 1024);
- assertTrue("expected position at:" + 4 * 1024 * 1024
- + Constants.MULTIPART_DOWNLOAD_SIZE_DEFAULT + ", but got:"
- + in.getExpectNextPos(),
- in.getExpectNextPos() == 4 * 1024 * 1024
- + Constants.MULTIPART_DOWNLOAD_SIZE_DEFAULT);
- IOUtils.closeStream(fsDataInputStream);
- }
-
- @Test
- public void testOSSFileReaderTask() throws Exception {
- Path smallSeekFile = setPath("/test/smallSeekFileOSSFileReader.txt");
- long size = 5 * 1024 * 1024;
-
- ContractTestUtils.generateTestFile(this.fs, smallSeekFile, size, 256, 255);
- LOG.info("5MB file created: smallSeekFileOSSFileReader.txt");
- ReadBuffer readBuffer = new ReadBuffer(12, 24);
- AliyunOSSFileReaderTask task = new AliyunOSSFileReaderTask("1",
- ((AliyunOSSFileSystem)this.fs).getStore(), readBuffer);
- //NullPointerException, fail
- task.run();
- assertEquals(readBuffer.getStatus(), ReadBuffer.STATUS.ERROR);
- //OK
- task = new AliyunOSSFileReaderTask(
- "test/test/smallSeekFileOSSFileReader.txt",
- ((AliyunOSSFileSystem)this.fs).getStore(), readBuffer);
- task.run();
- assertEquals(readBuffer.getStatus(), ReadBuffer.STATUS.SUCCESS);
- }
-
@Test
public void testReadFile() throws Exception {
final int bufLen = 256;