HADOOP-15661. ABFS: Add support for ACL.

Contributed by Junhua Gu and Da Zhou.
This commit is contained in:
Thomas Marquardt 2018-08-22 18:31:47 +00:00 committed by Yuan Gao
parent 9d47230c8f
commit e727c87500
19 changed files with 2422 additions and 58 deletions

View File

@ -26,6 +26,7 @@ import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.concurrent.Callable;
@ -60,6 +61,8 @@ import org.apache.hadoop.fs.azurebfs.contracts.exceptions.FileSystemOperationUnh
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriAuthorityException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriException;
import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Progressable;
@ -154,7 +157,8 @@ public class AzureBlobFileSystem extends FileSystem {
blockSize);
try {
OutputStream outputStream = abfsStore.createFile(makeQualified(f), overwrite);
OutputStream outputStream = abfsStore.createFile(makeQualified(f), overwrite,
permission == null ? FsPermission.getFileDefault() : permission, FsPermission.getUMask(getConf()));
return new FSDataOutputStream(outputStream, statistics);
} catch(AzureBlobFileSystemException ex) {
checkException(f, ex);
@ -253,7 +257,8 @@ public class AzureBlobFileSystem extends FileSystem {
AzureServiceErrorCode.INVALID_RENAME_SOURCE_PATH,
AzureServiceErrorCode.SOURCE_PATH_NOT_FOUND,
AzureServiceErrorCode.INVALID_SOURCE_OR_DESTINATION_RESOURCE_TYPE,
AzureServiceErrorCode.RENAME_DESTINATION_PARENT_PATH_NOT_FOUND);
AzureServiceErrorCode.RENAME_DESTINATION_PARENT_PATH_NOT_FOUND,
AzureServiceErrorCode.INTERNAL_OPERATION_ABORT);
return false;
}
@ -308,7 +313,8 @@ public class AzureBlobFileSystem extends FileSystem {
}
try {
abfsStore.createDirectory(makeQualified(f));
abfsStore.createDirectory(makeQualified(f), permission == null ? FsPermission.getDirDefault() : permission,
FsPermission.getUMask(getConf()));
return true;
} catch (AzureBlobFileSystemException ex) {
checkException(f, ex, AzureServiceErrorCode.PATH_ALREADY_EXISTS);
@ -457,6 +463,188 @@ public class AzureBlobFileSystem extends FileSystem {
return true;
}
/**
* Set owner of a path (i.e. a file or a directory).
* The parameters owner and group cannot both be null.
*
* @param path The path
* @param owner If it is null, the original username remains unchanged.
* @param group If it is null, the original groupname remains unchanged.
*/
@Override
public void setOwner(final Path path, final String owner, final String group)
throws IOException {
LOG.debug(
"AzureBlobFileSystem.setOwner path: {}", path);
if ((owner == null || owner.isEmpty()) && (group == null || group.isEmpty())) {
throw new IllegalArgumentException("A valid owner or group must be specified.");
}
try {
abfsStore.setOwner(makeQualified(path),
owner,
group);
} catch (AzureBlobFileSystemException ex) {
checkException(path, ex);
}
}
/**
* Set permission of a path.
*
* @param path The path
* @param permission Access permission
*/
@Override
public void setPermission(final Path path, final FsPermission permission)
throws IOException {
LOG.debug("AzureBlobFileSystem.setPermission path: {}", path);
if (permission == null) {
throw new IllegalArgumentException("The permission can't be null");
}
try {
abfsStore.setPermission(makeQualified(path),
permission);
} catch (AzureBlobFileSystemException ex) {
checkException(path, ex);
}
}
/**
* Modifies ACL entries of files and directories. This method can add new ACL
* entries or modify the permissions on existing ACL entries. All existing
* ACL entries that are not specified in this call are retained without
* changes. (Modifications are merged into the current ACL.)
*
* @param path Path to modify
* @param aclSpec List of AbfsAclEntry describing modifications
* @throws IOException if an ACL could not be modified
*/
@Override
public void modifyAclEntries(final Path path, final List<AclEntry> aclSpec)
throws IOException {
LOG.debug("AzureBlobFileSystem.modifyAclEntries path: {}", path.toString());
if (aclSpec == null || aclSpec.isEmpty()) {
throw new IllegalArgumentException("The value of the aclSpec parameter is invalid.");
}
try {
abfsStore.modifyAclEntries(makeQualified(path),
aclSpec);
} catch (AzureBlobFileSystemException ex) {
checkException(path, ex);
}
}
/**
* Removes ACL entries from files and directories. Other ACL entries are
* retained.
*
* @param path Path to modify
* @param aclSpec List of AclEntry describing entries to remove
* @throws IOException if an ACL could not be modified
*/
@Override
public void removeAclEntries(final Path path, final List<AclEntry> aclSpec)
throws IOException {
LOG.debug("AzureBlobFileSystem.removeAclEntries path: {}", path);
if (aclSpec == null || aclSpec.isEmpty()) {
throw new IllegalArgumentException("The aclSpec argument is invalid.");
}
try {
abfsStore.removeAclEntries(makeQualified(path), aclSpec);
} catch (AzureBlobFileSystemException ex) {
checkException(path, ex);
}
}
/**
* Removes all default ACL entries from files and directories.
*
* @param path Path to modify
* @throws IOException if an ACL could not be modified
*/
@Override
public void removeDefaultAcl(final Path path) throws IOException {
LOG.debug("AzureBlobFileSystem.removeDefaultAcl path: {}", path);
try {
abfsStore.removeDefaultAcl(makeQualified(path));
} catch (AzureBlobFileSystemException ex) {
checkException(path, ex);
}
}
/**
* Removes all but the base ACL entries of files and directories. The entries
* for user, group, and others are retained for compatibility with permission
* bits.
*
* @param path Path to modify
* @throws IOException if an ACL could not be removed
*/
@Override
public void removeAcl(final Path path) throws IOException {
LOG.debug("AzureBlobFileSystem.removeAcl path: {}", path);
try {
abfsStore.removeAcl(makeQualified(path));
} catch (AzureBlobFileSystemException ex) {
checkException(path, ex);
}
}
/**
* Fully replaces ACL of files and directories, discarding all existing
* entries.
*
* @param path Path to modify
* @param aclSpec List of AclEntry describing modifications, must include
* entries for user, group, and others for compatibility with
* permission bits.
* @throws IOException if an ACL could not be modified
*/
@Override
public void setAcl(final Path path, final List<AclEntry> aclSpec)
throws IOException {
LOG.debug("AzureBlobFileSystem.setAcl path: {}", path);
if (aclSpec == null || aclSpec.size() == 0) {
throw new IllegalArgumentException("The aclSpec argument is invalid.");
}
try {
abfsStore.setAcl(makeQualified(path), aclSpec);
} catch (AzureBlobFileSystemException ex) {
checkException(path, ex);
}
}
/**
* Gets the ACL of a file or directory.
*
* @param path Path to get
* @return AbfsAclStatus describing the ACL of the file or directory
* @throws IOException if an ACL could not be read
*/
@Override
public AclStatus getAclStatus(final Path path) throws IOException {
LOG.debug("AzureBlobFileSystem.getAclStatus path: {}", path.toString());
try {
return abfsStore.getAclStatus(makeQualified(path));
} catch (AzureBlobFileSystemException ex) {
checkException(path, ex);
return null;
}
}
private FileStatus tryGetFileStatus(final Path f) {
try {
return getFileStatus(f);
@ -656,4 +844,9 @@ public class AzureBlobFileSystem extends FileSystem {
AbfsClient getAbfsClient() {
return abfsStore.getClient();
}
@VisibleForTesting
boolean getIsNamespaceEnabeld() throws AzureBlobFileSystemException {
return abfsStore.getIsNamespaceEnabled();
}
}

View File

@ -36,8 +36,10 @@ import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Hashtable;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.xml.bind.DatatypeConverter;
@ -68,13 +70,18 @@ import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultEntrySchema;
import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultSchema;
import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider;
import org.apache.hadoop.fs.azurebfs.services.AbfsAclHelper;
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream;
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
import org.apache.hadoop.fs.azurebfs.services.AbfsPermission;
import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
import org.apache.hadoop.fs.azurebfs.services.AuthType;
import org.apache.hadoop.fs.azurebfs.services.ExponentialRetryPolicy;
import org.apache.hadoop.fs.azurebfs.services.SharedKeyCredentials;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.UserGroupInformation;
@ -85,7 +92,7 @@ import org.slf4j.LoggerFactory;
import static org.apache.hadoop.util.Time.now;
/**
* Provides the bridging logic between Hadoop's abstract filesystem and Azure Storage
* Provides the bridging logic between Hadoop's abstract filesystem and Azure Storage.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
@ -103,7 +110,8 @@ public class AzureBlobFileSystemStore {
private final AbfsConfiguration abfsConfiguration;
private final Set<String> azureAtomicRenameDirSet;
private boolean isNamespaceEnabledSet;
private boolean isNamespaceEnabled;
public AzureBlobFileSystemStore(URI uri, boolean isSecure, Configuration configuration, UserGroupInformation userGroupInformation)
throws AzureBlobFileSystemException {
@ -121,6 +129,20 @@ public class AzureBlobFileSystemStore {
initializeClient(uri, isSecure);
}
public boolean getIsNamespaceEnabled() throws AzureBlobFileSystemException {
if (!isNamespaceEnabledSet) {
LOG.debug("getFilesystemProperties for filesystem: {}",
client.getFileSystem());
final AbfsRestOperation op = client.getFilesystemProperties();
isNamespaceEnabled = Boolean.parseBoolean(
op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_NAMESPACE_ENABLED));
isNamespaceEnabledSet = true;
}
return isNamespaceEnabled;
}
@VisibleForTesting
URIBuilder getURIBuilder(final String hostName, boolean isSecure) {
String scheme = isSecure ? FileSystemUriSchemes.HTTPS_SCHEME : FileSystemUriSchemes.HTTP_SCHEME;
@ -197,7 +219,7 @@ public class AzureBlobFileSystemStore {
} catch (CharacterCodingException ex) {
throw new InvalidAbfsRestOperationException(ex);
}
client.setPathProperties("/" + getRelativePath(path), commaSeparatedProperties);
client.setPathProperties(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), commaSeparatedProperties);
}
public void createFilesystem() throws AzureBlobFileSystemException {
@ -214,13 +236,20 @@ public class AzureBlobFileSystemStore {
client.deleteFilesystem();
}
public OutputStream createFile(final Path path, final boolean overwrite) throws AzureBlobFileSystemException {
LOG.debug("createFile filesystem: {} path: {} overwrite: {}",
public OutputStream createFile(final Path path, final boolean overwrite, final FsPermission permission,
final FsPermission umask) throws AzureBlobFileSystemException {
boolean isNamespaceEnabled = getIsNamespaceEnabled();
LOG.debug("createFile filesystem: {} path: {} overwrite: {} permission: {} umask: {} isNamespaceEnabled: {}",
client.getFileSystem(),
path,
overwrite);
overwrite,
permission.toString(),
umask.toString(),
isNamespaceEnabled);
client.createPath(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), true, overwrite);
client.createPath(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), true, overwrite,
isNamespaceEnabled ? getOctalNotation(permission) : null,
isNamespaceEnabled ? getOctalNotation(umask) : null);
final OutputStream outputStream;
outputStream = new FSDataOutputStream(
@ -229,16 +258,23 @@ public class AzureBlobFileSystemStore {
return outputStream;
}
public void createDirectory(final Path path) throws AzureBlobFileSystemException {
LOG.debug("createDirectory filesystem: {} path: {}",
public void createDirectory(final Path path, final FsPermission permission, final FsPermission umask)
throws AzureBlobFileSystemException {
boolean isNamespaceEnabled = getIsNamespaceEnabled();
LOG.debug("createDirectory filesystem: {} path: {} permission: {} umask: {} isNamespaceEnabled: {}",
client.getFileSystem(),
path);
path,
permission,
umask,
isNamespaceEnabled);
client.createPath("/" + getRelativePath(path), false, true);
client.createPath(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), false, true,
isNamespaceEnabled ? getOctalNotation(permission) : null,
isNamespaceEnabled ? getOctalNotation(umask) : null);
}
public InputStream openFileForRead(final Path path, final FileSystem.Statistics statistics) throws AzureBlobFileSystemException {
public InputStream openFileForRead(final Path path, final FileSystem.Statistics statistics)
throws AzureBlobFileSystemException {
LOG.debug("openFileForRead filesystem: {} path: {}",
client.getFileSystem(),
path);
@ -327,7 +363,6 @@ public class AzureBlobFileSystemStore {
public void delete(final Path path, final boolean recursive)
throws AzureBlobFileSystemException {
LOG.debug("delete filesystem: {} path: {} recursive: {}",
client.getFileSystem(),
path,
@ -351,19 +386,31 @@ public class AzureBlobFileSystemStore {
}
public FileStatus getFileStatus(final Path path) throws IOException {
LOG.debug("getFileStatus filesystem: {} path: {}",
boolean isNamespaceEnabled = getIsNamespaceEnabled();
LOG.debug("getFileStatus filesystem: {} path: {} isNamespaceEnabled: {}",
client.getFileSystem(),
path);
path,
isNamespaceEnabled);
if (path.isRoot()) {
AbfsRestOperation op = client.getFilesystemProperties();
final AbfsRestOperation op = isNamespaceEnabled
? client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + AbfsHttpConstants.ROOT_PATH)
: client.getFilesystemProperties();
final long blockSize = abfsConfiguration.getAzureBlockSize();
final String owner = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_OWNER);
final String group = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_GROUP);
final String permissions = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_PERMISSIONS);
final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG);
final String lastModified = op.getResult().getResponseHeader(HttpHeaderConfigurations.LAST_MODIFIED);
final boolean hasAcl = AbfsPermission.isExtendedAcl(permissions);
return new VersionedFileStatus(
userGroupInformation.getUserName(),
userGroupInformation.getPrimaryGroupName(),
owner == null ? userGroupInformation.getUserName() : owner,
group == null ? userGroupInformation.getPrimaryGroupName() : group,
permissions == null ? new AbfsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL)
: AbfsPermission.valueOf(permissions),
hasAcl,
0,
true,
1,
@ -375,14 +422,22 @@ public class AzureBlobFileSystemStore {
AbfsRestOperation op = client.getPathProperties(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path));
final long blockSize = abfsConfiguration.getAzureBlockSize();
final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG);
final String lastModified = op.getResult().getResponseHeader(HttpHeaderConfigurations.LAST_MODIFIED);
final String contentLength = op.getResult().getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH);
final String resourceType = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_RESOURCE_TYPE);
final AbfsHttpOperation result = op.getResult();
final String eTag = result.getResponseHeader(HttpHeaderConfigurations.ETAG);
final String lastModified = result.getResponseHeader(HttpHeaderConfigurations.LAST_MODIFIED);
final String contentLength = result.getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH);
final String resourceType = result.getResponseHeader(HttpHeaderConfigurations.X_MS_RESOURCE_TYPE);
final String owner = result.getResponseHeader(HttpHeaderConfigurations.X_MS_OWNER);
final String group = result.getResponseHeader(HttpHeaderConfigurations.X_MS_GROUP);
final String permissions = result.getResponseHeader((HttpHeaderConfigurations.X_MS_PERMISSIONS));
final boolean hasAcl = AbfsPermission.isExtendedAcl(permissions);
return new VersionedFileStatus(
userGroupInformation.getUserName(),
userGroupInformation.getPrimaryGroupName(),
owner == null ? userGroupInformation.getUserName() : owner,
group == null ? userGroupInformation.getPrimaryGroupName() : group,
permissions == null ? new AbfsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL)
: AbfsPermission.valueOf(permissions),
hasAcl,
parseContentLength(contentLength),
parseIsDirectory(resourceType),
1,
@ -417,6 +472,13 @@ public class AzureBlobFileSystemStore {
long blockSize = abfsConfiguration.getAzureBlockSize();
for (ListResultEntrySchema entry : retrievedSchema.paths()) {
final String owner = entry.owner() == null ? userGroupInformation.getUserName() : entry.owner();
final String group = entry.group() == null ? userGroupInformation.getPrimaryGroupName() : entry.group();
final FsPermission fsPermission = entry.permissions() == null
? new AbfsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL)
: AbfsPermission.valueOf(entry.permissions());
final boolean hasAcl = AbfsPermission.isExtendedAcl(entry.permissions());
long lastModifiedMillis = 0;
long contentLength = entry.contentLength() == null ? 0 : entry.contentLength();
boolean isDirectory = entry.isDirectory() == null ? false : entry.isDirectory();
@ -429,8 +491,10 @@ public class AzureBlobFileSystemStore {
fileStatuses.add(
new VersionedFileStatus(
userGroupInformation.getUserName(),
userGroupInformation.getPrimaryGroupName(),
owner,
group,
fsPermission,
hasAcl,
contentLength,
isDirectory,
1,
@ -445,6 +509,211 @@ public class AzureBlobFileSystemStore {
return fileStatuses.toArray(new FileStatus[0]);
}
public void setOwner(final Path path, final String owner, final String group) throws
AzureBlobFileSystemException {
if (!getIsNamespaceEnabled()) {
throw new UnsupportedOperationException(
"This operation is only valid for storage accounts with the hierarchical namespace enabled.");
}
LOG.debug(
"setOwner filesystem: {} path: {} owner: {} group: {}",
client.getFileSystem(),
path.toString(),
owner,
group);
client.setOwner(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true), owner, group);
}
public void setPermission(final Path path, final FsPermission permission) throws
AzureBlobFileSystemException {
if (!getIsNamespaceEnabled()) {
throw new UnsupportedOperationException(
"This operation is only valid for storage accounts with the hierarchical namespace enabled.");
}
LOG.debug(
"setPermission filesystem: {} path: {} permission: {}",
client.getFileSystem(),
path.toString(),
permission.toString());
client.setPermission(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true),
String.format(AbfsHttpConstants.PERMISSION_FORMAT, permission.toOctal()));
}
public void modifyAclEntries(final Path path, final List<AclEntry> aclSpec) throws
AzureBlobFileSystemException {
if (!getIsNamespaceEnabled()) {
throw new UnsupportedOperationException(
"This operation is only valid for storage accounts with the hierarchical namespace enabled.");
}
LOG.debug(
"modifyAclEntries filesystem: {} path: {} aclSpec: {}",
client.getFileSystem(),
path.toString(),
AclEntry.aclSpecToString(aclSpec));
final Map<String, String> modifyAclEntries = AbfsAclHelper.deserializeAclSpec(AclEntry.aclSpecToString(aclSpec));
final AbfsRestOperation op = client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true));
final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG);
final Map<String, String> aclEntries = AbfsAclHelper.deserializeAclSpec(op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_ACL));
for (Map.Entry<String, String> modifyAclEntry : modifyAclEntries.entrySet()) {
aclEntries.put(modifyAclEntry.getKey(), modifyAclEntry.getValue());
}
if (!modifyAclEntries.containsKey(AbfsHttpConstants.ACCESS_MASK)) {
aclEntries.remove(AbfsHttpConstants.ACCESS_MASK);
}
if (!modifyAclEntries.containsKey(AbfsHttpConstants.DEFAULT_MASK)) {
aclEntries.remove(AbfsHttpConstants.DEFAULT_MASK);
}
client.setAcl(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true),
AbfsAclHelper.serializeAclSpec(aclEntries), eTag);
}
public void removeAclEntries(final Path path, final List<AclEntry> aclSpec) throws AzureBlobFileSystemException {
if (!getIsNamespaceEnabled()) {
throw new UnsupportedOperationException(
"This operation is only valid for storage accounts with the hierarchical namespace enabled.");
}
LOG.debug(
"removeAclEntries filesystem: {} path: {} aclSpec: {}",
client.getFileSystem(),
path.toString(),
AclEntry.aclSpecToString(aclSpec));
final Map<String, String> removeAclEntries = AbfsAclHelper.deserializeAclSpec(AclEntry.aclSpecToString(aclSpec));
final AbfsRestOperation op = client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true));
final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG);
final Map<String, String> aclEntries = AbfsAclHelper.deserializeAclSpec(op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_ACL));
AbfsAclHelper.removeAclEntriesInternal(aclEntries, removeAclEntries);
client.setAcl(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true),
AbfsAclHelper.serializeAclSpec(aclEntries), eTag);
}
public void removeDefaultAcl(final Path path) throws AzureBlobFileSystemException {
if (!getIsNamespaceEnabled()) {
throw new UnsupportedOperationException(
"This operation is only valid for storage accounts with the hierarchical namespace enabled.");
}
LOG.debug(
"removeDefaultAcl filesystem: {} path: {}",
client.getFileSystem(),
path.toString());
final AbfsRestOperation op = client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true));
final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG);
final Map<String, String> aclEntries = AbfsAclHelper.deserializeAclSpec(op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_ACL));
final Map<String, String> defaultAclEntries = new HashMap<>();
for (Map.Entry<String, String> aclEntry : aclEntries.entrySet()) {
if (aclEntry.getKey().startsWith("default:")) {
defaultAclEntries.put(aclEntry.getKey(), aclEntry.getValue());
}
}
for (Map.Entry<String, String> defaultAclEntry : defaultAclEntries.entrySet()) {
aclEntries.remove(defaultAclEntry.getKey());
}
client.setAcl(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true),
AbfsAclHelper.serializeAclSpec(aclEntries), eTag);
}
public void removeAcl(final Path path) throws AzureBlobFileSystemException {
if (!getIsNamespaceEnabled()) {
throw new UnsupportedOperationException(
"This operation is only valid for storage accounts with the hierarchical namespace enabled.");
}
LOG.debug(
"removeAcl filesystem: {} path: {}",
client.getFileSystem(),
path.toString());
final AbfsRestOperation op = client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true));
final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG);
final Map<String, String> aclEntries = AbfsAclHelper.deserializeAclSpec(op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_ACL));
final Map<String, String> newAclEntries = new HashMap<>();
newAclEntries.put(AbfsHttpConstants.ACCESS_USER, aclEntries.get(AbfsHttpConstants.ACCESS_USER));
newAclEntries.put(AbfsHttpConstants.ACCESS_GROUP, aclEntries.get(AbfsHttpConstants.ACCESS_GROUP));
newAclEntries.put(AbfsHttpConstants.ACCESS_OTHER, aclEntries.get(AbfsHttpConstants.ACCESS_OTHER));
client.setAcl(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true),
AbfsAclHelper.serializeAclSpec(newAclEntries), eTag);
}
public void setAcl(final Path path, final List<AclEntry> aclSpec) throws AzureBlobFileSystemException {
if (!getIsNamespaceEnabled()) {
throw new UnsupportedOperationException(
"This operation is only valid for storage accounts with the hierarchical namespace enabled.");
}
LOG.debug(
"setAcl filesystem: {} path: {} aclspec: {}",
client.getFileSystem(),
path.toString(),
AclEntry.aclSpecToString(aclSpec));
final Map<String, String> aclEntries = AbfsAclHelper.deserializeAclSpec(AclEntry.aclSpecToString(aclSpec));
final AbfsRestOperation op = client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true));
final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG);
final Map<String, String> getAclEntries = AbfsAclHelper.deserializeAclSpec(op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_ACL));
for (Map.Entry<String, String> ace : getAclEntries.entrySet()) {
if (ace.getKey().startsWith("default:") && (ace.getKey() != AbfsHttpConstants.DEFAULT_MASK)
&& !aclEntries.containsKey(ace.getKey())) {
aclEntries.put(ace.getKey(), ace.getValue());
}
}
client.setAcl(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true),
AbfsAclHelper.serializeAclSpec(aclEntries), eTag);
}
public AclStatus getAclStatus(final Path path) throws IOException {
if (!getIsNamespaceEnabled()) {
throw new UnsupportedOperationException(
"This operation is only valid for storage accounts with the hierarchical namespace enabled.");
}
LOG.debug(
"getAclStatus filesystem: {} path: {}",
client.getFileSystem(),
path.toString());
AbfsRestOperation op = client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true));
AbfsHttpOperation result = op.getResult();
final String owner = result.getResponseHeader(HttpHeaderConfigurations.X_MS_OWNER);
final String group = result.getResponseHeader(HttpHeaderConfigurations.X_MS_GROUP);
final String permissions = result.getResponseHeader(HttpHeaderConfigurations.X_MS_PERMISSIONS);
final String aclSpecString = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_ACL);
final List<AclEntry> processedAclEntries = AclEntry.parseAclSpec(AbfsAclHelper.processAclString(aclSpecString), true);
final FsPermission fsPermission = permissions == null ? new AbfsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL)
: AbfsPermission.valueOf(permissions);
final AclStatus.Builder aclStatusBuilder = new AclStatus.Builder();
aclStatusBuilder.owner(owner == null ? userGroupInformation.getUserName() : owner);
aclStatusBuilder.group(group == null ? userGroupInformation.getPrimaryGroupName() : group);
aclStatusBuilder.setPermission(fsPermission);
aclStatusBuilder.stickyBit(fsPermission.getStickyBit());
aclStatusBuilder.addEntries(processedAclEntries);
return aclStatusBuilder.build();
}
public boolean isAtomicRenameKey(String key) {
return isKeyForDirectorySet(key, azureAtomicRenameDirSet);
}
@ -507,19 +776,24 @@ public class AzureBlobFileSystemStore {
this.client = new AbfsClient(baseUrl, creds, abfsConfiguration, new ExponentialRetryPolicy(), tokenProvider);
}
private String getOctalNotation(FsPermission fsPermission) {
Preconditions.checkNotNull(fsPermission, "fsPermission");
return String.format(AbfsHttpConstants.PERMISSION_FORMAT, fsPermission.toOctal());
}
private String getRelativePath(final Path path) {
return getRelativePath(path, false);
}
private String getRelativePath(final Path path, final boolean allowRootPath) {
Preconditions.checkNotNull(path, "path");
final String relativePath = path.toUri().getPath();
if (relativePath.isEmpty()) {
return relativePath;
if (relativePath.length() == 0 || (relativePath.length() == 1 && relativePath.charAt(0) == Path.SEPARATOR_CHAR)) {
return allowRootPath ? AbfsHttpConstants.ROOT_PATH : AbfsHttpConstants.EMPTY_STRING;
}
if (relativePath.charAt(0) == Path.SEPARATOR_CHAR) {
if (relativePath.length() == 1) {
return AbfsHttpConstants.EMPTY_STRING;
}
return relativePath.substring(1);
}
@ -644,15 +918,17 @@ public class AzureBlobFileSystemStore {
private final String version;
VersionedFileStatus(
final String owner, final String group,
final String owner, final String group, final FsPermission fsPermission, final boolean hasAcl,
final long length, final boolean isdir, final int blockReplication,
final long blocksize, final long modificationTime, final Path path,
String version) {
super(length, isdir, blockReplication, blocksize, modificationTime, 0,
new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL),
fsPermission,
owner,
group,
path);
null,
path,
hasAcl, false, false);
this.version = version;
}
@ -717,5 +993,4 @@ public class AzureBlobFileSystemStore {
AbfsClient getClient() {
return this.client;
}
}

View File

@ -34,6 +34,8 @@ public final class AbfsHttpConstants {
public static final String APPEND_ACTION = "append";
public static final String FLUSH_ACTION = "flush";
public static final String SET_PROPERTIES_ACTION = "setProperties";
public static final String SET_ACCESS_CONTROL = "setAccessControl";
public static final String GET_ACCESS_CONTROL = "getAccessControl";
public static final String DEFAULT_TIMEOUT = "90";
public static final String JAVA_VERSION = "java.version";
@ -58,6 +60,7 @@ public final class AbfsHttpConstants {
public static final String PLUS = "+";
public static final String STAR = "*";
public static final String COMMA = ",";
public static final String COLON = ":";
public static final String EQUAL = "=";
public static final String QUESTION_MARK = "?";
public static final String AND_MARK = "&";
@ -72,5 +75,17 @@ public final class AbfsHttpConstants {
public static final String APPLICATION_JSON = "application/json";
public static final String APPLICATION_OCTET_STREAM = "application/octet-stream";
public static final String ROOT_PATH = "/";
public static final String ACCESS_MASK = "mask:";
public static final String ACCESS_USER = "user:";
public static final String ACCESS_GROUP = "group:";
public static final String ACCESS_OTHER = "other:";
public static final String DEFAULT_MASK = "default:mask:";
public static final String DEFAULT_USER = "default:user:";
public static final String DEFAULT_GROUP = "default:group:";
public static final String DEFAULT_OTHER = "default:other:";
public static final String DEFAULT_SCOPE = "default:";
public static final String PERMISSION_FORMAT = "%04d";
private AbfsHttpConstants() {}
}

View File

@ -52,6 +52,12 @@ public final class HttpHeaderConfigurations {
public static final String X_MS_PROPERTIES = "x-ms-properties";
public static final String X_MS_RENAME_SOURCE = "x-ms-rename-source";
public static final String LAST_MODIFIED = "Last-Modified";
public static final String X_MS_OWNER = "x-ms-owner";
public static final String X_MS_GROUP = "x-ms-group";
public static final String X_MS_ACL = "x-ms-acl";
public static final String X_MS_PERMISSIONS = "x-ms-permissions";
public static final String X_MS_UMASK = "x-ms-umask";
public static final String X_MS_NAMESPACE_ENABLED = "x-ms-namespace-enabled";
private HttpHeaderConfigurations() {}
}

View File

@ -0,0 +1,33 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.contracts.exceptions;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* Thrown when there is an attempt to perform an invalid operation on an ACL.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public final class InvalidAclOperationException extends AzureBlobFileSystemException {
public InvalidAclOperationException(String message) {
super(message);
}
}

View File

@ -33,6 +33,7 @@ import org.apache.hadoop.classification.InterfaceStability;
public enum AzureServiceErrorCode {
FILE_SYSTEM_ALREADY_EXISTS("FilesystemAlreadyExists", HttpURLConnection.HTTP_CONFLICT, null),
PATH_ALREADY_EXISTS("PathAlreadyExists", HttpURLConnection.HTTP_CONFLICT, null),
INTERNAL_OPERATION_ABORT("InternalOperationAbortError", HttpURLConnection.HTTP_CONFLICT, null),
PATH_CONFLICT("PathConflict", HttpURLConnection.HTTP_CONFLICT, null),
FILE_SYSTEM_NOT_FOUND("FilesystemNotFound", HttpURLConnection.HTTP_NOT_FOUND, null),
PATH_NOT_FOUND("PathNotFound", HttpURLConnection.HTTP_NOT_FOUND, null),

View File

@ -0,0 +1,202 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.services;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidAclOperationException;
import org.apache.hadoop.fs.permission.FsAction;
/**
* AbfsAclHelper provides convenience methods to implement modifyAclEntries / removeAclEntries / removeAcl / removeDefaultAcl
* from setAcl and getAcl.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public final class AbfsAclHelper {
private AbfsAclHelper() {
// not called
}
public static Map<String, String> deserializeAclSpec(final String aclSpecString) {
final Map<String, String> aclEntries = new HashMap<>();
final String[] aclArray = aclSpecString.split(AbfsHttpConstants.COMMA);
for (String acl : aclArray) {
int idx = acl.lastIndexOf(AbfsHttpConstants.COLON);
aclEntries.put(acl.substring(0, idx), acl.substring(idx + 1));
}
return aclEntries;
}
public static String serializeAclSpec(final Map<String, String> aclEntries) {
final StringBuilder sb = new StringBuilder();
for (Map.Entry<String, String> aclEntry : aclEntries.entrySet()) {
sb.append(aclEntry.getKey() + AbfsHttpConstants.COLON + aclEntry.getValue() + AbfsHttpConstants.COMMA);
}
if (sb.length() > 0) {
sb.setLength(sb.length() - 1);
}
return sb.toString();
}
public static String processAclString(final String aclSpecString) {
final List<String> aclEntries = Arrays.asList(aclSpecString.split(AbfsHttpConstants.COMMA));
final StringBuilder sb = new StringBuilder();
boolean containsMask = false;
for (int i = aclEntries.size() - 1; i >= 0; i--) {
String ace = aclEntries.get(i);
if (ace.startsWith(AbfsHttpConstants.ACCESS_OTHER)|| ace.startsWith(AbfsHttpConstants.ACCESS_USER + AbfsHttpConstants.COLON)) {
// skip
} else if (ace.startsWith(AbfsHttpConstants.ACCESS_MASK)) {
containsMask = true;
// skip
} else if (ace.startsWith(AbfsHttpConstants.ACCESS_GROUP + AbfsHttpConstants.COLON) && !containsMask) {
// skip
} else {
sb.insert(0, ace + AbfsHttpConstants.COMMA);
}
}
return sb.length() == 0 ? AbfsHttpConstants.EMPTY_STRING : sb.substring(0, sb.length() - 1);
}
public static void removeAclEntriesInternal(Map<String, String> aclEntries, Map<String, String> toRemoveEntries)
throws AzureBlobFileSystemException {
boolean accessAclTouched = false;
boolean defaultAclTouched = false;
final Set<String> removeIndicationSet = new HashSet<>();
for (String entryKey : toRemoveEntries.keySet()) {
final boolean isDefaultAcl = isDefaultAce(entryKey);
if (removeNamedAceAndUpdateSet(entryKey, isDefaultAcl, removeIndicationSet, aclEntries)) {
if (isDefaultAcl) {
defaultAclTouched = true;
} else {
accessAclTouched = true;
}
}
}
if (accessAclTouched) {
if (removeIndicationSet.contains(AbfsHttpConstants.ACCESS_MASK)) {
aclEntries.remove(AbfsHttpConstants.ACCESS_MASK);
}
recalculateMask(aclEntries, false);
}
if (defaultAclTouched) {
if (removeIndicationSet.contains(AbfsHttpConstants.DEFAULT_MASK)) {
aclEntries.remove(AbfsHttpConstants.DEFAULT_MASK);
}
if (removeIndicationSet.contains(AbfsHttpConstants.DEFAULT_USER)) {
aclEntries.put(AbfsHttpConstants.DEFAULT_USER, aclEntries.get(AbfsHttpConstants.ACCESS_USER));
}
if (removeIndicationSet.contains(AbfsHttpConstants.DEFAULT_GROUP)) {
aclEntries.put(AbfsHttpConstants.DEFAULT_GROUP, aclEntries.get(AbfsHttpConstants.ACCESS_GROUP));
}
if (removeIndicationSet.contains(AbfsHttpConstants.DEFAULT_OTHER)) {
aclEntries.put(AbfsHttpConstants.DEFAULT_OTHER, aclEntries.get(AbfsHttpConstants.ACCESS_OTHER));
}
recalculateMask(aclEntries, true);
}
}
private static boolean removeNamedAceAndUpdateSet(String entry, boolean isDefaultAcl, Set<String> removeIndicationSet,
Map<String, String> aclEntries)
throws AzureBlobFileSystemException {
final int startIndex = isDefaultAcl ? 1 : 0;
final String[] entryParts = entry.split(AbfsHttpConstants.COLON);
final String tag = isDefaultAcl ? AbfsHttpConstants.DEFAULT_SCOPE + entryParts[startIndex] + AbfsHttpConstants.COLON
: entryParts[startIndex] + AbfsHttpConstants.COLON;
if ((entry.equals(AbfsHttpConstants.ACCESS_USER) || entry.equals(AbfsHttpConstants.ACCESS_GROUP)
|| entry.equals(AbfsHttpConstants.ACCESS_OTHER))
&& !isNamedAce(entry)) {
throw new InvalidAclOperationException("Cannot remove user, group or other entry from access ACL.");
}
boolean touched = false;
if (!isNamedAce(entry)) {
removeIndicationSet.add(tag); // this must not be a access user, group or other
touched = true;
} else {
if (aclEntries.remove(entry) != null) {
touched = true;
}
}
return touched;
}
private static void recalculateMask(Map<String, String> aclEntries, boolean isDefaultMask) {
FsAction umask = FsAction.NONE;
if (!isExtendAcl(aclEntries, isDefaultMask)) {
return;
}
for (Map.Entry<String, String> aclEntry : aclEntries.entrySet()) {
if (isDefaultMask) {
if ((isDefaultAce(aclEntry.getKey()) && isNamedAce(aclEntry.getKey()))
|| aclEntry.getKey().equals(AbfsHttpConstants.DEFAULT_GROUP)) {
umask = umask.or(FsAction.getFsAction(aclEntry.getValue()));
}
} else {
if ((!isDefaultAce(aclEntry.getKey()) && isNamedAce(aclEntry.getKey()))
|| aclEntry.getKey().equals(AbfsHttpConstants.ACCESS_GROUP)) {
umask = umask.or(FsAction.getFsAction(aclEntry.getValue()));
}
}
}
aclEntries.put(isDefaultMask ? AbfsHttpConstants.DEFAULT_MASK : AbfsHttpConstants.ACCESS_MASK, umask.SYMBOL);
}
private static boolean isExtendAcl(Map<String, String> aclEntries, boolean checkDefault) {
for (String entryKey : aclEntries.keySet()) {
if (checkDefault && !(entryKey.equals(AbfsHttpConstants.DEFAULT_USER)
|| entryKey.equals(AbfsHttpConstants.DEFAULT_GROUP)
|| entryKey.equals(AbfsHttpConstants.DEFAULT_OTHER) || !isDefaultAce(entryKey))) {
return true;
}
if (!checkDefault && !(entryKey.equals(AbfsHttpConstants.ACCESS_USER)
|| entryKey.equals(AbfsHttpConstants.ACCESS_GROUP)
|| entryKey.equals(AbfsHttpConstants.ACCESS_OTHER) || isDefaultAce(entryKey))) {
return true;
}
}
return false;
}
private static boolean isDefaultAce(String entry) {
return entry.startsWith(AbfsHttpConstants.DEFAULT_SCOPE);
}
private static boolean isNamedAce(String entry) {
return entry.charAt(entry.length() - 1) != AbfsHttpConstants.COLON.charAt(0);
}
}

View File

@ -29,6 +29,9 @@ import java.util.Locale;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.fs.azurebfs.utils.SSLSocketFactoryEx;
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
import org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -159,7 +162,8 @@ public class AbfsClient {
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, FILESYSTEM);
abfsUriQueryBuilder.addQuery(QUERY_PARAM_DIRECTORY, relativePath == null ? "" : relativePath);
abfsUriQueryBuilder.addQuery(QUERY_PARAM_DIRECTORY, relativePath == null ? AbfsHttpConstants.EMPTY_STRING
: relativePath);
abfsUriQueryBuilder.addQuery(QUERY_PARAM_RECURSIVE, String.valueOf(recursive));
abfsUriQueryBuilder.addQuery(QUERY_PARAM_CONTINUATION, continuation);
abfsUriQueryBuilder.addQuery(QUERY_PARAM_MAXRESULTS, String.valueOf(listMaxResults));
@ -206,11 +210,19 @@ public class AbfsClient {
return op;
}
public AbfsRestOperation createPath(final String path, final boolean isFile, final boolean overwrite)
throws AzureBlobFileSystemException {
public AbfsRestOperation createPath(final String path, final boolean isFile, final boolean overwrite,
final String permission, final String umask) throws AzureBlobFileSystemException {
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
if (!overwrite) {
requestHeaders.add(new AbfsHttpHeader(IF_NONE_MATCH, "*"));
requestHeaders.add(new AbfsHttpHeader(IF_NONE_MATCH, AbfsHttpConstants.STAR));
}
if (permission != null && !permission.isEmpty()) {
requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_PERMISSIONS, permission));
}
if (umask != null && !umask.isEmpty()) {
requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_UMASK, umask));
}
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
@ -269,7 +281,6 @@ public class AbfsClient {
return op;
}
public AbfsRestOperation flush(final String path, final long position, boolean retainUncommittedData)
throws AzureBlobFileSystemException {
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
@ -373,6 +384,104 @@ public class AbfsClient {
return op;
}
public AbfsRestOperation setOwner(final String path, final String owner, final String group)
throws AzureBlobFileSystemException {
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
// JDK7 does not support PATCH, so to workaround the issue we will use
// PUT and specify the real method in the X-Http-Method-Override header.
requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE,
HTTP_METHOD_PATCH));
if (owner != null && !owner.isEmpty()) {
requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_OWNER, owner));
}
if (group != null && !group.isEmpty()) {
requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_GROUP, group));
}
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_ACTION, AbfsHttpConstants.SET_ACCESS_CONTROL);
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
this,
AbfsHttpConstants.HTTP_METHOD_PUT,
url,
requestHeaders);
op.execute();
return op;
}
public AbfsRestOperation setPermission(final String path, final String permission)
throws AzureBlobFileSystemException {
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
// JDK7 does not support PATCH, so to workaround the issue we will use
// PUT and specify the real method in the X-Http-Method-Override header.
requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE,
HTTP_METHOD_PATCH));
requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_PERMISSIONS, permission));
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_ACTION, AbfsHttpConstants.SET_ACCESS_CONTROL);
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
this,
AbfsHttpConstants.HTTP_METHOD_PUT,
url,
requestHeaders);
op.execute();
return op;
}
public AbfsRestOperation setAcl(final String path, final String aclSpecString) throws AzureBlobFileSystemException {
return setAcl(path, aclSpecString, AbfsHttpConstants.EMPTY_STRING);
}
public AbfsRestOperation setAcl(final String path, final String aclSpecString, final String eTag)
throws AzureBlobFileSystemException {
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
// JDK7 does not support PATCH, so to workaround the issue we will use
// PUT and specify the real method in the X-Http-Method-Override header.
requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE,
HTTP_METHOD_PATCH));
requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_ACL, aclSpecString));
if (eTag != null && !eTag.isEmpty()) {
requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.IF_MATCH, eTag));
}
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_ACTION, AbfsHttpConstants.SET_ACCESS_CONTROL);
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
this,
AbfsHttpConstants.HTTP_METHOD_PUT,
url,
requestHeaders);
op.execute();
return op;
}
public AbfsRestOperation getAclStatus(final String path) throws AzureBlobFileSystemException {
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_ACTION, AbfsHttpConstants.GET_ACCESS_CONTROL);
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
this,
AbfsHttpConstants.HTTP_METHOD_HEAD,
url,
requestHeaders);
op.execute();
return op;
}
private URL createRequestUrl(final String query) throws AzureBlobFileSystemException {
return createRequestUrl(EMPTY_STRING, query);
}

View File

@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.hadoop.fs.azurebfs.services;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
/**
* The AbfsPermission for AbfsClient.
*/
public class AbfsPermission extends FsPermission {
private static final int STICKY_BIT_OCTAL_VALUE = 01000;
private final boolean aclBit;
public AbfsPermission(Short aShort, boolean aclBitStatus) {
super(aShort);
this.aclBit = aclBitStatus;
}
public AbfsPermission(FsAction u, FsAction g, FsAction o) {
super(u, g, o, false);
this.aclBit = false;
}
/**
* Returns true if there is also an ACL (access control list).
*
* @return boolean true if there is also an ACL (access control list).
* @deprecated Get acl bit from the {@link org.apache.hadoop.fs.FileStatus}
* object.
*/
public boolean getAclBit() {
return aclBit;
}
@Override
public boolean equals(Object obj) {
if (obj instanceof FsPermission) {
FsPermission that = (FsPermission) obj;
return this.getUserAction() == that.getUserAction()
&& this.getGroupAction() == that.getGroupAction()
&& this.getOtherAction() == that.getOtherAction()
&& this.getStickyBit() == that.getStickyBit();
}
return false;
}
/**
* Create a AbfsPermission from a abfs symbolic permission string
* @param abfsSymbolicPermission e.g. "rw-rw-rw-+" / "rw-rw-rw-"
* @return a permission object for the provided string representation
*/
public static AbfsPermission valueOf(final String abfsSymbolicPermission) {
if (abfsSymbolicPermission == null) {
return null;
}
final boolean isExtendedAcl = abfsSymbolicPermission.charAt(abfsSymbolicPermission.length() - 1) == '+';
final String abfsRawSymbolicPermission = isExtendedAcl ? abfsSymbolicPermission.substring(0, abfsSymbolicPermission.length() - 1)
: abfsSymbolicPermission;
int n = 0;
for (int i = 0; i < abfsRawSymbolicPermission.length(); i++) {
n = n << 1;
char c = abfsRawSymbolicPermission.charAt(i);
n += (c == '-' || c == 'T' || c == 'S') ? 0: 1;
}
// Add sticky bit value if set
if (abfsRawSymbolicPermission.charAt(abfsRawSymbolicPermission.length() - 1) == 't'
|| abfsRawSymbolicPermission.charAt(abfsRawSymbolicPermission.length() - 1) == 'T') {
n += STICKY_BIT_OCTAL_VALUE;
}
return new AbfsPermission((short) n, isExtendedAcl);
}
/**
* Check whether abfs symbolic permission string is a extended Acl
* @param abfsSymbolicPermission e.g. "rw-rw-rw-+" / "rw-rw-rw-"
* @return true if the permission string indicates the existence of an
* extended ACL; otherwise false.
*/
public static boolean isExtendedAcl(final String abfsSymbolicPermission) {
if (abfsSymbolicPermission == null) {
return false;
}
return abfsSymbolicPermission.charAt(abfsSymbolicPermission.length() - 1) == '+';
}
@Override
public int hashCode() {
return toShort();
}
}

View File

@ -43,6 +43,8 @@ public class ITestAzureBlobFileSystemBackCompat extends
@Test
public void testBlobBackCompat() throws Exception {
final AzureBlobFileSystem fs = this.getFileSystem();
// test only valid for non-namespace enabled account
Assume.assumeFalse(fs.getIsNamespaceEnabeld());
String storageConnectionString = getBlobConnectionString();
CloudStorageAccount storageAccount = CloudStorageAccount.parse(storageConnectionString);
CloudBlobClient blobClient = storageAccount.createCloudBlobClient();

View File

@ -20,11 +20,12 @@ package org.apache.hadoop.fs.azurebfs;
import java.io.IOException;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.azurebfs.services.AuthType;
import org.junit.Test;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
/**
@ -32,9 +33,17 @@ import org.apache.hadoop.fs.permission.FsPermission;
*/
public class ITestAzureBlobFileSystemFileStatus extends
AbstractAbfsIntegrationTest {
private static final String DEFAULT_FILE_PERMISSION_VALUE = "640";
private static final String DEFAULT_DIR_PERMISSION_VALUE = "750";
private static final String DEFAULT_UMASK_VALUE = "027";
private static final Path TEST_FILE = new Path("testFile");
private static final Path TEST_FOLDER = new Path("testDir");
public ITestAzureBlobFileSystemFileStatus() {
super();
}
@Test
public void testEnsureStatusWorksForRoot() throws Exception {
final AzureBlobFileSystem fs = this.getFileSystem();
@ -48,20 +57,32 @@ public class ITestAzureBlobFileSystemFileStatus extends
public void testFileStatusPermissionsAndOwnerAndGroup() throws Exception {
final AzureBlobFileSystem fs = this.getFileSystem();
touch(TEST_FILE);
validateStatus(fs, TEST_FILE);
validateStatus(fs, TEST_FILE, false);
}
private FileStatus validateStatus(final AzureBlobFileSystem fs, final Path name)
private FileStatus validateStatus(final AzureBlobFileSystem fs, final Path name, final boolean isDir)
throws IOException {
FileStatus fileStatus = fs.getFileStatus(name);
fs.getConf().set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, DEFAULT_UMASK_VALUE);
String errorInStatus = "error in " + fileStatus + " from " + fs;
assertEquals(errorInStatus + ": permission",
new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL),
fileStatus.getPermission());
// When running with Oauth, the owner and group info retrieved from server will be digit ids.
if (this.getAuthType() != AuthType.OAuth && !fs.isSecure()) {
assertEquals(errorInStatus + ": owner",
fs.getOwnerUser(), fileStatus.getOwner());
assertEquals(errorInStatus + ": group",
fs.getOwnerUserPrimaryGroup(), fileStatus.getGroup());
} else {
if (isDir) {
assertEquals(errorInStatus + ": permission",
new FsPermission(DEFAULT_DIR_PERMISSION_VALUE), fileStatus.getPermission());
} else {
assertEquals(errorInStatus + ": permission",
new FsPermission(DEFAULT_FILE_PERMISSION_VALUE), fileStatus.getPermission());
}
}
return fileStatus;
}
@ -70,7 +91,7 @@ public class ITestAzureBlobFileSystemFileStatus extends
final AzureBlobFileSystem fs = this.getFileSystem();
fs.mkdirs(TEST_FOLDER);
validateStatus(fs, TEST_FOLDER);
validateStatus(fs, TEST_FOLDER, true);
}
}

View File

@ -218,6 +218,9 @@ public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest {
String wasbUrl = testAccount.getFileSystem().getName();
String abfsUrl = wasbUrlToAbfsUrl(wasbUrl);
final AzureBlobFileSystem fs = this.getFileSystem(abfsUrl);
// test only valid for non-namespace enabled account
Assume.assumeFalse(fs.getIsNamespaceEnabeld());
byte[] buffer = getRandomBytesArray();
CloudBlockBlob blob = testAccount.getBlobReference(TEST_FILE_PATH.toString().substring(1));
try (FSDataOutputStream stream = getStreamAfterWrite(fs, TEST_FILE_PATH, buffer, true)) {
@ -238,6 +241,9 @@ public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest {
String wasbUrl = testAccount.getFileSystem().getName();
String abfsUrl = wasbUrlToAbfsUrl(wasbUrl);
final AzureBlobFileSystem fs = this.getFileSystem(abfsUrl);
// test only valid for non-namespace enabled account
Assume.assumeFalse(fs.getIsNamespaceEnabeld());
byte[] buffer = getRandomBytesArray();
CloudBlockBlob blob = testAccount.getBlobReference(TEST_FILE_PATH.toString().substring(1));
try (FSDataOutputStream stream = getStreamAfterWrite(fs, TEST_FILE_PATH, buffer, false)) {

View File

@ -0,0 +1,109 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs;
import java.util.ArrayList;
import java.util.Collection;
import java.util.UUID;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.azurebfs.services.AuthType;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.azurebfs.utils.Parallelized;
/**
* Test permission operations.
*/
@RunWith(Parallelized.class)
public class ITestAzureBlobFileSystemPermission extends AbstractAbfsIntegrationTest{
private static Path testRoot = new Path("/test");
private static final String DEFAULT_UMASK_VALUE = "027";
private static final FsPermission DEFAULT_UMASK_PERMISSION = new FsPermission(DEFAULT_UMASK_VALUE);
private static final int KILOBYTE = 1024;
private FsPermission permission;
private Path path;
public ITestAzureBlobFileSystemPermission(FsPermission testPermission) throws Exception {
super();
permission = testPermission;
Assume.assumeTrue(this.getAuthType() == AuthType.OAuth);
}
@Parameterized.Parameters(name = "{0}")
public static Collection abfsCreateNonRecursiveTestData()
throws Exception {
/*
Test Data
File/Folder name, User permission, Group permission, Other Permission,
Parent already exist
shouldCreateSucceed, expectedExceptionIfFileCreateFails
*/
final Collection<Object[]> datas = new ArrayList<>();
for (FsAction g : FsAction.values()) {
for (FsAction o : FsAction.values()) {
datas.add(new Object[] {new FsPermission(FsAction.ALL, g, o)});
}
}
return datas;
}
@Test
public void testFilePermission() throws Exception {
final AzureBlobFileSystem fs = this.getFileSystem();
fs.getConf().set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, DEFAULT_UMASK_VALUE);
path = new Path(testRoot, UUID.randomUUID().toString());
fs.mkdirs(path.getParent(),
new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE));
fs.removeDefaultAcl(path.getParent());
fs.create(path, permission, true, KILOBYTE, (short) 1, KILOBYTE - 1, null);
FileStatus status = fs.getFileStatus(path);
Assert.assertEquals(permission.applyUMask(DEFAULT_UMASK_PERMISSION), status.getPermission());
}
@Test
public void testFolderPermission() throws Exception {
final AzureBlobFileSystem fs = this.getFileSystem();
fs.getConf().set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "027");
path = new Path(testRoot, UUID.randomUUID().toString());
fs.mkdirs(path.getParent(),
new FsPermission(FsAction.ALL, FsAction.WRITE, FsAction.NONE));
fs.removeDefaultAcl(path.getParent());
fs.mkdirs(path, permission);
FileStatus status = fs.getFileStatus(path);
Assert.assertEquals(permission.applyUMask(DEFAULT_UMASK_PERMISSION), status.getPermission());
}
}

View File

@ -17,7 +17,6 @@
*/
package org.apache.hadoop.fs.azurebfs;
import java.io.EOFException;
import java.io.IOException;
import java.util.Random;
@ -524,6 +523,9 @@ public class ITestAzureBlobFileSystemRandomRead extends
}
private void createTestFile() throws Exception {
final AzureBlobFileSystem abFs = this.getFileSystem();
// test only valid for non-namespace enabled account
Assume.assumeFalse(abFs.getIsNamespaceEnabeld());
FileSystem fs = this.getWasbFileSystem();
if (fs.exists(TEST_FILE_PATH)) {

View File

@ -25,6 +25,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.junit.Assert;
import org.junit.Test;
import org.apache.hadoop.fs.FileStatus;
@ -133,4 +134,17 @@ public class ITestAzureBlobFileSystemRename extends
new Path(fs.getUri().toString() + "/s"),
false);
}
@Test
public void testPosixRenameDirectory() throws Exception {
final AzureBlobFileSystem fs = this.getFileSystem();
fs.mkdirs(new Path("testDir2/test1/test2/test3"));
fs.mkdirs(new Path("testDir2/test4"));
Assert.assertTrue(fs.rename(new Path("testDir2/test1/test2/test3"), new Path("testDir2/test4")));
assertTrue(fs.exists(new Path("testDir2")));
assertTrue(fs.exists(new Path("testDir2/test1/test2")));
assertTrue(fs.exists(new Path("testDir2/test4")));
assertTrue(fs.exists(new Path("testDir2/test4/test3")));
assertFalse(fs.exists(new Path("testDir2/test1/test2/test3")));
}
}

View File

@ -58,6 +58,9 @@ public class ITestWasbAbfsCompatibility extends AbstractAbfsIntegrationTest {
public void testListFileStatus() throws Exception {
// crate file using abfs
AzureBlobFileSystem fs = getFileSystem();
// test only valid for non-namespace enabled account
Assume.assumeFalse(fs.getIsNamespaceEnabeld());
NativeAzureFileSystem wasb = getWasbFileSystem();
Path path1 = new Path("/testfiles/~12/!008/3/abFsTestfile");
@ -89,6 +92,9 @@ public class ITestWasbAbfsCompatibility extends AbstractAbfsIntegrationTest {
boolean[] readFileWithAbfs = new boolean[]{false, true, true, false};
AzureBlobFileSystem abfs = getFileSystem();
// test only valid for non-namespace enabled account
Assume.assumeFalse(abfs.getIsNamespaceEnabeld());
NativeAzureFileSystem wasb = getWasbFileSystem();
for (int i = 0; i< 4; i++) {
@ -125,6 +131,9 @@ public class ITestWasbAbfsCompatibility extends AbstractAbfsIntegrationTest {
boolean[] readDirWithAbfs = new boolean[]{false, true, true, false};
AzureBlobFileSystem abfs = getFileSystem();
// test only valid for non-namespace enabled account
Assume.assumeFalse(abfs.getIsNamespaceEnabeld());
NativeAzureFileSystem wasb = getWasbFileSystem();
for (int i = 0; i < 4; i++) {
@ -156,6 +165,9 @@ public class ITestWasbAbfsCompatibility extends AbstractAbfsIntegrationTest {
public void testSetWorkingDirectory() throws Exception {
//create folders
AzureBlobFileSystem abfs = getFileSystem();
// test only valid for non-namespace enabled account
Assume.assumeFalse(abfs.getIsNamespaceEnabeld());
NativeAzureFileSystem wasb = getWasbFileSystem();
Path d1d4 = new Path("/d1/d2/d3/d4");

View File

@ -0,0 +1,119 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.utils;
import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclEntryScope;
import org.apache.hadoop.fs.permission.AclEntryType;
import org.apache.hadoop.fs.permission.FsAction;
import static org.junit.Assert.assertEquals;
/**
* Helper methods useful for writing ACL tests.
*/
public final class AclTestHelpers {
/**
* Create a new AclEntry with scope, type and permission (no name).
*
* @param scope AclEntryScope scope of the ACL entry
* @param type AclEntryType ACL entry type
* @param permission FsAction set of permissions in the ACL entry
* @return AclEntry new AclEntry
*/
public static AclEntry aclEntry(AclEntryScope scope, AclEntryType type,
FsAction permission) {
return new AclEntry.Builder()
.setScope(scope)
.setType(type)
.setPermission(permission)
.build();
}
/**
* Create a new AclEntry with scope, type, name and permission.
*
* @param scope AclEntryScope scope of the ACL entry
* @param type AclEntryType ACL entry type
* @param name String optional ACL entry name
* @param permission FsAction set of permissions in the ACL entry
* @return AclEntry new AclEntry
*/
public static AclEntry aclEntry(AclEntryScope scope, AclEntryType type,
String name, FsAction permission) {
return new AclEntry.Builder()
.setScope(scope)
.setType(type)
.setName(name)
.setPermission(permission)
.build();
}
/**
* Create a new AclEntry with scope, type and name (no permission).
*
* @param scope AclEntryScope scope of the ACL entry
* @param type AclEntryType ACL entry type
* @param name String optional ACL entry name
* @return AclEntry new AclEntry
*/
public static AclEntry aclEntry(AclEntryScope scope, AclEntryType type,
String name) {
return new AclEntry.Builder()
.setScope(scope)
.setType(type)
.setName(name)
.build();
}
/**
* Create a new AclEntry with scope and type (no name or permission).
*
* @param scope AclEntryScope scope of the ACL entry
* @param type AclEntryType ACL entry type
* @return AclEntry new AclEntry
*/
public static AclEntry aclEntry(AclEntryScope scope, AclEntryType type) {
return new AclEntry.Builder()
.setScope(scope)
.setType(type)
.build();
}
/**
* Asserts the value of the FsPermission bits on the inode of a specific path.
*
* @param fs FileSystem to use for check
* @param pathToCheck Path inode to check
* @param perm short expected permission bits
* @throws IOException thrown if there is an I/O error
*/
public static void assertPermission(FileSystem fs, Path pathToCheck,
short perm) throws IOException {
assertEquals(perm, fs.getFileStatus(pathToCheck).getPermission().toShort());
}
private AclTestHelpers() {
// Not called.
}
}

View File

@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.hadoop.fs.azurebfs.utils;
import org.junit.runners.Parameterized;
import org.junit.runners.model.RunnerScheduler;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* Provided for convenience to execute parametrized test cases concurrently.
*/
public class Parallelized extends Parameterized {
public Parallelized(Class classObj) throws Throwable {
super(classObj);
setScheduler(new ThreadPoolScheduler());
}
private static class ThreadPoolScheduler implements RunnerScheduler {
private ExecutorService executor;
ThreadPoolScheduler() {
int numThreads = 10;
executor = Executors.newFixedThreadPool(numThreads);
}
public void finished() {
executor.shutdown();
try {
executor.awaitTermination(10, TimeUnit.MINUTES);
} catch (InterruptedException exc) {
throw new RuntimeException(exc);
}
}
public void schedule(Runnable childStatement) {
executor.submit(childStatement);
}
}
}