HADOOP-14993. AliyunOSS: Override listFiles and listLocatedStatus. Contributed by Genmao Yu.
(cherry picked from commit fb809e05dc)

parent: e309d25d2b
commit: 1e0f7a1631
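
For reference, the new listFiles / listLocatedStatus overrides surface OSS listings through Hadoop's RemoteIterator API, so callers can stream large directories page by page instead of building one huge FileStatus array. A minimal, hypothetical client sketch (the bucket and path below are placeholders, not part of this change):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;

public class ListOssFilesExample {
  public static void main(String[] args) throws Exception {
    // Placeholder oss:// URI; any bucket configured for the hadoop-aliyun connector works the same way.
    Path root = new Path("oss://example-bucket/user/data");
    FileSystem fs = root.getFileSystem(new Configuration());

    // Recursive file listing: directories are skipped and results arrive batch by batch.
    RemoteIterator<LocatedFileStatus> files = fs.listFiles(root, true);
    while (files.hasNext()) {
      LocatedFileStatus status = files.next();
      System.out.println(status.getPath() + "\t" + status.getLen());
    }
    fs.close();
  }
}
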
@@ -28,14 +28,18 @@ import java.util.List;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.PathIOException;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.util.Progressable;
 
@@ -46,6 +50,7 @@ import com.aliyun.oss.model.ObjectMetadata;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.hadoop.fs.aliyun.oss.AliyunOSSUtils.objectRepresentsDirectory;
 import static org.apache.hadoop.fs.aliyun.oss.Constants.*;
 
 /**
@@ -60,6 +65,12 @@ public class AliyunOSSFileSystem extends FileSystem {
   private Path workingDir;
   private AliyunOSSFileSystemStore store;
   private int maxKeys;
+  private static final PathFilter DEFAULT_FILTER = new PathFilter() {
+    @Override
+    public boolean accept(Path file) {
+      return true;
+    }
+  };
 
   @Override
   public FSDataOutputStream append(Path path, int bufferSize,
@@ -301,18 +312,6 @@ public class AliyunOSSFileSystem extends FileSystem {
     setConf(conf);
   }
 
-  /**
-   * Check if OSS object represents a directory.
-   *
-   * @param name object key
-   * @param size object content length
-   * @return true if object represents a directory
-   */
-  private boolean objectRepresentsDirectory(final String name,
-      final long size) {
-    return StringUtils.isNotEmpty(name) && name.endsWith("/") && size == 0L;
-  }
-
   /**
    * Turn a path (relative or otherwise) into an OSS key.
    *
@@ -404,6 +403,58 @@ public class AliyunOSSFileSystem extends FileSystem {
     return result.toArray(new FileStatus[result.size()]);
   }
 
+  @Override
+  public RemoteIterator<LocatedFileStatus> listFiles(
+      final Path f, final boolean recursive) throws IOException {
+    Path qualifiedPath = f.makeQualified(uri, workingDir);
+    final FileStatus status = getFileStatus(qualifiedPath);
+    PathFilter filter = new PathFilter() {
+      @Override
+      public boolean accept(Path path) {
+        return status.isFile() || !path.equals(f);
+      }
+    };
+    FileStatusAcceptor acceptor =
+        new FileStatusAcceptor.AcceptFilesOnly(qualifiedPath);
+    return innerList(f, status, filter, acceptor, recursive);
+  }
+
+  @Override
+  public RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f)
+      throws IOException {
+    return listLocatedStatus(f, DEFAULT_FILTER);
+  }
+
+  @Override
+  public RemoteIterator<LocatedFileStatus> listLocatedStatus(final Path f,
+      final PathFilter filter) throws IOException {
+    Path qualifiedPath = f.makeQualified(uri, workingDir);
+    final FileStatus status = getFileStatus(qualifiedPath);
+    FileStatusAcceptor acceptor =
+        new FileStatusAcceptor.AcceptAllButSelf(qualifiedPath);
+    return innerList(f, status, filter, acceptor, false);
+  }
+
+  private RemoteIterator<LocatedFileStatus> innerList(final Path f,
+      final FileStatus status,
+      final PathFilter filter,
+      final FileStatusAcceptor acceptor,
+      final boolean recursive) throws IOException {
+    Path qualifiedPath = f.makeQualified(uri, workingDir);
+    String key = pathToKey(qualifiedPath);
+
+    if (status.isFile()) {
+      LOG.debug("{} is a File", qualifiedPath);
+      final BlockLocation[] locations = getFileBlockLocations(status,
+          0, status.getLen());
+      return store.singleStatusRemoteIterator(filter.accept(f) ? status : null,
+          locations);
+    } else {
+      return store.createLocatedFileStatusIterator(key, maxKeys, this, filter,
+          acceptor, recursive ? null : "/");
+    }
+  }
+
   /**
    * Used to create an empty file that represents an empty directory.
    *
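
The hunk above routes both listings through innerList: a plain file becomes a one-element iterator, while a directory is delegated to the store with delimiter "/" (one level) or null (recursive). A hedged sketch of the caller-visible difference, assuming fs and dir were obtained as in the earlier example:

import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;

public class ListingModesExample {
  // Prints direct children of dir, then every file below it.
  static void show(FileSystem fs, Path dir) throws IOException {
    // Non-recursive: one level, directory entries included (AcceptAllButSelf).
    RemoteIterator<LocatedFileStatus> children = fs.listLocatedStatus(dir);
    while (children.hasNext()) {
      LocatedFileStatus st = children.next();
      System.out.println((st.isDirectory() ? "dir  " : "file ") + st.getPath());
    }
    // Recursive: whole subtree, files only (AcceptFilesOnly).
    RemoteIterator<LocatedFileStatus> files = fs.listFiles(dir, true);
    while (files.hasNext()) {
      System.out.println("file " + files.next().getPath());
    }
  }
}
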
@@ -46,7 +46,13 @@ import com.aliyun.oss.model.UploadPartResult;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -58,6 +64,8 @@ import java.io.InputStream;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.ListIterator;
+import java.util.NoSuchElementException;
 
 import static org.apache.hadoop.fs.aliyun.oss.Constants.*;
 
@@ -546,4 +554,102 @@ public class AliyunOSSFileSystemStore {
       LOG.error("Failed to purge " + prefix);
     }
   }
+
+  public RemoteIterator<LocatedFileStatus> singleStatusRemoteIterator(
+      final FileStatus fileStatus, final BlockLocation[] locations) {
+    return new RemoteIterator<LocatedFileStatus>() {
+      private boolean hasNext = true;
+      @Override
+      public boolean hasNext() throws IOException {
+        return fileStatus != null && hasNext;
+      }
+
+      @Override
+      public LocatedFileStatus next() throws IOException {
+        if (hasNext()) {
+          LocatedFileStatus s = new LocatedFileStatus(fileStatus,
+              fileStatus.isFile() ? locations : null);
+          hasNext = false;
+          return s;
+        } else {
+          throw new NoSuchElementException();
+        }
+      }
+    };
+  }
+
+  public RemoteIterator<LocatedFileStatus> createLocatedFileStatusIterator(
+      final String prefix, final int maxListingLength, FileSystem fs,
+      PathFilter filter, FileStatusAcceptor acceptor, String delimiter) {
+    return new RemoteIterator<LocatedFileStatus>() {
+      private String nextMarker = null;
+      private boolean firstListing = true;
+      private boolean meetEnd = false;
+      private ListIterator<FileStatus> batchIterator;
+
+      @Override
+      public boolean hasNext() throws IOException {
+        if (firstListing) {
+          requestNextBatch();
+          firstListing = false;
+        }
+        return batchIterator.hasNext() || requestNextBatch();
+      }
+
+      @Override
+      public LocatedFileStatus next() throws IOException {
+        if (hasNext()) {
+          FileStatus status = batchIterator.next();
+          BlockLocation[] locations = fs.getFileBlockLocations(status,
+              0, status.getLen());
+          return new LocatedFileStatus(
+              status, status.isFile() ? locations : null);
+        } else {
+          throw new NoSuchElementException();
+        }
+      }
+
+      private boolean requestNextBatch() {
+        if (meetEnd) {
+          return false;
+        }
+        ListObjectsRequest listRequest = new ListObjectsRequest(bucketName);
+        listRequest.setPrefix(AliyunOSSUtils.maybeAddTrailingSlash(prefix));
+        listRequest.setMaxKeys(maxListingLength);
+        listRequest.setMarker(nextMarker);
+        listRequest.setDelimiter(delimiter);
+        ObjectListing listing = ossClient.listObjects(listRequest);
+        List<FileStatus> stats = new ArrayList<>(
+            listing.getObjectSummaries().size() +
+            listing.getCommonPrefixes().size());
+        for(OSSObjectSummary summary: listing.getObjectSummaries()) {
+          String key = summary.getKey();
+          Path path = fs.makeQualified(new Path("/" + key));
+          if (filter.accept(path) && acceptor.accept(path, summary)) {
+            FileStatus status = new FileStatus(summary.getSize(),
+                key.endsWith("/"), 1, fs.getDefaultBlockSize(path),
+                summary.getLastModified().getTime(), path);
+            stats.add(status);
+          }
+        }
+
+        for(String commonPrefix: listing.getCommonPrefixes()) {
+          Path path = fs.makeQualified(new Path("/" + commonPrefix));
+          if (filter.accept(path) && acceptor.accept(path, commonPrefix)) {
+            FileStatus status = new FileStatus(0, true, 1, 0, 0, path);
+            stats.add(status);
+          }
+        }
+
+        batchIterator = stats.listIterator();
+        if (listing.isTruncated()) {
+          nextMarker = listing.getNextMarker();
+        } else {
+          meetEnd = true;
+        }
+        statistics.incrementReadOps(1);
+        return batchIterator.hasNext();
+      }
+    };
+  }
 }
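
createLocatedFileStatusIterator above pages through the bucket with marker-based continuation: each batch issues one ListObjectsRequest, and isTruncated()/getNextMarker() decide whether another round is needed. A standalone sketch of the same pagination loop against the SDK types used in the patch (the client parameter and bucket/prefix names are assumptions for illustration; the filter/acceptor logic is omitted):

import com.aliyun.oss.OSSClient;
import com.aliyun.oss.model.ListObjectsRequest;
import com.aliyun.oss.model.OSSObjectSummary;
import com.aliyun.oss.model.ObjectListing;

public class MarkerPagingSketch {
  // Walks every object under prefix, one page of up to maxKeys at a time,
  // mirroring requestNextBatch() in the patch.
  static void walk(OSSClient client, String bucket, String prefix, int maxKeys) {
    String marker = null;
    boolean truncated;
    do {
      ListObjectsRequest request = new ListObjectsRequest(bucket);
      request.setPrefix(prefix);
      request.setMaxKeys(maxKeys);
      request.setMarker(marker);        // null on the first page
      request.setDelimiter("/");        // drop the delimiter for a recursive walk
      ObjectListing listing = client.listObjects(request);
      for (OSSObjectSummary summary : listing.getObjectSummaries()) {
        System.out.println(summary.getKey() + "\t" + summary.getSize());
      }
      for (String commonPrefix : listing.getCommonPrefixes()) {
        System.out.println(commonPrefix + "\t(directory)");
      }
      marker = listing.getNextMarker();
      truncated = listing.isTruncated();
    } while (truncated);
  }
}
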
@@ -164,4 +164,16 @@ final public class AliyunOSSUtils {
       return key;
     }
   }
+
+  /**
+   * Check if OSS object represents a directory.
+   *
+   * @param name object key
+   * @param size object content length
+   * @return true if object represents a directory
+   */
+  public static boolean objectRepresentsDirectory(final String name,
+      final long size) {
+    return StringUtils.isNotEmpty(name) && name.endsWith("/") && size == 0L;
+  }
 }
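
The relocated helper is now public static, so the new FileStatusAcceptor below can import it statically. Its contract is narrow: only a zero-length key ending in "/" counts as a directory marker. A quick illustration (the keys are made up):

import org.apache.hadoop.fs.aliyun.oss.AliyunOSSUtils;

public class DirectoryMarkerCheck {
  public static void main(String[] args) {
    // Zero-length object whose key ends in "/": treated as a directory marker.
    System.out.println(AliyunOSSUtils.objectRepresentsDirectory("user/data/", 0L));       // true
    // Regular object key: not a directory.
    System.out.println(AliyunOSSUtils.objectRepresentsDirectory("user/data/part-0", 0L)); // false
    // Trailing slash but non-zero size: still not a directory marker.
    System.out.println(AliyunOSSUtils.objectRepresentsDirectory("user/data/", 10L));      // false
  }
}
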
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.aliyun.oss;
+
+import com.aliyun.oss.model.OSSObjectSummary;
+import org.apache.hadoop.fs.Path;
+
+import static org.apache.hadoop.fs.aliyun.oss.AliyunOSSUtils.objectRepresentsDirectory;
+
+/**
+ * Interface to implement by the logic deciding whether to accept a summary
+ * entry or path as a valid file or directory.
+ */
+public interface FileStatusAcceptor {
+
+  /**
+   * Predicate to decide whether or not to accept a summary entry.
+   * @param keyPath qualified path to the entry
+   * @param summary summary entry
+   * @return true if the entry is accepted (i.e. that a status entry
+   * should be generated.
+   */
+  boolean accept(Path keyPath, OSSObjectSummary summary);
+
+  /**
+   * Predicate to decide whether or not to accept a prefix.
+   * @param keyPath qualified path to the entry
+   * @param commonPrefix the prefix
+   * @return true if the entry is accepted (i.e. that a status entry
+   * should be generated.)
+   */
+  boolean accept(Path keyPath, String commonPrefix);
+
+  /**
+   * Accept all entries except the base path.
+   */
+  class AcceptFilesOnly implements FileStatusAcceptor {
+    private final Path qualifiedPath;
+
+    public AcceptFilesOnly(Path qualifiedPath) {
+      this.qualifiedPath = qualifiedPath;
+    }
+
+    /**
+     * Reject a summary entry if the key path is the qualified Path.
+     * @param keyPath key path of the entry
+     * @param summary summary entry
+     * @return true if the entry is accepted (i.e. that a status entry
+     * should be generated.
+     */
+    @Override
+    public boolean accept(Path keyPath, OSSObjectSummary summary) {
+      return !keyPath.equals(qualifiedPath)
+          && !objectRepresentsDirectory(summary.getKey(), summary.getSize());
+    }
+
+    /**
+     * Accept no directory paths.
+     * @param keyPath qualified path to the entry
+     * @param prefix common prefix in listing.
+     * @return false, always.
+     */
+    @Override
+    public boolean accept(Path keyPath, String prefix) {
+      return false;
+    }
+  }
+
+  /**
+   * Accept all entries except the base path.
+   */
+  class AcceptAllButSelf implements FileStatusAcceptor {
+
+    /** Base path. */
+    private final Path qualifiedPath;
+
+    /**
+     * Constructor.
+     * @param qualifiedPath an already-qualified path.
+     */
+    public AcceptAllButSelf(Path qualifiedPath) {
+      this.qualifiedPath = qualifiedPath;
+    }
+
+    /**
+     * Reject a summary entry if the key path is the qualified Path.
+     * @param keyPath key path of the entry
+     * @param summary summary entry
+     * @return true if the entry is accepted (i.e. that a status entry
+     * should be generated.)
+     */
+    @Override
+    public boolean accept(Path keyPath, OSSObjectSummary summary) {
+      return !keyPath.equals(qualifiedPath);
+    }
+
+    /**
+     * Accept all prefixes except the one for the base path, "self".
+     * @param keyPath qualified path to the entry
+     * @param prefix common prefix in listing.
+     * @return true if the entry is accepted (i.e. that a status entry
+     * should be generated.
+     */
+    @Override
+    public boolean accept(Path keyPath, String prefix) {
+      return !keyPath.equals(qualifiedPath);
+    }
+  }
+}
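
FileStatusAcceptor keeps the accept/reject decision separate from the paging logic, so other listings can plug in their own policy. A hypothetical acceptor (not part of this patch) that keeps only ".log" objects while still surfacing directory prefixes:

import com.aliyun.oss.model.OSSObjectSummary;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.aliyun.oss.FileStatusAcceptor;

public class AcceptLogFilesOnly implements FileStatusAcceptor {
  // Keep an object only if its key ends with ".log".
  @Override
  public boolean accept(Path keyPath, OSSObjectSummary summary) {
    return summary.getKey().endsWith(".log");
  }

  // Keep every common prefix so subdirectories still appear while walking.
  @Override
  public boolean accept(Path keyPath, String commonPrefix) {
    return true;
  }
}
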
@@ -56,14 +56,14 @@ Authorization occurs at the level of the entire Aliyun account via
 4. The append operation is not supported.
 
 ### Warning #2: Directory last access time is not tracked,
-features of Hadoop relying on this can have unexpected behaviour. E.g. the
-AggregatedLogDeletionService of YARN will not remove the appropriate logfiles.
+Features of Hadoop relying on this can have unexpected behaviour. E.g. the
+AggregatedLogDeletionService of YARN will not remove the appropriate log files.
 
 ### Warning #3: Your Aliyun credentials are valuable
 
 Your Aliyun credentials not only pay for services, they offer read and write
 access to the data. Anyone with the account can not only read your datasets
-—they can delete them.
+— they can delete them.
 
 Do not inadvertently share these credentials through means such as
 1. Checking in to SCM any configuration files containing the secrets.