HADOOP-17374. support listObjectV2 (#3587)

(cherry picked from commit a9c51ea57d)
This commit is contained in:
Jinhu Wu 2021-11-04 12:47:41 +08:00 committed by Weiwei yang
parent 0379aebafe
commit 0557da6820
13 changed files with 425 additions and 67 deletions

View File

@ -107,6 +107,12 @@
<scope>compile</scope> <scope>compile</scope>
</dependency> </dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>
<dependency> <dependency>
<groupId>org.apache.hadoop</groupId> <groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId> <artifactId>hadoop-common</artifactId>

View File

@ -49,7 +49,6 @@ import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.Progressable;
import com.aliyun.oss.model.OSSObjectSummary; import com.aliyun.oss.model.OSSObjectSummary;
import com.aliyun.oss.model.ObjectListing;
import com.aliyun.oss.model.ObjectMetadata; import com.aliyun.oss.model.ObjectMetadata;
import org.apache.hadoop.util.SemaphoredDelegatingExecutor; import org.apache.hadoop.util.SemaphoredDelegatingExecutor;
@ -271,14 +270,15 @@ public class AliyunOSSFileSystem extends FileSystem {
meta = store.getObjectMetadata(key); meta = store.getObjectMetadata(key);
} }
if (meta == null) { if (meta == null) {
ObjectListing listing = store.listObjects(key, 1, null, false); OSSListRequest listRequest = store.createListObjectsRequest(key,
maxKeys, null, null, false);
OSSListResult listing = store.listObjects(listRequest);
do { do {
if (CollectionUtils.isNotEmpty(listing.getObjectSummaries()) || if (CollectionUtils.isNotEmpty(listing.getObjectSummaries()) ||
CollectionUtils.isNotEmpty(listing.getCommonPrefixes())) { CollectionUtils.isNotEmpty(listing.getCommonPrefixes())) {
return new OSSFileStatus(0, true, 1, 0, 0, qualifiedPath, username); return new OSSFileStatus(0, true, 1, 0, 0, qualifiedPath, username);
} else if (listing.isTruncated()) { } else if (listing.isTruncated()) {
listing = store.listObjects(key, 1000, listing.getNextMarker(), listing = store.continueListObjects(listRequest, listing);
false);
} else { } else {
throw new FileNotFoundException( throw new FileNotFoundException(
path + ": No such file or directory!"); path + ": No such file or directory!");
@ -416,7 +416,9 @@ public class AliyunOSSFileSystem extends FileSystem {
LOG.debug("listStatus: doing listObjects for directory " + key); LOG.debug("listStatus: doing listObjects for directory " + key);
} }
ObjectListing objects = store.listObjects(key, maxKeys, null, false); OSSListRequest listRequest = store.createListObjectsRequest(key,
maxKeys, null, null, false);
OSSListResult objects = store.listObjects(listRequest);
while (true) { while (true) {
for (OSSObjectSummary objectSummary : objects.getObjectSummaries()) { for (OSSObjectSummary objectSummary : objects.getObjectSummaries()) {
String objKey = objectSummary.getKey(); String objKey = objectSummary.getKey();
@ -456,8 +458,7 @@ public class AliyunOSSFileSystem extends FileSystem {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("listStatus: list truncated - getting next batch"); LOG.debug("listStatus: list truncated - getting next batch");
} }
String nextMarker = objects.getNextMarker(); objects = store.continueListObjects(listRequest, objects);
objects = store.listObjects(key, maxKeys, nextMarker, false);
} else { } else {
break; break;
} }
@ -520,7 +521,7 @@ public class AliyunOSSFileSystem extends FileSystem {
locations); locations);
} else { } else {
return store.createLocatedFileStatusIterator(key, maxKeys, this, filter, return store.createLocatedFileStatusIterator(key, maxKeys, this, filter,
acceptor, recursive ? null : "/"); acceptor, recursive);
} }
} }
@ -707,7 +708,9 @@ public class AliyunOSSFileSystem extends FileSystem {
ExecutorService executorService = MoreExecutors.listeningDecorator( ExecutorService executorService = MoreExecutors.listeningDecorator(
new SemaphoredDelegatingExecutor(boundedCopyThreadPool, new SemaphoredDelegatingExecutor(boundedCopyThreadPool,
maxConcurrentCopyTasksPerDir, true)); maxConcurrentCopyTasksPerDir, true));
ObjectListing objects = store.listObjects(srcKey, maxKeys, null, true); OSSListRequest listRequest = store.createListObjectsRequest(srcKey,
maxKeys, null, null, true);
OSSListResult objects = store.listObjects(listRequest);
// Copy files from src folder to dst // Copy files from src folder to dst
int copiesToFinish = 0; int copiesToFinish = 0;
while (true) { while (true) {
@ -729,8 +732,7 @@ public class AliyunOSSFileSystem extends FileSystem {
} }
} }
if (objects.isTruncated()) { if (objects.isTruncated()) {
String nextMarker = objects.getNextMarker(); objects = store.continueListObjects(listRequest, objects);
objects = store.listObjects(srcKey, maxKeys, nextMarker, true);
} else { } else {
break; break;
} }

View File

@ -36,8 +36,8 @@ import com.aliyun.oss.model.GetObjectRequest;
import com.aliyun.oss.model.InitiateMultipartUploadRequest; import com.aliyun.oss.model.InitiateMultipartUploadRequest;
import com.aliyun.oss.model.InitiateMultipartUploadResult; import com.aliyun.oss.model.InitiateMultipartUploadResult;
import com.aliyun.oss.model.ListObjectsRequest; import com.aliyun.oss.model.ListObjectsRequest;
import com.aliyun.oss.model.ListObjectsV2Request;
import com.aliyun.oss.model.ObjectMetadata; import com.aliyun.oss.model.ObjectMetadata;
import com.aliyun.oss.model.ObjectListing;
import com.aliyun.oss.model.OSSObjectSummary; import com.aliyun.oss.model.OSSObjectSummary;
import com.aliyun.oss.model.PartETag; import com.aliyun.oss.model.PartETag;
import com.aliyun.oss.model.PutObjectResult; import com.aliyun.oss.model.PutObjectResult;
@ -90,6 +90,7 @@ public class AliyunOSSFileSystemStore {
private long uploadPartSize; private long uploadPartSize;
private int maxKeys; private int maxKeys;
private String serverSideEncryptionAlgorithm; private String serverSideEncryptionAlgorithm;
private boolean useListV1;
public void initialize(URI uri, Configuration conf, String user, public void initialize(URI uri, Configuration conf, String user,
FileSystem.Statistics stat) throws IOException { FileSystem.Statistics stat) throws IOException {
@ -170,6 +171,12 @@ public class AliyunOSSFileSystemStore {
} }
maxKeys = conf.getInt(MAX_PAGING_KEYS_KEY, MAX_PAGING_KEYS_DEFAULT); maxKeys = conf.getInt(MAX_PAGING_KEYS_KEY, MAX_PAGING_KEYS_DEFAULT);
int listVersion = conf.getInt(LIST_VERSION, DEFAULT_LIST_VERSION);
if (listVersion < 1 || listVersion > 2) {
LOG.warn("Configured fs.oss.list.version {} is invalid, forcing " +
"version 2", listVersion);
}
useListV1 = (listVersion == 1);
} }
/** /**
@ -231,14 +238,10 @@ public class AliyunOSSFileSystemStore {
* @throws IOException if failed to delete directory. * @throws IOException if failed to delete directory.
*/ */
public void deleteDirs(String key) throws IOException { public void deleteDirs(String key) throws IOException {
key = AliyunOSSUtils.maybeAddTrailingSlash(key); OSSListRequest listRequest = createListObjectsRequest(key,
ListObjectsRequest listRequest = new ListObjectsRequest(bucketName); maxKeys, null, null, true);
listRequest.setPrefix(key);
listRequest.setDelimiter(null);
listRequest.setMaxKeys(maxKeys);
while (true) { while (true) {
ObjectListing objects = ossClient.listObjects(listRequest); OSSListResult objects = listObjects(listRequest);
statistics.incrementReadOps(1); statistics.incrementReadOps(1);
List<String> keysToDelete = new ArrayList<String>(); List<String> keysToDelete = new ArrayList<String>();
for (OSSObjectSummary objectSummary : objects.getObjectSummaries()) { for (OSSObjectSummary objectSummary : objects.getObjectSummaries()) {
@ -246,7 +249,12 @@ public class AliyunOSSFileSystemStore {
} }
deleteObjects(keysToDelete); deleteObjects(keysToDelete);
if (objects.isTruncated()) { if (objects.isTruncated()) {
listRequest.setMarker(objects.getNextMarker()); if (objects.isV1()) {
listRequest.getV1().setMarker(objects.getV1().getNextMarker());
} else {
listRequest.getV2().setContinuationToken(
objects.getV2().getNextContinuationToken());
}
} else { } else {
break; break;
} }
@ -418,25 +426,76 @@ public class AliyunOSSFileSystemStore {
/** /**
* list objects. * list objects.
* *
* @param listRequest list request.
* @return a list of matches.
*/
public OSSListResult listObjects(OSSListRequest listRequest) {
OSSListResult listResult;
if (listRequest.isV1()) {
listResult = OSSListResult.v1(
ossClient.listObjects(listRequest.getV1()));
} else {
listResult = OSSListResult.v2(
ossClient.listObjectsV2(listRequest.getV2()));
}
statistics.incrementReadOps(1);
return listResult;
}
/**
* continue to list objects depends on previous list result.
*
* @param listRequest list request.
* @param preListResult previous list result.
* @return a list of matches.
*/
public OSSListResult continueListObjects(OSSListRequest listRequest,
OSSListResult preListResult) {
OSSListResult listResult;
if (listRequest.isV1()) {
listRequest.getV1().setMarker(preListResult.getV1().getNextMarker());
listResult = OSSListResult.v1(
ossClient.listObjects(listRequest.getV1()));
} else {
listRequest.getV2().setContinuationToken(
preListResult.getV2().getNextContinuationToken());
listResult = OSSListResult.v2(
ossClient.listObjectsV2(listRequest.getV2()));
}
statistics.incrementReadOps(1);
return listResult;
}
/**
* create list objects request.
*
* @param prefix prefix. * @param prefix prefix.
* @param maxListingLength max no. of entries * @param maxListingLength max no. of entries
* @param marker last key in any previous search. * @param marker last key in any previous search.
* @param continuationToken list from a specific point.
* @param recursive whether to list directory recursively. * @param recursive whether to list directory recursively.
* @return a list of matches. * @return a list of matches.
*/ */
public ObjectListing listObjects(String prefix, int maxListingLength, protected OSSListRequest createListObjectsRequest(String prefix,
String marker, boolean recursive) { int maxListingLength, String marker,
String continuationToken, boolean recursive) {
String delimiter = recursive ? null : "/"; String delimiter = recursive ? null : "/";
prefix = AliyunOSSUtils.maybeAddTrailingSlash(prefix); prefix = AliyunOSSUtils.maybeAddTrailingSlash(prefix);
if (useListV1) {
ListObjectsRequest listRequest = new ListObjectsRequest(bucketName); ListObjectsRequest listRequest = new ListObjectsRequest(bucketName);
listRequest.setPrefix(prefix); listRequest.setPrefix(prefix);
listRequest.setDelimiter(delimiter); listRequest.setDelimiter(delimiter);
listRequest.setMaxKeys(maxListingLength); listRequest.setMaxKeys(maxListingLength);
listRequest.setMarker(marker); listRequest.setMarker(marker);
return OSSListRequest.v1(listRequest);
ObjectListing listing = ossClient.listObjects(listRequest); } else {
statistics.incrementReadOps(1); ListObjectsV2Request listV2Request = new ListObjectsV2Request(bucketName);
return listing; listV2Request.setPrefix(prefix);
listV2Request.setDelimiter(delimiter);
listV2Request.setMaxKeys(maxListingLength);
listV2Request.setContinuationToken(continuationToken);
return OSSListRequest.v2(listV2Request);
}
} }
/** /**
@ -478,21 +537,7 @@ public class AliyunOSSFileSystemStore {
* @throws IOException if failed to clean up objects. * @throws IOException if failed to clean up objects.
*/ */
public void purge(String prefix) throws IOException { public void purge(String prefix) throws IOException {
String key; deleteDirs(prefix);
try {
ObjectListing objects = listObjects(prefix, maxKeys, null, true);
for (OSSObjectSummary object : objects.getObjectSummaries()) {
key = object.getKey();
ossClient.deleteObject(bucketName, key);
statistics.incrementWriteOps(1);
}
for (String dir: objects.getCommonPrefixes()) {
deleteDirs(dir);
}
} catch (OSSException | ClientException e) {
LOG.error("Failed to purge " + prefix);
}
} }
public RemoteIterator<LocatedFileStatus> singleStatusRemoteIterator( public RemoteIterator<LocatedFileStatus> singleStatusRemoteIterator(
@ -520,12 +565,12 @@ public class AliyunOSSFileSystemStore {
public RemoteIterator<LocatedFileStatus> createLocatedFileStatusIterator( public RemoteIterator<LocatedFileStatus> createLocatedFileStatusIterator(
final String prefix, final int maxListingLength, FileSystem fs, final String prefix, final int maxListingLength, FileSystem fs,
PathFilter filter, FileStatusAcceptor acceptor, String delimiter) { PathFilter filter, FileStatusAcceptor acceptor, boolean recursive) {
return new RemoteIterator<LocatedFileStatus>() { return new RemoteIterator<LocatedFileStatus>() {
private String nextMarker = null;
private boolean firstListing = true; private boolean firstListing = true;
private boolean meetEnd = false; private boolean meetEnd = false;
private ListIterator<FileStatus> batchIterator; private ListIterator<FileStatus> batchIterator;
private OSSListRequest listRequest = null;
@Override @Override
public boolean hasNext() throws IOException { public boolean hasNext() throws IOException {
@ -550,15 +595,24 @@ public class AliyunOSSFileSystemStore {
} }
private boolean requestNextBatch() { private boolean requestNextBatch() {
while (!meetEnd) {
if (continueListStatus()) {
return true;
}
}
return false;
}
private boolean continueListStatus() {
if (meetEnd) { if (meetEnd) {
return false; return false;
} }
ListObjectsRequest listRequest = new ListObjectsRequest(bucketName); if (listRequest == null) {
listRequest.setPrefix(AliyunOSSUtils.maybeAddTrailingSlash(prefix)); listRequest = createListObjectsRequest(prefix,
listRequest.setMaxKeys(maxListingLength); maxListingLength, null, null, recursive);
listRequest.setMarker(nextMarker); }
listRequest.setDelimiter(delimiter); OSSListResult listing = listObjects(listRequest);
ObjectListing listing = ossClient.listObjects(listRequest);
List<FileStatus> stats = new ArrayList<>( List<FileStatus> stats = new ArrayList<>(
listing.getObjectSummaries().size() + listing.getObjectSummaries().size() +
listing.getCommonPrefixes().size()); listing.getCommonPrefixes().size());
@ -584,7 +638,12 @@ public class AliyunOSSFileSystemStore {
batchIterator = stats.listIterator(); batchIterator = stats.listIterator();
if (listing.isTruncated()) { if (listing.isTruncated()) {
nextMarker = listing.getNextMarker(); if (listing.isV1()) {
listRequest.getV1().setMarker(listing.getV1().getNextMarker());
} else {
listRequest.getV2().setContinuationToken(
listing.getV2().getNextContinuationToken());
}
} else { } else {
meetEnd = true; meetEnd = true;
} }

View File

@ -154,4 +154,8 @@ public final class Constants {
public static final String UPLOAD_ACTIVE_BLOCKS_KEY = public static final String UPLOAD_ACTIVE_BLOCKS_KEY =
"fs.oss.upload.active.blocks"; "fs.oss.upload.active.blocks";
public static final int UPLOAD_ACTIVE_BLOCKS_DEFAULT = 4; public static final int UPLOAD_ACTIVE_BLOCKS_DEFAULT = 4;
public static final String LIST_VERSION = "fs.oss.list.version";
public static final int DEFAULT_LIST_VERSION = 2;
} }

View File

@ -0,0 +1,88 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.aliyun.oss;
import com.aliyun.oss.model.ListObjectsRequest;
import com.aliyun.oss.model.ListObjectsV2Request;
/**
* API version-independent container for OSS List requests.
*/
public class OSSListRequest {
/**
* Format for the toString() method: {@value}.
*/
private static final String DESCRIPTION
= "List %s:/%s delimiter=%s keys=%d";
private final ListObjectsRequest v1Request;
private final ListObjectsV2Request v2Request;
private OSSListRequest(ListObjectsRequest v1, ListObjectsV2Request v2) {
v1Request = v1;
v2Request = v2;
}
/**
* Restricted constructors to ensure v1 or v2, not both.
* @param request v1 request
* @return new list request container
*/
public static OSSListRequest v1(ListObjectsRequest request) {
return new OSSListRequest(request, null);
}
/**
* Restricted constructors to ensure v1 or v2, not both.
* @param request v2 request
* @return new list request container
*/
public static OSSListRequest v2(ListObjectsV2Request request) {
return new OSSListRequest(null, request);
}
/**
* Is this a v1 API request or v2?
* @return true if v1, false if v2
*/
public boolean isV1() {
return v1Request != null;
}
public ListObjectsRequest getV1() {
return v1Request;
}
public ListObjectsV2Request getV2() {
return v2Request;
}
@Override
public String toString() {
if (isV1()) {
return String.format(DESCRIPTION,
v1Request.getBucketName(), v1Request.getPrefix(),
v1Request.getDelimiter(), v1Request.getMaxKeys());
} else {
return String.format(DESCRIPTION,
v2Request.getBucketName(), v2Request.getPrefix(),
v2Request.getDelimiter(), v2Request.getMaxKeys());
}
}
}

View File

@ -0,0 +1,115 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.aliyun.oss;
import com.aliyun.oss.model.ListObjectsV2Result;
import com.aliyun.oss.model.OSSObjectSummary;
import com.aliyun.oss.model.ObjectListing;
import org.slf4j.Logger;
import java.util.Collection;
import java.util.List;
/**
* API version-independent container for OSS List responses.
*/
public final class OSSListResult {
private ObjectListing v1Result;
private ListObjectsV2Result v2Result;
protected OSSListResult(ObjectListing v1, ListObjectsV2Result v2) {
v1Result = v1;
v2Result = v2;
}
/**
* Restricted constructors to ensure v1 or v2, not both.
* @param result v1 result
* @return new list result container
*/
public static OSSListResult v1(ObjectListing result) {
return new OSSListResult(result, null);
}
/**
* Restricted constructors to ensure v1 or v2, not both.
* @param result v2 result
* @return new list result container
*/
public static OSSListResult v2(ListObjectsV2Result result) {
return new OSSListResult(null, result);
}
/**
* Is this a v1 API result or v2?
* @return true if v1, false if v2
*/
public boolean isV1() {
return v1Result != null;
}
public ObjectListing getV1() {
return v1Result;
}
public ListObjectsV2Result getV2() {
return v2Result;
}
public List<OSSObjectSummary> getObjectSummaries() {
if (isV1()) {
return v1Result.getObjectSummaries();
} else {
return v2Result.getObjectSummaries();
}
}
public boolean isTruncated() {
if (isV1()) {
return v1Result.isTruncated();
} else {
return v2Result.isTruncated();
}
}
public List<String> getCommonPrefixes() {
if (isV1()) {
return v1Result.getCommonPrefixes();
} else {
return v2Result.getCommonPrefixes();
}
}
/**
* Dump the result at debug level.
* @param log log to use
*/
public void logAtDebug(Logger log) {
Collection<String> prefixes = getCommonPrefixes();
Collection<OSSObjectSummary> summaries = getObjectSummaries();
log.debug("Prefix count = {}; object count={}",
prefixes.size(), summaries.size());
for (OSSObjectSummary summary : summaries) {
log.debug("Summary: {} {}", summary.getKey(), summary.getSize());
}
for (String prefix : prefixes) {
log.debug("Prefix: {}", prefix);
}
}
}

View File

@ -243,6 +243,14 @@ please raise your issues with them.
<description>Size in bytes in each request from ALiyun OSS.</description> <description>Size in bytes in each request from ALiyun OSS.</description>
</property> </property>
<property>
<name>fs.oss.list.version</name>
<value>2</value>
<description>Select which version of the OSS SDK's List Objects API to use.
Currently support 2(default) and 1(older API).
</description>
</property>
<property> <property>
<name>fs.oss.buffer.dir</name> <name>fs.oss.buffer.dir</name>
<description>Comma separated list of directories to buffer OSS data before uploading to Aliyun OSS</description> <description>Comma separated list of directories to buffer OSS data before uploading to Aliyun OSS</description>

View File

@ -83,4 +83,14 @@ public final class AliyunOSSTestUtils {
return testUniqueForkId == null ? "/test" : return testUniqueForkId == null ? "/test" :
"/" + testUniqueForkId + "/test"; "/" + testUniqueForkId + "/test";
} }
/**
* Turn off FS Caching: use if a filesystem with different options from
* the default is required.
* @param conf configuration to patch
*/
public static void disableFilesystemCaching(Configuration conf) {
conf.setBoolean(TestAliyunOSSFileSystemContract.FS_OSS_IMPL_DISABLE_CACHE,
true);
}
} }

View File

@ -43,6 +43,7 @@ import static org.junit.Assert.assertEquals;
*/ */
public class TestAliyunOSSBlockOutputStream { public class TestAliyunOSSBlockOutputStream {
private FileSystem fs; private FileSystem fs;
private static final int PART_SIZE = 1024 * 1024;
private static String testRootPath = private static String testRootPath =
AliyunOSSTestUtils.generateUniqueTestPath(); AliyunOSSTestUtils.generateUniqueTestPath();
@ -52,7 +53,7 @@ public class TestAliyunOSSBlockOutputStream {
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
Configuration conf = new Configuration(); Configuration conf = new Configuration();
conf.setInt(MULTIPART_UPLOAD_PART_SIZE_KEY, 1024 * 1024); conf.setInt(MULTIPART_UPLOAD_PART_SIZE_KEY, PART_SIZE);
conf.setInt(IO_CHUNK_BUFFER_SIZE, conf.setInt(IO_CHUNK_BUFFER_SIZE,
conf.getInt(MULTIPART_UPLOAD_PART_SIZE_KEY, 0)); conf.getInt(MULTIPART_UPLOAD_PART_SIZE_KEY, 0));
conf.setInt(Constants.UPLOAD_ACTIVE_BLOCKS_KEY, 20); conf.setInt(Constants.UPLOAD_ACTIVE_BLOCKS_KEY, 20);
@ -155,10 +156,8 @@ public class TestAliyunOSSBlockOutputStream {
@Test @Test
public void testHugeUpload() throws IOException { public void testHugeUpload() throws IOException {
ContractTestUtils.createAndVerifyFile(fs, getTestPath(), ContractTestUtils.createAndVerifyFile(fs, getTestPath(), PART_SIZE - 1);
MULTIPART_UPLOAD_PART_SIZE_DEFAULT - 1); ContractTestUtils.createAndVerifyFile(fs, getTestPath(), PART_SIZE);
ContractTestUtils.createAndVerifyFile(fs, getTestPath(),
MULTIPART_UPLOAD_PART_SIZE_DEFAULT);
ContractTestUtils.createAndVerifyFile(fs, getTestPath(), ContractTestUtils.createAndVerifyFile(fs, getTestPath(),
MULTIPART_UPLOAD_PART_SIZE_DEFAULT + 1); MULTIPART_UPLOAD_PART_SIZE_DEFAULT + 1);
bufferDirShouldEmpty(); bufferDirShouldEmpty();

View File

@ -47,6 +47,8 @@ import static org.junit.Assume.assumeTrue;
public class TestAliyunOSSFileSystemContract public class TestAliyunOSSFileSystemContract
extends FileSystemContractBaseTest { extends FileSystemContractBaseTest {
public static final String TEST_FS_OSS_NAME = "test.fs.oss.name"; public static final String TEST_FS_OSS_NAME = "test.fs.oss.name";
public static final String FS_OSS_IMPL_DISABLE_CACHE
= "fs.oss.impl.disable.cache";
private static Path testRootPath = private static Path testRootPath =
new Path(AliyunOSSTestUtils.generateUniqueTestPath()); new Path(AliyunOSSTestUtils.generateUniqueTestPath());
@ -413,7 +415,7 @@ public class TestAliyunOSSFileSystemContract
Thread thread = new Thread(task); Thread thread = new Thread(task);
thread.start(); thread.start();
while (!task.isRunning()) { while (!task.isRunning()) {
Thread.sleep(1000); Thread.sleep(1);
} }
if (changing) { if (changing) {
@ -421,8 +423,12 @@ public class TestAliyunOSSFileSystemContract
} }
thread.join(); thread.join();
if (changing) {
assertTrue(task.isSucceed() || fs.exists(this.path("a")));
} else {
assertEquals(result, task.isSucceed()); assertEquals(result, task.isSucceed());
} }
}
class TestRenameTask implements Runnable { class TestRenameTask implements Runnable {
private FileSystem fs; private FileSystem fs;
@ -451,6 +457,8 @@ public class TestAliyunOSSFileSystemContract
running = true; running = true;
result = fs.rename(srcPath, dstPath); result = fs.rename(srcPath, dstPath);
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace();
this.result = false;
} }
} }
} }

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.fs.aliyun.oss; package org.apache.hadoop.fs.aliyun.oss;
import com.aliyun.oss.model.ObjectMetadata;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.junit.After; import org.junit.After;
@ -89,13 +90,17 @@ public class TestAliyunOSSFileSystemStore {
assertTrue("Exists", fs.exists(path)); assertTrue("Exists", fs.exists(path));
ObjectMetadata srcMeta = fs.getStore().getObjectMetadata(
path.toUri().getPath().substring(1));
Path copyPath = path.suffix(".copy"); Path copyPath = path.suffix(".copy");
long start = System.currentTimeMillis();
fs.rename(path, copyPath); fs.rename(path, copyPath);
assertTrue("Copy exists", fs.exists(copyPath)); assertTrue("Copy exists", fs.exists(copyPath));
// should less than 1 second // file type should not change
assertTrue(System.currentTimeMillis() - start < 1000); ObjectMetadata dstMeta = fs.getStore().getObjectMetadata(
copyPath.toUri().getPath().substring(1));
assertEquals(srcMeta.getObjectType(), dstMeta.getObjectType());
// Download file from Aliyun OSS and compare the digest against the original // Download file from Aliyun OSS and compare the digest against the original
MessageDigest digest2 = MessageDigest.getInstance("MD5"); MessageDigest digest2 = MessageDigest.getInstance("MD5");
InputStream in = new BufferedInputStream( InputStream in = new BufferedInputStream(
@ -120,6 +125,7 @@ public class TestAliyunOSSFileSystemStore {
public void testLargeUpload() public void testLargeUpload()
throws IOException, NoSuchAlgorithmException { throws IOException, NoSuchAlgorithmException {
// Multipart upload, shallow copy // Multipart upload, shallow copy
writeRenameReadCompare(new Path("/test/xlarge"), 2147483648L); // 2GB writeRenameReadCompare(new Path("/test/xlarge"),
Constants.MULTIPART_UPLOAD_PART_SIZE_DEFAULT + 1);
} }
} }

View File

@ -0,0 +1,53 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.aliyun.oss.contract;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.aliyun.oss.AliyunOSSTestUtils;
import org.apache.hadoop.fs.aliyun.oss.Constants;
import org.apache.hadoop.fs.contract.AbstractContractGetFileStatusTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;
/**
* Test getFileStatus and related listing operations,
* using the v1 List Objects API.
*/
public class TestAliyunOSSContractGetFileStatusV1List
extends AbstractContractGetFileStatusTest {
@Override
protected AbstractFSContract createContract(Configuration conf) {
return new AliyunOSSContract(conf);
}
@Override
public void teardown() throws Exception {
getLogger().info("FS details {}", getFileSystem());
super.teardown();
}
@Override
protected Configuration createConfiguration() {
Configuration conf = super.createConfiguration();
AliyunOSSTestUtils.disableFilesystemCaching(conf);
conf.setInt(Constants.MAX_PAGING_KEYS_KEY, 2);
// Use v1 List Objects API
conf.setInt(Constants.LIST_VERSION, 1);
return conf;
}
}

View File

@ -95,7 +95,7 @@
<property> <property>
<name>fs.contract.rename-overwrites-dest</name> <name>fs.contract.rename-overwrites-dest</name>
<value>true</value> <value>false</value>
</property> </property>
<property> <property>