diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java index f36cc7d5bfd..0a8224aaaeb 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java @@ -31,6 +31,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants; import org.apache.hadoop.fs.azurebfs.constants.AuthConfigurations; import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.IntegerConfigurationValidatorAnnotation; +import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.IntegerWithOutlierConfigurationValidatorAnnotation; import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.LongConfigurationValidatorAnnotation; import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.StringConfigurationValidatorAnnotation; import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.Base64StringConfigurationValidatorAnnotation; @@ -208,6 +209,15 @@ public class AbfsConfiguration{ DefaultValue = DEFAULT_FS_AZURE_APPEND_BLOB_DIRECTORIES) private String azureAppendBlobDirs; + @StringConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_INFINITE_LEASE_KEY, + DefaultValue = DEFAULT_FS_AZURE_INFINITE_LEASE_DIRECTORIES) + private String azureInfiniteLeaseDirs; + + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_LEASE_THREADS, + MinValue = MIN_LEASE_THREADS, + DefaultValue = DEFAULT_LEASE_THREADS) + private int numLeaseThreads; + @BooleanConfigurationValidatorAnnotation(ConfigurationKey = AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION, DefaultValue = DEFAULT_AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION) private boolean createRemoteFileSystemDuringInitialization; @@ -296,6 +306,8 @@ public class AbfsConfiguration{ field.setAccessible(true); if (field.isAnnotationPresent(IntegerConfigurationValidatorAnnotation.class)) { field.set(this, validateInt(field)); + } else if (field.isAnnotationPresent(IntegerWithOutlierConfigurationValidatorAnnotation.class)) { + field.set(this, validateIntWithOutlier(field)); } else if (field.isAnnotationPresent(LongConfigurationValidatorAnnotation.class)) { field.set(this, validateLong(field)); } else if (field.isAnnotationPresent(StringConfigurationValidatorAnnotation.class)) { @@ -634,6 +646,14 @@ public class AbfsConfiguration{ return this.azureAppendBlobDirs; } + public String getAzureInfiniteLeaseDirs() { + return this.azureInfiniteLeaseDirs; + } + + public int getNumLeaseThreads() { + return this.numLeaseThreads; + } + public boolean getCreateRemoteFileSystemDuringInitialization() { // we do not support creating the filesystem when AuthType is SAS return this.createRemoteFileSystemDuringInitialization @@ -843,6 +863,21 @@ public class AbfsConfiguration{ validator.ThrowIfInvalid()).validate(value); } + int validateIntWithOutlier(Field field) throws IllegalAccessException, InvalidConfigurationValueException { + IntegerWithOutlierConfigurationValidatorAnnotation validator = + field.getAnnotation(IntegerWithOutlierConfigurationValidatorAnnotation.class); + String value = get(validator.ConfigurationKey()); + + // validate + return new IntegerConfigurationBasicValidator( + validator.OutlierValue(), + validator.MinValue(), + validator.MaxValue(), + validator.DefaultValue(), + validator.ConfigurationKey(), + validator.ThrowIfInvalid()).validate(value); + } + long validateLong(Field field) throws IllegalAccessException, InvalidConfigurationValueException { LongConfigurationValidatorAnnotation validator = field.getAnnotation(LongConfigurationValidatorAnnotation.class); String value = rawConfig.get(validator.ConfigurationKey()); diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java index d8a2ed7bcd5..30108ed1e2f 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java @@ -87,6 +87,7 @@ import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.functional.RemoteIterators; +import org.apache.hadoop.util.DurationInfo; import org.apache.hadoop.util.LambdaUtils; import org.apache.hadoop.util.Progressable; @@ -505,6 +506,26 @@ public class AzureBlobFileSystem extends FileSystem { } } + /** + * Break the current lease on an ABFS file if it exists. A lease that is broken cannot be + * renewed. A new lease may be obtained on the file immediately. + * + * @param f file name + * @throws IOException on any exception while breaking the lease + */ + public void breakLease(final Path f) throws IOException { + LOG.debug("AzureBlobFileSystem.breakLease path: {}", f); + + Path qualifiedPath = makeQualified(f); + + try (DurationInfo ignored = new DurationInfo(LOG, false, "Break lease for %s", + qualifiedPath)) { + abfsStore.breakLease(qualifiedPath); + } catch(AzureBlobFileSystemException ex) { + checkException(f, ex); + } + } + /** * Qualify a path to one which uses this FileSystem and, if relative, * made absolute. diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index 75419c26dd2..fa7e12bc80e 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -39,6 +39,7 @@ import java.text.SimpleDateFormat; import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.HashSet; @@ -48,10 +49,14 @@ import java.util.Locale; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.WeakHashMap; +import java.util.concurrent.ExecutionException; import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions; import org.apache.hadoop.thirdparty.com.google.common.base.Strings; +import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.Futures; +import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListenableFuture; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -100,6 +105,7 @@ import org.apache.hadoop.fs.azurebfs.services.AbfsPermission; import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation; import org.apache.hadoop.fs.azurebfs.services.AuthType; import org.apache.hadoop.fs.azurebfs.services.ExponentialRetryPolicy; +import org.apache.hadoop.fs.azurebfs.services.AbfsLease; import org.apache.hadoop.fs.azurebfs.services.SharedKeyCredentials; import org.apache.hadoop.fs.azurebfs.services.AbfsPerfTracker; import org.apache.hadoop.fs.azurebfs.services.AbfsPerfInfo; @@ -145,8 +151,11 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport { private static final String XMS_PROPERTIES_ENCODING = "ISO-8859-1"; private static final int GET_SET_AGGREGATE_COUNT = 2; + private final Map leaseRefs; + private final AbfsConfiguration abfsConfiguration; private final Set azureAtomicRenameDirSet; + private Set azureInfiniteLeaseDirSet; private Trilean isNamespaceEnabled; private final AuthType authType; private final UserGroupInformation userGroupInformation; @@ -167,6 +176,8 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport { final String fileSystemName = authorityParts[0]; final String accountName = authorityParts[1]; + leaseRefs = Collections.synchronizedMap(new WeakHashMap<>()); + try { this.abfsConfiguration = new AbfsConfiguration(configuration, accountName); } catch (IllegalAccessException exception) { @@ -195,6 +206,7 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport { this.azureAtomicRenameDirSet = new HashSet<>(Arrays.asList( abfsConfiguration.getAzureAtomicRenameDirs().split(AbfsHttpConstants.COMMA))); + updateInfiniteLeaseDirs(); this.authType = abfsConfiguration.getAuthType(accountName); boolean usingOauth = (authType == AuthType.OAuth); boolean useHttps = (usingOauth || abfsConfiguration.isHttpsAlwaysUsed()) ? true : isSecureScheme; @@ -246,7 +258,24 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport { @Override public void close() throws IOException { - IOUtils.cleanupWithLogger(LOG, client); + List> futures = new ArrayList<>(); + for (AbfsLease lease : leaseRefs.keySet()) { + if (lease == null) { + continue; + } + ListenableFuture future = client.submit(() -> lease.free()); + futures.add(future); + } + try { + Futures.allAsList(futures).get(); + } catch (InterruptedException e) { + LOG.error("Interrupted freeing leases", e); + Thread.currentThread().interrupt(); + } catch (ExecutionException e) { + LOG.error("Error freeing leases", e); + } finally { + IOUtils.cleanupWithLogger(LOG, client); + } } byte[] encodeAttribute(String value) throws UnsupportedEncodingException { @@ -496,12 +525,14 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport { } perfInfo.registerResult(op.getResult()).registerSuccess(true); + AbfsLease lease = maybeCreateLease(relativePath); + return new AbfsOutputStream( client, statistics, relativePath, 0, - populateAbfsOutputStreamContext(isAppendBlob)); + populateAbfsOutputStreamContext(isAppendBlob, lease)); } } @@ -573,7 +604,8 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport { return op; } - private AbfsOutputStreamContext populateAbfsOutputStreamContext(boolean isAppendBlob) { + private AbfsOutputStreamContext populateAbfsOutputStreamContext(boolean isAppendBlob, + AbfsLease lease) { int bufferSize = abfsConfiguration.getWriteBufferSize(); if (isAppendBlob && bufferSize > FileSystemConfigurations.APPENDBLOB_MAX_WRITE_BUFFER_SIZE) { bufferSize = FileSystemConfigurations.APPENDBLOB_MAX_WRITE_BUFFER_SIZE; @@ -587,6 +619,7 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport { .withAppendBlob(isAppendBlob) .withWriteMaxConcurrentRequestCount(abfsConfiguration.getWriteMaxConcurrentRequestCount()) .withMaxWriteRequestsToQueue(abfsConfiguration.getMaxWriteRequestsToQueue()) + .withLease(lease) .build(); } @@ -705,15 +738,29 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport { isAppendBlob = true; } + AbfsLease lease = maybeCreateLease(relativePath); + return new AbfsOutputStream( client, statistics, relativePath, offset, - populateAbfsOutputStreamContext(isAppendBlob)); + populateAbfsOutputStreamContext(isAppendBlob, lease)); } } + /** + * Break any current lease on an ABFS file. + * + * @param path file name + * @throws AzureBlobFileSystemException on any exception while breaking the lease + */ + public void breakLease(final Path path) throws AzureBlobFileSystemException { + LOG.debug("lease path: {}", path); + + client.breakLease(getRelativePath(path)); + } + public void rename(final Path source, final Path destination) throws AzureBlobFileSystemException { final Instant startAggregate = abfsPerfTracker.getLatencyInstant(); @@ -1347,6 +1394,13 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport { return isKeyForDirectorySet(key, azureAtomicRenameDirSet); } + public boolean isInfiniteLeaseKey(String key) { + if (azureInfiniteLeaseDirSet.isEmpty()) { + return false; + } + return isKeyForDirectorySet(key, azureInfiniteLeaseDirSet); + } + /** * A on-off operation to initialize AbfsClient for AzureBlobFileSystem * Operations. @@ -1636,4 +1690,32 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport { this.isNamespaceEnabled = isNamespaceEnabled; } + private void updateInfiniteLeaseDirs() { + this.azureInfiniteLeaseDirSet = new HashSet<>(Arrays.asList( + abfsConfiguration.getAzureInfiniteLeaseDirs().split(AbfsHttpConstants.COMMA))); + // remove the empty string, since isKeyForDirectory returns true for empty strings + // and we don't want to default to enabling infinite lease dirs + this.azureInfiniteLeaseDirSet.remove(""); + } + + private AbfsLease maybeCreateLease(String relativePath) + throws AzureBlobFileSystemException { + boolean enableInfiniteLease = isInfiniteLeaseKey(relativePath); + if (!enableInfiniteLease) { + return null; + } + AbfsLease lease = new AbfsLease(client, relativePath); + leaseRefs.put(lease, null); + return lease; + } + + @VisibleForTesting + boolean areLeasesFreed() { + for (AbfsLease lease : leaseRefs.keySet()) { + if (lease != null && !lease.isFreed()) { + return false; + } + } + return true; + } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java index 184657e7d66..5cf7ec565b5 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java @@ -39,6 +39,11 @@ public final class AbfsHttpConstants { public static final String GET_ACCESS_CONTROL = "getAccessControl"; public static final String CHECK_ACCESS = "checkAccess"; public static final String GET_STATUS = "getStatus"; + public static final String ACQUIRE_LEASE_ACTION = "acquire"; + public static final String BREAK_LEASE_ACTION = "break"; + public static final String RELEASE_LEASE_ACTION = "release"; + public static final String RENEW_LEASE_ACTION = "renew"; + public static final String DEFAULT_LEASE_BREAK_PERIOD = "0"; public static final String DEFAULT_TIMEOUT = "90"; public static final String APPEND_BLOB_TYPE = "appendblob"; public static final String TOKEN_VERSION = "2"; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java index 02b143cd61b..4fe1d1c276d 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java @@ -87,6 +87,15 @@ public final class ConfigurationKeys { /** Provides a config to provide comma separated path prefixes on which Appendblob based files are created * Default is empty. **/ public static final String FS_AZURE_APPEND_BLOB_KEY = "fs.azure.appendblob.directories"; + /** Provides a config to provide comma separated path prefixes which support infinite leases. + * Files under these paths will be leased when created or opened for writing and the lease will + * be released when the file is closed. The lease may be broken with the breakLease method on + * AzureBlobFileSystem. Default is empty. + * **/ + public static final String FS_AZURE_INFINITE_LEASE_KEY = "fs.azure.infinite-lease.directories"; + /** Provides a number of threads to use for lease operations for infinite lease directories. + * Must be set to a minimum of 1 if infinite lease directories are to be used. Default is 0. **/ + public static final String FS_AZURE_LEASE_THREADS = "fs.azure.lease.threads"; public static final String FS_AZURE_READ_AHEAD_QUEUE_DEPTH = "fs.azure.readaheadqueue.depth"; public static final String FS_AZURE_ALWAYS_READ_BUFFER_SIZE = "fs.azure.read.alwaysReadBufferSize"; public static final String FS_AZURE_READ_AHEAD_BLOCK_SIZE = "fs.azure.read.readahead.blocksize"; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java index d90f525712a..040b18ae4c2 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java @@ -78,6 +78,13 @@ public final class FileSystemConfigurations { public static final boolean DEFAULT_FS_AZURE_ENABLE_CONDITIONAL_CREATE_OVERWRITE = true; public static final boolean DEFAULT_FS_AZURE_ENABLE_MKDIR_OVERWRITE = true; public static final String DEFAULT_FS_AZURE_APPEND_BLOB_DIRECTORIES = ""; + public static final String DEFAULT_FS_AZURE_INFINITE_LEASE_DIRECTORIES = ""; + public static final int DEFAULT_LEASE_THREADS = 0; + public static final int MIN_LEASE_THREADS = 0; + public static final int DEFAULT_LEASE_DURATION = -1; + public static final int INFINITE_LEASE_DURATION = -1; + public static final int MIN_LEASE_DURATION = 15; + public static final int MAX_LEASE_DURATION = 60; public static final int DEFAULT_READ_AHEAD_QUEUE_DEPTH = -1; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java index 27ddcee695a..232553844fc 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java @@ -60,6 +60,11 @@ public final class HttpHeaderConfigurations { public static final String X_MS_UMASK = "x-ms-umask"; public static final String X_MS_NAMESPACE_ENABLED = "x-ms-namespace-enabled"; public static final String X_MS_ABFS_CLIENT_LATENCY = "x-ms-abfs-client-latency"; + public static final String X_MS_LEASE_ACTION = "x-ms-lease-action"; + public static final String X_MS_LEASE_DURATION = "x-ms-lease-duration"; + public static final String X_MS_LEASE_ID = "x-ms-lease-id"; + public static final String X_MS_PROPOSED_LEASE_ID = "x-ms-proposed-lease-id"; + public static final String X_MS_LEASE_BREAK_PERIOD = "x-ms-lease-break-period"; private HttpHeaderConfigurations() {} } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/annotations/ConfigurationValidationAnnotations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/annotations/ConfigurationValidationAnnotations.java index 82c571a3b03..9fbe5a22cdf 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/annotations/ConfigurationValidationAnnotations.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/annotations/ConfigurationValidationAnnotations.java @@ -46,6 +46,22 @@ public class ConfigurationValidationAnnotations { boolean ThrowIfInvalid() default false; } + @Target({ ElementType.FIELD }) + @Retention(RetentionPolicy.RUNTIME) + public @interface IntegerWithOutlierConfigurationValidatorAnnotation { + String ConfigurationKey(); + + int MaxValue() default Integer.MAX_VALUE; + + int MinValue() default Integer.MIN_VALUE; + + int OutlierValue() default Integer.MIN_VALUE; + + int DefaultValue(); + + boolean ThrowIfInvalid() default false; + } + /** * Describes the requirements when validating the annotated long field. */ diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/AzureBlobFileSystemException.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/AzureBlobFileSystemException.java index 9b1bead886e..d829c5ac677 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/AzureBlobFileSystemException.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/AzureBlobFileSystemException.java @@ -37,6 +37,10 @@ public abstract class AzureBlobFileSystemException extends IOException { super(message, innerException); } + public AzureBlobFileSystemException(final String message, final Throwable innerThrowable) { + super(message, innerThrowable); + } + @Override public String toString() { if (this.getMessage() == null && this.getCause() == null) { diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AppendRequestParameters.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AppendRequestParameters.java index fb4d29f8794..7369bfaf564 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AppendRequestParameters.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AppendRequestParameters.java @@ -33,17 +33,20 @@ public class AppendRequestParameters { private final int length; private final Mode mode; private final boolean isAppendBlob; + private final String leaseId; public AppendRequestParameters(final long position, final int offset, final int length, final Mode mode, - final boolean isAppendBlob) { + final boolean isAppendBlob, + final String leaseId) { this.position = position; this.offset = offset; this.length = length; this.mode = mode; this.isAppendBlob = isAppendBlob; + this.leaseId = leaseId; } public long getPosition() { @@ -66,4 +69,7 @@ public class AppendRequestParameters { return this.isAppendBlob; } + public String getLeaseId() { + return this.leaseId; + } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/diagnostics/IntegerConfigurationBasicValidator.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/diagnostics/IntegerConfigurationBasicValidator.java index 26c7d2f0ac1..9d4beb74bbe 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/diagnostics/IntegerConfigurationBasicValidator.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/diagnostics/IntegerConfigurationBasicValidator.java @@ -31,11 +31,18 @@ import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidConfigurationVa public class IntegerConfigurationBasicValidator extends ConfigurationBasicValidator implements ConfigurationValidator { private final int min; private final int max; + private final int outlier; public IntegerConfigurationBasicValidator(final int min, final int max, final int defaultVal, final String configKey, final boolean throwIfInvalid) { + this(min, min, max, defaultVal, configKey, throwIfInvalid); + } + + public IntegerConfigurationBasicValidator(final int outlier, final int min, final int max, + final int defaultVal, final String configKey, final boolean throwIfInvalid) { super(configKey, defaultVal, throwIfInvalid); this.min = min; this.max = max; + this.outlier = outlier; } public Integer validate(final String configValue) throws InvalidConfigurationValueException { @@ -47,10 +54,14 @@ public class IntegerConfigurationBasicValidator extends ConfigurationBasicValida try { result = Integer.parseInt(configValue); // throw an exception if a 'within bounds' value is missing - if (getThrowIfInvalid() && (result < this.min || result > this.max)) { + if (getThrowIfInvalid() && (result != outlier) && (result < this.min || result > this.max)) { throw new InvalidConfigurationValueException(getConfigKey()); } + if (result == outlier) { + return result; + } + // set the value to the nearest bound if it's out of bounds if (result < this.min) { return this.min; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java index 92b24f0dda2..7c8a2112bfa 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java @@ -29,16 +29,27 @@ import java.time.Instant; import java.util.ArrayList; import java.util.List; import java.util.Locale; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.thirdparty.com.google.common.base.Strings; -import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory; -import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants; -import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations; -import org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams; +import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.FutureCallback; +import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.Futures; +import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListenableFuture; +import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListenableScheduledFuture; +import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningScheduledExecutorService; +import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors; +import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants; +import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations; +import org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.SASTokenProviderException; @@ -49,6 +60,8 @@ import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters; import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider; import org.apache.hadoop.fs.azurebfs.utils.DateTimeUtils; import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory; +import org.apache.hadoop.util.concurrent.HadoopExecutors; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.*; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_DELETE_CONSIDERED_IDEMPOTENT; @@ -76,6 +89,8 @@ public class AbfsClient implements Closeable { private SASTokenProvider sasTokenProvider; private final AbfsCounters abfsCounters; + private final ListeningScheduledExecutorService executorService; + private AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials, final AbfsConfiguration abfsConfiguration, final AbfsClientContext abfsClientContext) { @@ -106,6 +121,11 @@ public class AbfsClient implements Closeable { this.userAgent = initializeUserAgent(abfsConfiguration, sslProviderName); this.abfsPerfTracker = abfsClientContext.getAbfsPerfTracker(); this.abfsCounters = abfsClientContext.getAbfsCounters(); + + ThreadFactory tf = + new ThreadFactoryBuilder().setNameFormat("AbfsClient Lease Ops").setDaemon(true).build(); + this.executorService = MoreExecutors.listeningDecorator( + HadoopExecutors.newScheduledThreadPool(this.abfsConfiguration.getNumLeaseThreads(), tf)); } public AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials, @@ -129,6 +149,7 @@ public class AbfsClient implements Closeable { if (tokenProvider instanceof Closeable) { IOUtils.cleanupWithLogger(LOG, (Closeable) tokenProvider); } + HadoopExecutors.shutdown(executorService, LOG, 0, TimeUnit.SECONDS); } public String getFileSystem() { @@ -317,6 +338,83 @@ public class AbfsClient implements Closeable { return op; } + public AbfsRestOperation acquireLease(final String path, int duration) throws AzureBlobFileSystemException { + final List requestHeaders = createDefaultHeaders(); + + requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ACTION, ACQUIRE_LEASE_ACTION)); + requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_DURATION, Integer.toString(duration))); + requestHeaders.add(new AbfsHttpHeader(X_MS_PROPOSED_LEASE_ID, UUID.randomUUID().toString())); + + final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); + + final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString()); + final AbfsRestOperation op = new AbfsRestOperation( + AbfsRestOperationType.LeasePath, + this, + HTTP_METHOD_POST, + url, + requestHeaders); + op.execute(); + return op; + } + + public AbfsRestOperation renewLease(final String path, final String leaseId) throws AzureBlobFileSystemException { + final List requestHeaders = createDefaultHeaders(); + + requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ACTION, RENEW_LEASE_ACTION)); + requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ID, leaseId)); + + final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); + + final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString()); + final AbfsRestOperation op = new AbfsRestOperation( + AbfsRestOperationType.LeasePath, + this, + HTTP_METHOD_POST, + url, + requestHeaders); + op.execute(); + return op; + } + + public AbfsRestOperation releaseLease(final String path, final String leaseId) throws AzureBlobFileSystemException { + final List requestHeaders = createDefaultHeaders(); + + requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ACTION, RELEASE_LEASE_ACTION)); + requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ID, leaseId)); + + final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); + + final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString()); + final AbfsRestOperation op = new AbfsRestOperation( + AbfsRestOperationType.LeasePath, + this, + HTTP_METHOD_POST, + url, + requestHeaders); + op.execute(); + return op; + } + + public AbfsRestOperation breakLease(final String path) throws AzureBlobFileSystemException { + final List requestHeaders = createDefaultHeaders(); + + requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ACTION, BREAK_LEASE_ACTION)); + requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_BREAK_PERIOD, DEFAULT_LEASE_BREAK_PERIOD)); + + final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); + + final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString()); + final AbfsRestOperation op = new AbfsRestOperation( + AbfsRestOperationType.LeasePath, + this, + HTTP_METHOD_POST, + url, + requestHeaders); + op.execute(); + return op; + } + public AbfsRestOperation renamePath(String source, final String destination, final String continuation) throws AzureBlobFileSystemException { final List requestHeaders = createDefaultHeaders(); @@ -416,6 +514,9 @@ public class AbfsClient implements Closeable { // PUT and specify the real method in the X-Http-Method-Override header. requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE, HTTP_METHOD_PATCH)); + if (reqParams.getLeaseId() != null) { + requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ID, reqParams.getLeaseId())); + } final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, APPEND_ACTION); @@ -492,13 +593,16 @@ public class AbfsClient implements Closeable { } public AbfsRestOperation flush(final String path, final long position, boolean retainUncommittedData, - boolean isClose, final String cachedSasToken) + boolean isClose, final String cachedSasToken, final String leaseId) throws AzureBlobFileSystemException { final List requestHeaders = createDefaultHeaders(); // JDK7 does not support PATCH, so to workaround the issue we will use // PUT and specify the real method in the X-Http-Method-Override header. requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE, HTTP_METHOD_PATCH)); + if (leaseId != null) { + requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ID, leaseId)); + } final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, FLUSH_ACTION); @@ -1003,4 +1107,21 @@ public class AbfsClient implements Closeable { protected AbfsCounters getAbfsCounters() { return abfsCounters; } + + public int getNumLeaseThreads() { + return abfsConfiguration.getNumLeaseThreads(); + } + + public ListenableScheduledFuture schedule(Callable callable, long delay, + TimeUnit timeUnit) { + return executorService.schedule(callable, delay, timeUnit); + } + + public ListenableFuture submit(Runnable runnable) { + return executorService.submit(runnable); + } + + public void addCallback(ListenableFuture future, FutureCallback callback) { + Futures.addCallback(future, callback, executorService); + } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsErrors.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsErrors.java new file mode 100644 index 00000000000..e15795efee6 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsErrors.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs.azurebfs.services; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_LEASE_THREADS; + +/** + * ABFS error constants. + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public final class AbfsErrors { + public static final String ERR_WRITE_WITHOUT_LEASE = "Attempted to write to file without lease"; + public static final String ERR_LEASE_EXPIRED = "A lease ID was specified, but the lease for the" + + " resource has expired"; + public static final String ERR_NO_LEASE_ID_SPECIFIED = "There is currently a lease on the " + + "resource and no lease ID was specified in the request"; + public static final String ERR_PARALLEL_ACCESS_DETECTED = "Parallel access to the create path " + + "detected. Failing request to honor single writer semantics"; + public static final String ERR_ACQUIRING_LEASE = "Unable to acquire lease"; + public static final String ERR_LEASE_ALREADY_PRESENT = "There is already a lease present"; + public static final String ERR_LEASE_NOT_PRESENT = "There is currently no lease on the resource"; + public static final String ERR_LEASE_ID_NOT_PRESENT = "The lease ID is not present with the " + + "specified lease operation"; + public static final String ERR_LEASE_DID_NOT_MATCH = "The lease ID specified did not match the " + + "lease ID for the resource with the specified lease operation"; + public static final String ERR_LEASE_BROKEN = "The lease ID matched, but the lease has been " + + "broken explicitly and cannot be renewed"; + public static final String ERR_LEASE_FUTURE_EXISTS = "There is already an existing lease " + + "operation"; + public static final String ERR_NO_LEASE_THREADS = "Lease desired but no lease threads " + + "configured, set " + FS_AZURE_LEASE_THREADS; + + private AbfsErrors() {} +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsLease.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsLease.java new file mode 100644 index 00000000000..97a8b0228a5 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsLease.java @@ -0,0 +1,188 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.services; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.FutureCallback; +import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListenableScheduledFuture; +import org.apache.hadoop.thirdparty.org.checkerframework.checker.nullness.qual.Nullable; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; +import org.apache.hadoop.io.retry.RetryPolicies; +import org.apache.hadoop.io.retry.RetryPolicy; + +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.INFINITE_LEASE_DURATION; +import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_ACQUIRING_LEASE; +import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_LEASE_FUTURE_EXISTS; +import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_NO_LEASE_THREADS; + +/** + * AbfsLease manages an Azure blob lease. It acquires an infinite lease on instantiation and + * releases the lease when free() is called. Use it to prevent writes to the blob by other + * processes that don't have the lease. + * + * Creating a new Lease object blocks the caller until the Azure blob lease is acquired. It will + * retry a fixed number of times before failing if there is a problem acquiring the lease. + * + * Call free() to release the Lease. If the holder process dies, AzureBlobFileSystem breakLease + * will need to be called before another client will be able to write to the file. + */ +public final class AbfsLease { + private static final Logger LOG = LoggerFactory.getLogger(AbfsLease.class); + + // Number of retries for acquiring lease + static final int DEFAULT_LEASE_ACQUIRE_MAX_RETRIES = 7; + // Retry interval for acquiring lease in secs + static final int DEFAULT_LEASE_ACQUIRE_RETRY_INTERVAL = 10; + + private final AbfsClient client; + private final String path; + + // Lease status variables + private volatile boolean leaseFreed; + private volatile String leaseID = null; + private volatile Throwable exception = null; + private volatile int acquireRetryCount = 0; + private volatile ListenableScheduledFuture future = null; + + public static class LeaseException extends AzureBlobFileSystemException { + public LeaseException(Throwable t) { + super(ERR_ACQUIRING_LEASE + ": " + t, t); + } + + public LeaseException(String s) { + super(s); + } + } + + public AbfsLease(AbfsClient client, String path) throws AzureBlobFileSystemException { + this(client, path, DEFAULT_LEASE_ACQUIRE_MAX_RETRIES, DEFAULT_LEASE_ACQUIRE_RETRY_INTERVAL); + } + + @VisibleForTesting + public AbfsLease(AbfsClient client, String path, int acquireMaxRetries, + int acquireRetryInterval) throws AzureBlobFileSystemException { + this.leaseFreed = false; + this.client = client; + this.path = path; + + if (client.getNumLeaseThreads() < 1) { + throw new LeaseException(ERR_NO_LEASE_THREADS); + } + + // Try to get the lease a specified number of times, else throw an error + RetryPolicy retryPolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep( + acquireMaxRetries, acquireRetryInterval, TimeUnit.SECONDS); + acquireLease(retryPolicy, 0, acquireRetryInterval, 0); + + while (leaseID == null && exception == null) { + try { + future.get(); + } catch (Exception e) { + LOG.debug("Got exception waiting for acquire lease future. Checking if lease ID or " + + "exception have been set", e); + } + } + if (exception != null) { + LOG.error("Failed to acquire lease on {}", path); + throw new LeaseException(exception); + } + + LOG.debug("Acquired lease {} on {}", leaseID, path); + } + + private void acquireLease(RetryPolicy retryPolicy, int numRetries, int retryInterval, long delay) + throws LeaseException { + LOG.debug("Attempting to acquire lease on {}, retry {}", path, numRetries); + if (future != null && !future.isDone()) { + throw new LeaseException(ERR_LEASE_FUTURE_EXISTS); + } + future = client.schedule(() -> client.acquireLease(path, INFINITE_LEASE_DURATION), + delay, TimeUnit.SECONDS); + client.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(@Nullable AbfsRestOperation op) { + leaseID = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_LEASE_ID); + LOG.debug("Acquired lease {} on {}", leaseID, path); + } + + @Override + public void onFailure(Throwable throwable) { + try { + if (RetryPolicy.RetryAction.RetryDecision.RETRY + == retryPolicy.shouldRetry(null, numRetries, 0, true).action) { + LOG.debug("Failed to acquire lease on {}, retrying: {}", path, throwable); + acquireRetryCount++; + acquireLease(retryPolicy, numRetries + 1, retryInterval, retryInterval); + } else { + exception = throwable; + } + } catch (Exception e) { + exception = throwable; + } + } + }); + } + + /** + * Cancel future and free the lease. If an exception occurs while releasing the lease, the error + * will be logged. If the lease cannot be released, AzureBlobFileSystem breakLease will need to + * be called before another client will be able to write to the file. + */ + public void free() { + if (leaseFreed) { + return; + } + try { + LOG.debug("Freeing lease: path {}, lease id {}", path, leaseID); + if (future != null && !future.isDone()) { + future.cancel(true); + } + client.releaseLease(path, leaseID); + } catch (IOException e) { + LOG.warn("Exception when trying to release lease {} on {}. Lease will need to be broken: {}", + leaseID, path, e.getMessage()); + } finally { + // Even if releasing the lease fails (e.g. because the file was deleted), + // make sure to record that we freed the lease + leaseFreed = true; + LOG.debug("Freed lease {} on {}", leaseID, path); + } + } + + public boolean isFreed() { + return leaseFreed; + } + + public String getLeaseID() { + return leaseID; + } + + @VisibleForTesting + public int getAcquireRetryCount() { + return acquireRetryCount; + } +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java index 2d02019ab11..80b35ee4d3a 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java @@ -37,6 +37,7 @@ import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.fs.PathIOException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters; @@ -53,6 +54,7 @@ import org.apache.hadoop.fs.FSExceptionMessages; import org.apache.hadoop.fs.StreamCapabilities; import org.apache.hadoop.fs.Syncable; +import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_WRITE_WITHOUT_LEASE; import static org.apache.hadoop.fs.impl.StoreImplementationUtils.isProbeForSyncable; import static org.apache.hadoop.io.IOUtils.wrapException; import static org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters.Mode.APPEND_MODE; @@ -92,6 +94,9 @@ public class AbfsOutputStream extends OutputStream implements Syncable, // SAS tokens can be re-used until they expire private CachedSASToken cachedSasToken; + private AbfsLease lease; + private String leaseId; + /** * Queue storing buffers with the size of the Azure block ready for * reuse. The pool allows reusing the blocks instead of allocating new @@ -142,6 +147,10 @@ public class AbfsOutputStream extends OutputStream implements Syncable, } this.maxRequestsThatCanBeQueued = abfsOutputStreamContext .getMaxWriteRequestsToQueue(); + + this.lease = abfsOutputStreamContext.getLease(); + this.leaseId = abfsOutputStreamContext.getLeaseId(); + this.threadExecutor = new ThreadPoolExecutor(maxConcurrentRequestCount, maxConcurrentRequestCount, @@ -203,6 +212,10 @@ public class AbfsOutputStream extends OutputStream implements Syncable, throw new IndexOutOfBoundsException(); } + if (hasLease() && isLeaseFreed()) { + throw new PathIOException(path, ERR_WRITE_WITHOUT_LEASE); + } + int currentOffset = off; int writableBytes = bufferSize - bufferIndex; int numberOfBytesToWrite = length; @@ -306,6 +319,10 @@ public class AbfsOutputStream extends OutputStream implements Syncable, // See HADOOP-16785 throw wrapException(path, e.getMessage(), e); } finally { + if (hasLease()) { + lease.free(); + lease = null; + } lastError = new IOException(FSExceptionMessages.STREAM_IS_CLOSED); buffer = null; bufferIndex = 0; @@ -372,7 +389,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker, "writeCurrentBufferToService", "append")) { AppendRequestParameters reqParams = new AppendRequestParameters(offset, 0, - bytesLength, APPEND_MODE, true); + bytesLength, APPEND_MODE, true, leaseId); AbfsRestOperation op = client.append(path, bytes, reqParams, cachedSasToken.get()); cachedSasToken.update(op.getSasToken()); if (outputStreamStatistics != null) { @@ -448,7 +465,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, mode = FLUSH_MODE; } AppendRequestParameters reqParams = new AppendRequestParameters( - offset, 0, bytesLength, mode, false); + offset, 0, bytesLength, mode, false, leaseId); AbfsRestOperation op = client.append(path, bytes, reqParams, cachedSasToken.get()); cachedSasToken.update(op.getSasToken()); @@ -517,7 +534,8 @@ public class AbfsOutputStream extends OutputStream implements Syncable, AbfsPerfTracker tracker = client.getAbfsPerfTracker(); try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker, "flushWrittenBytesToServiceInternal", "flush")) { - AbfsRestOperation op = client.flush(path, offset, retainUncommitedData, isClose, cachedSasToken.get()); + AbfsRestOperation op = client.flush(path, offset, retainUncommitedData, isClose, + cachedSasToken.get(), leaseId); cachedSasToken.update(op.getSasToken()); perfInfo.registerResult(op.getResult()).registerSuccess(true); } catch (AzureBlobFileSystemException ex) { @@ -637,6 +655,19 @@ public class AbfsOutputStream extends OutputStream implements Syncable, return ioStatistics; } + @VisibleForTesting + public boolean isLeaseFreed() { + if (lease == null) { + return true; + } + return lease.isFreed(); + } + + @VisibleForTesting + public boolean hasLease() { + return lease != null; + } + /** * Appending AbfsOutputStream statistics to base toString(). * diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java index 925cd4f7b56..48f6f540810 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java @@ -39,6 +39,8 @@ public class AbfsOutputStreamContext extends AbfsStreamContext { private int maxWriteRequestsToQueue; + private AbfsLease lease; + public AbfsOutputStreamContext(final long sasTokenRenewPeriodForStreamsInSeconds) { super(sasTokenRenewPeriodForStreamsInSeconds); } @@ -94,6 +96,11 @@ public class AbfsOutputStreamContext extends AbfsStreamContext { return this; } + public AbfsOutputStreamContext withLease(final AbfsLease lease) { + this.lease = lease; + return this; + } + public int getWriteBufferSize() { return writeBufferSize; } @@ -125,4 +132,15 @@ public class AbfsOutputStreamContext extends AbfsStreamContext { public boolean isEnableSmallWriteOptimization() { return this.enableSmallWriteOptimization; } + + public AbfsLease getLease() { + return this.lease; + } + + public String getLeaseId() { + if (this.lease == null) { + return null; + } + return this.lease.getLeaseID(); + } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java index 584b71f1ee5..b046cbc03a3 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java @@ -131,6 +131,7 @@ public class AbfsRestOperation { this.url = url; this.requestHeaders = requestHeaders; this.hasRequestBody = (AbfsHttpConstants.HTTP_METHOD_PUT.equals(method) + || AbfsHttpConstants.HTTP_METHOD_POST.equals(method) || AbfsHttpConstants.HTTP_METHOD_PATCH.equals(method)); this.sasToken = sasToken; this.abfsCounters = client.getAbfsCounters(); diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperationType.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperationType.java index d3031860dd1..830297f381b 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperationType.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperationType.java @@ -40,5 +40,6 @@ public enum AbfsRestOperationType { Flush, ReadFile, DeletePath, - CheckAccess + CheckAccess, + LeasePath, } diff --git a/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md b/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md index 33d4a0fa428..6be5952b03a 100644 --- a/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md +++ b/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md @@ -887,6 +887,22 @@ enabled for your Azure Storage account." The directories can be specified as comma separated values. By default the value is "/hbase" +### Infinite Lease Options +`fs.azure.infinite-lease.directories`: Directories for infinite lease support +can be specified comma separated in this config. By default, multiple +clients will be able to write to the same file simultaneously. When writing +to files contained within the directories specified in this config, the +client will obtain a lease on the file that will prevent any other clients +from writing to the file. When the output stream is closed, the lease will be +released. To revoke a client's write access for a file, the +AzureBlobFilesystem breakLease method may be called. If the client dies +before the file can be closed and the lease released, breakLease will need to +be called before another client will be able to write to the file. + +`fs.azure.lease.threads`: This is the size of the thread pool that will be +used for lease operations for infinite lease directories. By default the value +is 0, so it must be set to at least 1 to support infinite lease directories. + ### Perf Options #### 1. HTTP Request Tracking Options diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemLease.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemLease.java new file mode 100644 index 00000000000..9857da8957e --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemLease.java @@ -0,0 +1,336 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs.azurebfs; + +import java.io.IOException; +import java.util.concurrent.RejectedExecutionException; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; +import org.apache.hadoop.fs.azurebfs.services.AbfsClient; +import org.apache.hadoop.fs.azurebfs.services.AbfsLease; +import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.test.LambdaTestUtils; + +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.spy; + +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_INFINITE_LEASE_KEY; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_LEASE_THREADS; +import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_TEST_NAMESPACE_ENABLED_ACCOUNT; +import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_ACQUIRING_LEASE; +import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_LEASE_EXPIRED; +import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_LEASE_NOT_PRESENT; +import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_NO_LEASE_ID_SPECIFIED; +import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_NO_LEASE_THREADS; +import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_PARALLEL_ACCESS_DETECTED; + +/** + * Test lease operations. + */ +public class ITestAzureBlobFileSystemLease extends AbstractAbfsIntegrationTest { + private static final int TEST_EXECUTION_TIMEOUT = 30 * 1000; + private static final int LONG_TEST_EXECUTION_TIMEOUT = 90 * 1000; + private static final String TEST_FILE = "testfile"; + private final boolean isHNSEnabled; + + public ITestAzureBlobFileSystemLease() throws Exception { + super(); + + this.isHNSEnabled = getConfiguration() + .getBoolean(FS_AZURE_TEST_NAMESPACE_ENABLED_ACCOUNT, false); + } + + private AzureBlobFileSystem getCustomFileSystem(Path infiniteLeaseDirs, int numLeaseThreads) throws Exception { + Configuration conf = getRawConfiguration(); + conf.setBoolean(String.format("fs.%s.impl.disable.cache", getAbfsScheme()), true); + conf.set(FS_AZURE_INFINITE_LEASE_KEY, infiniteLeaseDirs.toUri().getPath()); + conf.setInt(FS_AZURE_LEASE_THREADS, numLeaseThreads); + return getFileSystem(conf); + } + + @Test(timeout = TEST_EXECUTION_TIMEOUT) + public void testNoInfiniteLease() throws IOException { + final Path testFilePath = new Path(path(methodName.getMethodName()), TEST_FILE); + final AzureBlobFileSystem fs = getFileSystem(); + fs.mkdirs(testFilePath.getParent()); + try (FSDataOutputStream out = fs.create(testFilePath)) { + Assert.assertFalse("Output stream should not have lease", + ((AbfsOutputStream) out.getWrappedStream()).hasLease()); + } + Assert.assertTrue("Store leases were not freed", fs.getAbfsStore().areLeasesFreed()); + } + + @Test(timeout = TEST_EXECUTION_TIMEOUT) + public void testNoLeaseThreads() throws Exception { + final Path testFilePath = new Path(path(methodName.getMethodName()), TEST_FILE); + final AzureBlobFileSystem fs = getCustomFileSystem(testFilePath.getParent(), 0); + fs.mkdirs(testFilePath.getParent()); + LambdaTestUtils.intercept(IOException.class, ERR_NO_LEASE_THREADS, () -> { + try (FSDataOutputStream out = fs.create(testFilePath)) { + } + return "No failure when lease requested with 0 lease threads"; + }); + } + + @Test(timeout = TEST_EXECUTION_TIMEOUT) + public void testOneWriter() throws Exception { + final Path testFilePath = new Path(path(methodName.getMethodName()), TEST_FILE); + final AzureBlobFileSystem fs = getCustomFileSystem(testFilePath.getParent(), 1); + fs.mkdirs(testFilePath.getParent()); + + FSDataOutputStream out = fs.create(testFilePath); + Assert.assertTrue("Output stream should have lease", + ((AbfsOutputStream) out.getWrappedStream()).hasLease()); + out.close(); + Assert.assertFalse("Output stream should not have lease", + ((AbfsOutputStream) out.getWrappedStream()).hasLease()); + Assert.assertTrue("Store leases were not freed", fs.getAbfsStore().areLeasesFreed()); + } + + @Test(timeout = TEST_EXECUTION_TIMEOUT) + public void testSubDir() throws Exception { + final Path testFilePath = new Path(new Path(path(methodName.getMethodName()), "subdir"), + TEST_FILE); + final AzureBlobFileSystem fs = + getCustomFileSystem(testFilePath.getParent().getParent(), 1); + fs.mkdirs(testFilePath.getParent().getParent()); + + FSDataOutputStream out = fs.create(testFilePath); + Assert.assertTrue("Output stream should have lease", + ((AbfsOutputStream) out.getWrappedStream()).hasLease()); + out.close(); + Assert.assertFalse("Output stream should not have lease", + ((AbfsOutputStream) out.getWrappedStream()).hasLease()); + Assert.assertTrue("Store leases were not freed", fs.getAbfsStore().areLeasesFreed()); + } + + @Test(timeout = TEST_EXECUTION_TIMEOUT) + public void testTwoCreate() throws Exception { + final Path testFilePath = new Path(path(methodName.getMethodName()), TEST_FILE); + final AzureBlobFileSystem fs = getCustomFileSystem(testFilePath.getParent(), 1); + fs.mkdirs(testFilePath.getParent()); + + try (FSDataOutputStream out = fs.create(testFilePath)) { + LambdaTestUtils.intercept(IOException.class, isHNSEnabled ? ERR_PARALLEL_ACCESS_DETECTED + : ERR_NO_LEASE_ID_SPECIFIED, () -> { + try (FSDataOutputStream out2 = fs.create(testFilePath)) { + } + return "Expected second create on infinite lease dir to fail"; + }); + } + Assert.assertTrue("Store leases were not freed", fs.getAbfsStore().areLeasesFreed()); + } + + private void twoWriters(AzureBlobFileSystem fs, Path testFilePath, boolean expectException) throws Exception { + try (FSDataOutputStream out = fs.create(testFilePath)) { + try (FSDataOutputStream out2 = fs.append(testFilePath)) { + out2.writeInt(2); + out2.hsync(); + } catch (IOException e) { + if (expectException) { + GenericTestUtils.assertExceptionContains(ERR_ACQUIRING_LEASE, e); + } else { + throw e; + } + } + out.writeInt(1); + out.hsync(); + } + + Assert.assertTrue("Store leases were not freed", fs.getAbfsStore().areLeasesFreed()); + } + + @Test(timeout = TEST_EXECUTION_TIMEOUT) + public void testTwoWritersCreateAppendNoInfiniteLease() throws Exception { + final Path testFilePath = new Path(path(methodName.getMethodName()), TEST_FILE); + final AzureBlobFileSystem fs = getFileSystem(); + fs.mkdirs(testFilePath.getParent()); + + twoWriters(fs, testFilePath, false); + } + + @Test(timeout = LONG_TEST_EXECUTION_TIMEOUT) + public void testTwoWritersCreateAppendWithInfiniteLeaseEnabled() throws Exception { + final Path testFilePath = new Path(path(methodName.getMethodName()), TEST_FILE); + final AzureBlobFileSystem fs = getCustomFileSystem(testFilePath.getParent(), 1); + fs.mkdirs(testFilePath.getParent()); + + twoWriters(fs, testFilePath, true); + } + + @Test(timeout = TEST_EXECUTION_TIMEOUT) + public void testLeaseFreedOnClose() throws Exception { + final Path testFilePath = new Path(path(methodName.getMethodName()), TEST_FILE); + final AzureBlobFileSystem fs = getCustomFileSystem(testFilePath.getParent(), 1); + fs.mkdirs(testFilePath.getParent()); + + FSDataOutputStream out; + out = fs.create(testFilePath); + out.write(0); + Assert.assertTrue("Output stream should have lease", + ((AbfsOutputStream) out.getWrappedStream()).hasLease()); + out.close(); + Assert.assertFalse("Output stream should not have lease after close", + ((AbfsOutputStream) out.getWrappedStream()).hasLease()); + Assert.assertTrue("Store leases were not freed", fs.getAbfsStore().areLeasesFreed()); + } + + @Test(timeout = TEST_EXECUTION_TIMEOUT) + public void testWriteAfterBreakLease() throws Exception { + final Path testFilePath = new Path(path(methodName.getMethodName()), TEST_FILE); + final AzureBlobFileSystem fs = getCustomFileSystem(testFilePath.getParent(), 1); + fs.mkdirs(testFilePath.getParent()); + + FSDataOutputStream out; + out = fs.create(testFilePath); + out.write(0); + out.hsync(); + + fs.breakLease(testFilePath); + + LambdaTestUtils.intercept(IOException.class, ERR_LEASE_EXPIRED, () -> { + out.write(1); + out.hsync(); + return "Expected exception on write after lease break but got " + out; + }); + + LambdaTestUtils.intercept(IOException.class, ERR_LEASE_EXPIRED, () -> { + out.close(); + return "Expected exception on close after lease break but got " + out; + }); + + Assert.assertTrue("Output stream lease should be freed", + ((AbfsOutputStream) out.getWrappedStream()).isLeaseFreed()); + + try (FSDataOutputStream out2 = fs.append(testFilePath)) { + out2.write(2); + out2.hsync(); + } + + Assert.assertTrue("Store leases were not freed", fs.getAbfsStore().areLeasesFreed()); + } + + @Test(timeout = LONG_TEST_EXECUTION_TIMEOUT) + public void testLeaseFreedAfterBreak() throws Exception { + final Path testFilePath = new Path(path(methodName.getMethodName()), TEST_FILE); + final AzureBlobFileSystem fs = getCustomFileSystem(testFilePath.getParent(), 1); + fs.mkdirs(testFilePath.getParent()); + + FSDataOutputStream out = fs.create(testFilePath); + out.write(0); + + fs.breakLease(testFilePath); + + LambdaTestUtils.intercept(IOException.class, ERR_LEASE_EXPIRED, () -> { + out.close(); + return "Expected exception on close after lease break but got " + out; + }); + + Assert.assertTrue("Output stream lease should be freed", + ((AbfsOutputStream) out.getWrappedStream()).isLeaseFreed()); + + Assert.assertTrue("Store leases were not freed", fs.getAbfsStore().areLeasesFreed()); + } + + @Test(timeout = TEST_EXECUTION_TIMEOUT) + public void testInfiniteLease() throws Exception { + final Path testFilePath = new Path(path(methodName.getMethodName()), TEST_FILE); + final AzureBlobFileSystem fs = getCustomFileSystem(testFilePath.getParent(), 1); + fs.mkdirs(testFilePath.getParent()); + + try (FSDataOutputStream out = fs.create(testFilePath)) { + Assert.assertTrue("Output stream should have lease", + ((AbfsOutputStream) out.getWrappedStream()).hasLease()); + out.write(0); + } + Assert.assertTrue(fs.getAbfsStore().areLeasesFreed()); + + try (FSDataOutputStream out = fs.append(testFilePath)) { + Assert.assertTrue("Output stream should have lease", + ((AbfsOutputStream) out.getWrappedStream()).hasLease()); + out.write(1); + } + Assert.assertTrue("Store leases were not freed", fs.getAbfsStore().areLeasesFreed()); + } + + @Test(timeout = TEST_EXECUTION_TIMEOUT) + public void testFileSystemClose() throws Exception { + final Path testFilePath = new Path(path(methodName.getMethodName()), TEST_FILE); + final AzureBlobFileSystem fs = getCustomFileSystem(testFilePath.getParent(), 1); + fs.mkdirs(testFilePath.getParent()); + + FSDataOutputStream out = fs.create(testFilePath); + out.write(0); + Assert.assertFalse("Store leases should exist", fs.getAbfsStore().areLeasesFreed()); + fs.close(); + Assert.assertTrue("Store leases were not freed", fs.getAbfsStore().areLeasesFreed()); + + LambdaTestUtils.intercept(IOException.class, isHNSEnabled ? ERR_LEASE_NOT_PRESENT + : ERR_LEASE_EXPIRED, () -> { + out.close(); + return "Expected exception on close after closed FS but got " + out; + }); + + LambdaTestUtils.intercept(RejectedExecutionException.class, () -> { + try (FSDataOutputStream out2 = fs.append(testFilePath)) { + } + return "Expected exception on new append after closed FS"; + }); + } + + @Test(timeout = TEST_EXECUTION_TIMEOUT) + public void testAcquireRetry() throws Exception { + final Path testFilePath = new Path(path(methodName.getMethodName()), TEST_FILE); + final AzureBlobFileSystem fs = getCustomFileSystem(testFilePath.getParent(), 1); + fs.mkdirs(testFilePath.getParent()); + fs.createNewFile(testFilePath); + + AbfsLease lease = new AbfsLease(fs.getAbfsClient(), testFilePath.toUri().getPath()); + Assert.assertNotNull("Did not successfully lease file", lease.getLeaseID()); + lease.free(); + Assert.assertEquals("Unexpected acquire retry count", 0, lease.getAcquireRetryCount()); + + AbfsClient mockClient = spy(fs.getAbfsClient()); + + doThrow(new AbfsLease.LeaseException("failed to acquire 1")) + .doThrow(new AbfsLease.LeaseException("failed to acquire 2")) + .doCallRealMethod() + .when(mockClient).acquireLease(anyString(), anyInt()); + + lease = new AbfsLease(mockClient, testFilePath.toUri().getPath(), 5, 1); + Assert.assertNotNull("Acquire lease should have retried", lease.getLeaseID()); + lease.free(); + Assert.assertEquals("Unexpected acquire retry count", 2, lease.getAcquireRetryCount()); + + doThrow(new AbfsLease.LeaseException("failed to acquire")) + .when(mockClient).acquireLease(anyString(), anyInt()); + + LambdaTestUtils.intercept(AzureBlobFileSystemException.class, () -> { + new AbfsLease(mockClient, testFilePath.toUri().getPath(), 5, 1); + }); + } +} diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/diagnostics/TestConfigurationValidators.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/diagnostics/TestConfigurationValidators.java index f02eadc9a04..6a02435fc6e 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/diagnostics/TestConfigurationValidators.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/diagnostics/TestConfigurationValidators.java @@ -24,11 +24,14 @@ import org.junit.Test; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidConfigurationValueException; import org.apache.hadoop.fs.azurebfs.utils.Base64; -import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MIN_BUFFER_SIZE; -import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MAX_BUFFER_SIZE; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_LEASE_DURATION; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_READ_BUFFER_SIZE; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_WRITE_BUFFER_SIZE; - +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.INFINITE_LEASE_DURATION; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MAX_BUFFER_SIZE; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MAX_LEASE_DURATION; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MIN_BUFFER_SIZE; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MIN_LEASE_DURATION; /** * Test configuration validators. @@ -58,6 +61,26 @@ public class TestConfigurationValidators extends Assert { integerConfigurationValidator.validate("3072"); } + @Test + public void testIntegerWithOutlierConfigValidator() throws Exception { + IntegerConfigurationBasicValidator integerConfigurationValidator = new IntegerConfigurationBasicValidator( + INFINITE_LEASE_DURATION, MIN_LEASE_DURATION, MAX_LEASE_DURATION, DEFAULT_LEASE_DURATION, FAKE_KEY, + false); + + assertEquals(INFINITE_LEASE_DURATION, (int) integerConfigurationValidator.validate("-1")); + assertEquals(DEFAULT_LEASE_DURATION, (int) integerConfigurationValidator.validate(null)); + assertEquals(MIN_LEASE_DURATION, (int) integerConfigurationValidator.validate("15")); + assertEquals(MAX_LEASE_DURATION, (int) integerConfigurationValidator.validate("60")); + } + + @Test(expected = InvalidConfigurationValueException.class) + public void testIntegerWithOutlierConfigValidatorThrowsIfMissingValidValue() throws Exception { + IntegerConfigurationBasicValidator integerConfigurationValidator = new IntegerConfigurationBasicValidator( + INFINITE_LEASE_DURATION, MIN_LEASE_DURATION, MAX_LEASE_DURATION, DEFAULT_LEASE_DURATION, FAKE_KEY, + true); + integerConfigurationValidator.validate("14"); + } + @Test public void testLongConfigValidator() throws Exception { LongConfigurationBasicValidator longConfigurationValidator = new LongConfigurationBasicValidator( diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java index 1e6b8efe6d9..f4243bc7e28 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java @@ -31,6 +31,7 @@ import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters; import org.apache.hadoop.conf.Configuration; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.isNull; import static org.mockito.ArgumentMatchers.refEq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -86,7 +87,7 @@ public final class TestAbfsOutputStream { AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf); when(client.getAbfsPerfTracker()).thenReturn(tracker); when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any())).thenReturn(op); - when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any())).thenReturn(op); + when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any(), isNull())).thenReturn(op); AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0, populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false)); @@ -104,9 +105,9 @@ public final class TestAbfsOutputStream { out.hsync(); AppendRequestParameters firstReqParameters = new AppendRequestParameters( - 0, 0, WRITE_SIZE, APPEND_MODE, false); + 0, 0, WRITE_SIZE, APPEND_MODE, false, null); AppendRequestParameters secondReqParameters = new AppendRequestParameters( - WRITE_SIZE, 0, 2 * WRITE_SIZE, APPEND_MODE, false); + WRITE_SIZE, 0, 2 * WRITE_SIZE, APPEND_MODE, false, null); verify(client, times(1)).append( eq(PATH), any(byte[].class), refEq(firstReqParameters), any()); @@ -133,7 +134,7 @@ public final class TestAbfsOutputStream { when(client.getAbfsPerfTracker()).thenReturn(tracker); when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any())).thenReturn(op); - when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any())).thenReturn(op); + when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any(), isNull())).thenReturn(op); AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0, populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false)); @@ -146,9 +147,9 @@ public final class TestAbfsOutputStream { out.close(); AppendRequestParameters firstReqParameters = new AppendRequestParameters( - 0, 0, BUFFER_SIZE, APPEND_MODE, false); + 0, 0, BUFFER_SIZE, APPEND_MODE, false, null); AppendRequestParameters secondReqParameters = new AppendRequestParameters( - BUFFER_SIZE, 0, 5*WRITE_SIZE-BUFFER_SIZE, APPEND_MODE, false); + BUFFER_SIZE, 0, 5*WRITE_SIZE-BUFFER_SIZE, APPEND_MODE, false, null); verify(client, times(1)).append( eq(PATH), any(byte[].class), refEq(firstReqParameters), any()); @@ -165,7 +166,7 @@ public final class TestAbfsOutputStream { ArgumentCaptor acFlushSASToken = ArgumentCaptor.forClass(String.class); verify(client, times(1)).flush(acFlushPath.capture(), acFlushPosition.capture(), acFlushRetainUnCommittedData.capture(), acFlushClose.capture(), - acFlushSASToken.capture()); + acFlushSASToken.capture(), isNull()); assertThat(Arrays.asList(PATH)).describedAs("path").isEqualTo(acFlushPath.getAllValues()); assertThat(Arrays.asList(Long.valueOf(5*WRITE_SIZE))).describedAs("position").isEqualTo(acFlushPosition.getAllValues()); assertThat(Arrays.asList(false)).describedAs("RetainUnCommittedData flag").isEqualTo(acFlushRetainUnCommittedData.getAllValues()); @@ -189,7 +190,7 @@ public final class TestAbfsOutputStream { when(client.getAbfsPerfTracker()).thenReturn(tracker); when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any())).thenReturn(op); - when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any())).thenReturn(op); + when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any(), isNull())).thenReturn(op); when(op.getSasToken()).thenReturn("testToken"); when(op.getResult()).thenReturn(httpOp); @@ -204,9 +205,9 @@ public final class TestAbfsOutputStream { out.close(); AppendRequestParameters firstReqParameters = new AppendRequestParameters( - 0, 0, BUFFER_SIZE, APPEND_MODE, false); + 0, 0, BUFFER_SIZE, APPEND_MODE, false, null); AppendRequestParameters secondReqParameters = new AppendRequestParameters( - BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false); + BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false, null); verify(client, times(1)).append( eq(PATH), any(byte[].class), refEq(firstReqParameters), any()); @@ -223,7 +224,7 @@ public final class TestAbfsOutputStream { ArgumentCaptor acFlushSASToken = ArgumentCaptor.forClass(String.class); verify(client, times(1)).flush(acFlushPath.capture(), acFlushPosition.capture(), acFlushRetainUnCommittedData.capture(), acFlushClose.capture(), - acFlushSASToken.capture()); + acFlushSASToken.capture(), isNull()); assertThat(Arrays.asList(PATH)).describedAs("path").isEqualTo(acFlushPath.getAllValues()); assertThat(Arrays.asList(Long.valueOf(2*BUFFER_SIZE))).describedAs("position").isEqualTo(acFlushPosition.getAllValues()); assertThat(Arrays.asList(false)).describedAs("RetainUnCommittedData flag").isEqualTo(acFlushRetainUnCommittedData.getAllValues()); @@ -247,7 +248,7 @@ public final class TestAbfsOutputStream { when(client.getAbfsPerfTracker()).thenReturn(tracker); when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any())).thenReturn(op); - when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any())).thenReturn(op); + when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any(), isNull())).thenReturn(op); when(op.getSasToken()).thenReturn("testToken"); when(op.getResult()).thenReturn(httpOp); @@ -262,9 +263,9 @@ public final class TestAbfsOutputStream { Thread.sleep(1000); AppendRequestParameters firstReqParameters = new AppendRequestParameters( - 0, 0, BUFFER_SIZE, APPEND_MODE, false); + 0, 0, BUFFER_SIZE, APPEND_MODE, false, null); AppendRequestParameters secondReqParameters = new AppendRequestParameters( - BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false); + BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false, null); verify(client, times(1)).append( eq(PATH), any(byte[].class), refEq(firstReqParameters), any()); @@ -291,7 +292,7 @@ public final class TestAbfsOutputStream { when(client.getAbfsPerfTracker()).thenReturn(tracker); when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any())).thenReturn(op); - when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any())).thenReturn(op); + when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any(), isNull())).thenReturn(op); AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0, populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, true)); @@ -304,9 +305,9 @@ public final class TestAbfsOutputStream { Thread.sleep(1000); AppendRequestParameters firstReqParameters = new AppendRequestParameters( - 0, 0, BUFFER_SIZE, APPEND_MODE, true); + 0, 0, BUFFER_SIZE, APPEND_MODE, true, null); AppendRequestParameters secondReqParameters = new AppendRequestParameters( - BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, true); + BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, true, null); verify(client, times(1)).append( eq(PATH), any(byte[].class), refEq(firstReqParameters), any()); @@ -334,7 +335,7 @@ public final class TestAbfsOutputStream { when(client.getAbfsPerfTracker()).thenReturn(tracker); when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any())).thenReturn(op); - when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any())).thenReturn(op); + when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any(), isNull())).thenReturn(op); AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0, populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false)); @@ -347,9 +348,9 @@ public final class TestAbfsOutputStream { out.hflush(); AppendRequestParameters firstReqParameters = new AppendRequestParameters( - 0, 0, BUFFER_SIZE, APPEND_MODE, false); + 0, 0, BUFFER_SIZE, APPEND_MODE, false, null); AppendRequestParameters secondReqParameters = new AppendRequestParameters( - BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false); + BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false, null); verify(client, times(1)).append( eq(PATH), any(byte[].class), refEq(firstReqParameters), any()); @@ -366,7 +367,7 @@ public final class TestAbfsOutputStream { ArgumentCaptor acFlushSASToken = ArgumentCaptor.forClass(String.class); verify(client, times(1)).flush(acFlushPath.capture(), acFlushPosition.capture(), acFlushRetainUnCommittedData.capture(), acFlushClose.capture(), - acFlushSASToken.capture()); + acFlushSASToken.capture(), isNull()); assertThat(Arrays.asList(PATH)).describedAs("path").isEqualTo(acFlushPath.getAllValues()); assertThat(Arrays.asList(Long.valueOf(2*BUFFER_SIZE))).describedAs("position").isEqualTo(acFlushPosition.getAllValues()); assertThat(Arrays.asList(false)).describedAs("RetainUnCommittedData flag").isEqualTo(acFlushRetainUnCommittedData.getAllValues()); @@ -388,7 +389,7 @@ public final class TestAbfsOutputStream { AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf); when(client.getAbfsPerfTracker()).thenReturn(tracker); when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any())).thenReturn(op); - when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any())).thenReturn(op); + when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any(), isNull())).thenReturn(op); AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0, populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false)); @@ -403,9 +404,9 @@ public final class TestAbfsOutputStream { Thread.sleep(1000); AppendRequestParameters firstReqParameters = new AppendRequestParameters( - 0, 0, BUFFER_SIZE, APPEND_MODE, false); + 0, 0, BUFFER_SIZE, APPEND_MODE, false, null); AppendRequestParameters secondReqParameters = new AppendRequestParameters( - BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false); + BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false, null); verify(client, times(1)).append( eq(PATH), any(byte[].class), refEq(firstReqParameters), any());