HADOOP-16948. Support infinite lease dirs (#1925). Contributed by Billie Rinaldi.

(cherry picked from commit c1fde4fe94)
This commit is contained in:
billierinaldi 2021-04-12 19:47:59 -04:00 committed by Billie Rinaldi
parent 1960924d07
commit 8170a7bb60
22 changed files with 1032 additions and 42 deletions

View File

@ -31,6 +31,7 @@
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
import org.apache.hadoop.fs.azurebfs.constants.AuthConfigurations;
import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.IntegerConfigurationValidatorAnnotation;
import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.IntegerWithOutlierConfigurationValidatorAnnotation;
import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.LongConfigurationValidatorAnnotation;
import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.StringConfigurationValidatorAnnotation;
import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.Base64StringConfigurationValidatorAnnotation;
@ -203,6 +204,15 @@ public class AbfsConfiguration{
DefaultValue = DEFAULT_FS_AZURE_APPEND_BLOB_DIRECTORIES)
private String azureAppendBlobDirs;
@StringConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_INFINITE_LEASE_KEY,
DefaultValue = DEFAULT_FS_AZURE_INFINITE_LEASE_DIRECTORIES)
private String azureInfiniteLeaseDirs;
@IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_LEASE_THREADS,
MinValue = MIN_LEASE_THREADS,
DefaultValue = DEFAULT_LEASE_THREADS)
private int numLeaseThreads;
@BooleanConfigurationValidatorAnnotation(ConfigurationKey = AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION,
DefaultValue = DEFAULT_AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION)
private boolean createRemoteFileSystemDuringInitialization;
@ -291,6 +301,8 @@ public AbfsConfiguration(final Configuration rawConfig, String accountName)
field.setAccessible(true);
if (field.isAnnotationPresent(IntegerConfigurationValidatorAnnotation.class)) {
field.set(this, validateInt(field));
} else if (field.isAnnotationPresent(IntegerWithOutlierConfigurationValidatorAnnotation.class)) {
field.set(this, validateIntWithOutlier(field));
} else if (field.isAnnotationPresent(LongConfigurationValidatorAnnotation.class)) {
field.set(this, validateLong(field));
} else if (field.isAnnotationPresent(StringConfigurationValidatorAnnotation.class)) {
@ -625,6 +637,14 @@ public String getAppendBlobDirs() {
return this.azureAppendBlobDirs;
}
public String getAzureInfiniteLeaseDirs() {
return this.azureInfiniteLeaseDirs;
}
public int getNumLeaseThreads() {
return this.numLeaseThreads;
}
public boolean getCreateRemoteFileSystemDuringInitialization() {
// we do not support creating the filesystem when AuthType is SAS
return this.createRemoteFileSystemDuringInitialization
@ -834,6 +854,21 @@ int validateInt(Field field) throws IllegalAccessException, InvalidConfiguration
validator.ThrowIfInvalid()).validate(value);
}
int validateIntWithOutlier(Field field) throws IllegalAccessException, InvalidConfigurationValueException {
IntegerWithOutlierConfigurationValidatorAnnotation validator =
field.getAnnotation(IntegerWithOutlierConfigurationValidatorAnnotation.class);
String value = get(validator.ConfigurationKey());
// validate
return new IntegerConfigurationBasicValidator(
validator.OutlierValue(),
validator.MinValue(),
validator.MaxValue(),
validator.DefaultValue(),
validator.ConfigurationKey(),
validator.ThrowIfInvalid()).validate(value);
}
long validateLong(Field field) throws IllegalAccessException, InvalidConfigurationValueException {
LongConfigurationValidatorAnnotation validator = field.getAnnotation(LongConfigurationValidatorAnnotation.class);
String value = rawConfig.get(validator.ConfigurationKey());

View File

@ -87,6 +87,7 @@
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.functional.RemoteIterators;
import org.apache.hadoop.util.DurationInfo;
import org.apache.hadoop.util.LambdaUtils;
import org.apache.hadoop.util.Progressable;
@ -505,6 +506,26 @@ public FileStatus getFileStatus(final Path f) throws IOException {
}
}
/**
* Break the current lease on an ABFS file if it exists. A lease that is broken cannot be
* renewed. A new lease may be obtained on the file immediately.
*
* @param f file name
* @throws IOException on any exception while breaking the lease
*/
public void breakLease(final Path f) throws IOException {
LOG.debug("AzureBlobFileSystem.breakLease path: {}", f);
Path qualifiedPath = makeQualified(f);
try (DurationInfo ignored = new DurationInfo(LOG, false, "Break lease for %s",
qualifiedPath)) {
abfsStore.breakLease(qualifiedPath);
} catch(AzureBlobFileSystemException ex) {
checkException(f, ex);
}
}
/**
* Qualify a path to one which uses this FileSystem and, if relative,
* made absolute.

View File

@ -39,6 +39,7 @@
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
@ -48,10 +49,14 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.WeakHashMap;
import java.util.concurrent.ExecutionException;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
import org.apache.hadoop.thirdparty.com.google.common.base.Strings;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.Futures;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListenableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -100,6 +105,7 @@
import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
import org.apache.hadoop.fs.azurebfs.services.AuthType;
import org.apache.hadoop.fs.azurebfs.services.ExponentialRetryPolicy;
import org.apache.hadoop.fs.azurebfs.services.AbfsLease;
import org.apache.hadoop.fs.azurebfs.services.SharedKeyCredentials;
import org.apache.hadoop.fs.azurebfs.services.AbfsPerfTracker;
import org.apache.hadoop.fs.azurebfs.services.AbfsPerfInfo;
@ -145,8 +151,11 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
private static final String XMS_PROPERTIES_ENCODING = "ISO-8859-1";
private static final int GET_SET_AGGREGATE_COUNT = 2;
private final Map<AbfsLease, Object> leaseRefs;
private final AbfsConfiguration abfsConfiguration;
private final Set<String> azureAtomicRenameDirSet;
private Set<String> azureInfiniteLeaseDirSet;
private Trilean isNamespaceEnabled;
private final AuthType authType;
private final UserGroupInformation userGroupInformation;
@ -167,6 +176,8 @@ public AzureBlobFileSystemStore(URI uri, boolean isSecureScheme,
final String fileSystemName = authorityParts[0];
final String accountName = authorityParts[1];
leaseRefs = Collections.synchronizedMap(new WeakHashMap<>());
try {
this.abfsConfiguration = new AbfsConfiguration(configuration, accountName);
} catch (IllegalAccessException exception) {
@ -195,6 +206,7 @@ public AzureBlobFileSystemStore(URI uri, boolean isSecureScheme,
this.azureAtomicRenameDirSet = new HashSet<>(Arrays.asList(
abfsConfiguration.getAzureAtomicRenameDirs().split(AbfsHttpConstants.COMMA)));
updateInfiniteLeaseDirs();
this.authType = abfsConfiguration.getAuthType(accountName);
boolean usingOauth = (authType == AuthType.OAuth);
boolean useHttps = (usingOauth || abfsConfiguration.isHttpsAlwaysUsed()) ? true : isSecureScheme;
@ -246,7 +258,24 @@ public String getPrimaryGroup() {
@Override
public void close() throws IOException {
IOUtils.cleanupWithLogger(LOG, client);
List<ListenableFuture<?>> futures = new ArrayList<>();
for (AbfsLease lease : leaseRefs.keySet()) {
if (lease == null) {
continue;
}
ListenableFuture<?> future = client.submit(() -> lease.free());
futures.add(future);
}
try {
Futures.allAsList(futures).get();
} catch (InterruptedException e) {
LOG.error("Interrupted freeing leases", e);
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
LOG.error("Error freeing leases", e);
} finally {
IOUtils.cleanupWithLogger(LOG, client);
}
}
byte[] encodeAttribute(String value) throws UnsupportedEncodingException {
@ -496,12 +525,14 @@ public OutputStream createFile(final Path path,
}
perfInfo.registerResult(op.getResult()).registerSuccess(true);
AbfsLease lease = maybeCreateLease(relativePath);
return new AbfsOutputStream(
client,
statistics,
relativePath,
0,
populateAbfsOutputStreamContext(isAppendBlob));
populateAbfsOutputStreamContext(isAppendBlob, lease));
}
}
@ -573,7 +604,8 @@ private AbfsRestOperation conditionalCreateOverwriteFile(final String relativePa
return op;
}
private AbfsOutputStreamContext populateAbfsOutputStreamContext(boolean isAppendBlob) {
private AbfsOutputStreamContext populateAbfsOutputStreamContext(boolean isAppendBlob,
AbfsLease lease) {
int bufferSize = abfsConfiguration.getWriteBufferSize();
if (isAppendBlob && bufferSize > FileSystemConfigurations.APPENDBLOB_MAX_WRITE_BUFFER_SIZE) {
bufferSize = FileSystemConfigurations.APPENDBLOB_MAX_WRITE_BUFFER_SIZE;
@ -587,6 +619,7 @@ private AbfsOutputStreamContext populateAbfsOutputStreamContext(boolean isAppend
.withAppendBlob(isAppendBlob)
.withWriteMaxConcurrentRequestCount(abfsConfiguration.getWriteMaxConcurrentRequestCount())
.withMaxWriteRequestsToQueue(abfsConfiguration.getMaxWriteRequestsToQueue())
.withLease(lease)
.build();
}
@ -702,15 +735,29 @@ public OutputStream openFileForWrite(final Path path, final FileSystem.Statistic
isAppendBlob = true;
}
AbfsLease lease = maybeCreateLease(relativePath);
return new AbfsOutputStream(
client,
statistics,
relativePath,
offset,
populateAbfsOutputStreamContext(isAppendBlob));
populateAbfsOutputStreamContext(isAppendBlob, lease));
}
}
/**
* Break any current lease on an ABFS file.
*
* @param path file name
* @throws AzureBlobFileSystemException on any exception while breaking the lease
*/
public void breakLease(final Path path) throws AzureBlobFileSystemException {
LOG.debug("lease path: {}", path);
client.breakLease(getRelativePath(path));
}
public void rename(final Path source, final Path destination) throws
AzureBlobFileSystemException {
final Instant startAggregate = abfsPerfTracker.getLatencyInstant();
@ -1344,6 +1391,13 @@ public boolean isAtomicRenameKey(String key) {
return isKeyForDirectorySet(key, azureAtomicRenameDirSet);
}
public boolean isInfiniteLeaseKey(String key) {
if (azureInfiniteLeaseDirSet.isEmpty()) {
return false;
}
return isKeyForDirectorySet(key, azureInfiniteLeaseDirSet);
}
/**
* A on-off operation to initialize AbfsClient for AzureBlobFileSystem
* Operations.
@ -1633,4 +1687,32 @@ void setNamespaceEnabled(Trilean isNamespaceEnabled){
this.isNamespaceEnabled = isNamespaceEnabled;
}
private void updateInfiniteLeaseDirs() {
this.azureInfiniteLeaseDirSet = new HashSet<>(Arrays.asList(
abfsConfiguration.getAzureInfiniteLeaseDirs().split(AbfsHttpConstants.COMMA)));
// remove the empty string, since isKeyForDirectory returns true for empty strings
// and we don't want to default to enabling infinite lease dirs
this.azureInfiniteLeaseDirSet.remove("");
}
private AbfsLease maybeCreateLease(String relativePath)
throws AzureBlobFileSystemException {
boolean enableInfiniteLease = isInfiniteLeaseKey(relativePath);
if (!enableInfiniteLease) {
return null;
}
AbfsLease lease = new AbfsLease(client, relativePath);
leaseRefs.put(lease, null);
return lease;
}
@VisibleForTesting
boolean areLeasesFreed() {
for (AbfsLease lease : leaseRefs.keySet()) {
if (lease != null && !lease.isFreed()) {
return false;
}
}
return true;
}
}

View File

@ -39,6 +39,11 @@ public final class AbfsHttpConstants {
public static final String GET_ACCESS_CONTROL = "getAccessControl";
public static final String CHECK_ACCESS = "checkAccess";
public static final String GET_STATUS = "getStatus";
public static final String ACQUIRE_LEASE_ACTION = "acquire";
public static final String BREAK_LEASE_ACTION = "break";
public static final String RELEASE_LEASE_ACTION = "release";
public static final String RENEW_LEASE_ACTION = "renew";
public static final String DEFAULT_LEASE_BREAK_PERIOD = "0";
public static final String DEFAULT_TIMEOUT = "90";
public static final String APPEND_BLOB_TYPE = "appendblob";
public static final String TOKEN_VERSION = "2";

View File

@ -86,6 +86,15 @@ public final class ConfigurationKeys {
/** Provides a config to provide comma separated path prefixes on which Appendblob based files are created
* Default is empty. **/
public static final String FS_AZURE_APPEND_BLOB_KEY = "fs.azure.appendblob.directories";
/** Provides a config to provide comma separated path prefixes which support infinite leases.
* Files under these paths will be leased when created or opened for writing and the lease will
* be released when the file is closed. The lease may be broken with the breakLease method on
* AzureBlobFileSystem. Default is empty.
* **/
public static final String FS_AZURE_INFINITE_LEASE_KEY = "fs.azure.infinite-lease.directories";
/** Provides a number of threads to use for lease operations for infinite lease directories.
* Must be set to a minimum of 1 if infinite lease directories are to be used. Default is 0. **/
public static final String FS_AZURE_LEASE_THREADS = "fs.azure.lease.threads";
public static final String FS_AZURE_READ_AHEAD_QUEUE_DEPTH = "fs.azure.readaheadqueue.depth";
public static final String FS_AZURE_ALWAYS_READ_BUFFER_SIZE = "fs.azure.read.alwaysReadBufferSize";
public static final String FS_AZURE_READ_AHEAD_BLOCK_SIZE = "fs.azure.read.readahead.blocksize";

View File

@ -77,6 +77,13 @@ public final class FileSystemConfigurations {
public static final String DEFAULT_FS_AZURE_ATOMIC_RENAME_DIRECTORIES = "/hbase";
public static final boolean DEFAULT_FS_AZURE_ENABLE_CONDITIONAL_CREATE_OVERWRITE = true;
public static final String DEFAULT_FS_AZURE_APPEND_BLOB_DIRECTORIES = "";
public static final String DEFAULT_FS_AZURE_INFINITE_LEASE_DIRECTORIES = "";
public static final int DEFAULT_LEASE_THREADS = 0;
public static final int MIN_LEASE_THREADS = 0;
public static final int DEFAULT_LEASE_DURATION = -1;
public static final int INFINITE_LEASE_DURATION = -1;
public static final int MIN_LEASE_DURATION = 15;
public static final int MAX_LEASE_DURATION = 60;
public static final int DEFAULT_READ_AHEAD_QUEUE_DEPTH = -1;

View File

@ -59,6 +59,11 @@ public final class HttpHeaderConfigurations {
public static final String X_MS_UMASK = "x-ms-umask";
public static final String X_MS_NAMESPACE_ENABLED = "x-ms-namespace-enabled";
public static final String X_MS_ABFS_CLIENT_LATENCY = "x-ms-abfs-client-latency";
public static final String X_MS_LEASE_ACTION = "x-ms-lease-action";
public static final String X_MS_LEASE_DURATION = "x-ms-lease-duration";
public static final String X_MS_LEASE_ID = "x-ms-lease-id";
public static final String X_MS_PROPOSED_LEASE_ID = "x-ms-proposed-lease-id";
public static final String X_MS_LEASE_BREAK_PERIOD = "x-ms-lease-break-period";
private HttpHeaderConfigurations() {}
}

View File

@ -46,6 +46,22 @@ public class ConfigurationValidationAnnotations {
boolean ThrowIfInvalid() default false;
}
@Target({ ElementType.FIELD })
@Retention(RetentionPolicy.RUNTIME)
public @interface IntegerWithOutlierConfigurationValidatorAnnotation {
String ConfigurationKey();
int MaxValue() default Integer.MAX_VALUE;
int MinValue() default Integer.MIN_VALUE;
int OutlierValue() default Integer.MIN_VALUE;
int DefaultValue();
boolean ThrowIfInvalid() default false;
}
/**
* Describes the requirements when validating the annotated long field.
*/

View File

@ -37,6 +37,10 @@ public AzureBlobFileSystemException(final String message, final Exception innerE
super(message, innerException);
}
public AzureBlobFileSystemException(final String message, final Throwable innerThrowable) {
super(message, innerThrowable);
}
@Override
public String toString() {
if (this.getMessage() == null && this.getCause() == null) {

View File

@ -33,17 +33,20 @@ public enum Mode {
private final int length;
private final Mode mode;
private final boolean isAppendBlob;
private final String leaseId;
public AppendRequestParameters(final long position,
final int offset,
final int length,
final Mode mode,
final boolean isAppendBlob) {
final boolean isAppendBlob,
final String leaseId) {
this.position = position;
this.offset = offset;
this.length = length;
this.mode = mode;
this.isAppendBlob = isAppendBlob;
this.leaseId = leaseId;
}
public long getPosition() {
@ -66,4 +69,7 @@ public boolean isAppendBlob() {
return this.isAppendBlob;
}
public String getLeaseId() {
return this.leaseId;
}
}

View File

@ -31,11 +31,18 @@
public class IntegerConfigurationBasicValidator extends ConfigurationBasicValidator<Integer> implements ConfigurationValidator {
private final int min;
private final int max;
private final int outlier;
public IntegerConfigurationBasicValidator(final int min, final int max, final int defaultVal, final String configKey, final boolean throwIfInvalid) {
this(min, min, max, defaultVal, configKey, throwIfInvalid);
}
public IntegerConfigurationBasicValidator(final int outlier, final int min, final int max,
final int defaultVal, final String configKey, final boolean throwIfInvalid) {
super(configKey, defaultVal, throwIfInvalid);
this.min = min;
this.max = max;
this.outlier = outlier;
}
public Integer validate(final String configValue) throws InvalidConfigurationValueException {
@ -47,10 +54,14 @@ public Integer validate(final String configValue) throws InvalidConfigurationVal
try {
result = Integer.parseInt(configValue);
// throw an exception if a 'within bounds' value is missing
if (getThrowIfInvalid() && (result < this.min || result > this.max)) {
if (getThrowIfInvalid() && (result != outlier) && (result < this.min || result > this.max)) {
throw new InvalidConfigurationValueException(getConfigKey());
}
if (result == outlier) {
return result;
}
// set the value to the nearest bound if it's out of bounds
if (result < this.min) {
return this.min;

View File

@ -29,16 +29,27 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.base.Strings;
import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
import org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.FutureCallback;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.Futures;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListenableFuture;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListenableScheduledFuture;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningScheduledExecutorService;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
import org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.SASTokenProviderException;
@ -49,6 +60,8 @@
import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider;
import org.apache.hadoop.fs.azurebfs.utils.DateTimeUtils;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.*;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_DELETE_CONSIDERED_IDEMPOTENT;
@ -76,6 +89,8 @@ public class AbfsClient implements Closeable {
private SASTokenProvider sasTokenProvider;
private final AbfsCounters abfsCounters;
private final ListeningScheduledExecutorService executorService;
private AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials,
final AbfsConfiguration abfsConfiguration,
final AbfsClientContext abfsClientContext) {
@ -106,6 +121,11 @@ private AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCreden
this.userAgent = initializeUserAgent(abfsConfiguration, sslProviderName);
this.abfsPerfTracker = abfsClientContext.getAbfsPerfTracker();
this.abfsCounters = abfsClientContext.getAbfsCounters();
ThreadFactory tf =
new ThreadFactoryBuilder().setNameFormat("AbfsClient Lease Ops").setDaemon(true).build();
this.executorService = MoreExecutors.listeningDecorator(
HadoopExecutors.newScheduledThreadPool(this.abfsConfiguration.getNumLeaseThreads(), tf));
}
public AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials,
@ -129,6 +149,7 @@ public void close() throws IOException {
if (tokenProvider instanceof Closeable) {
IOUtils.cleanupWithLogger(LOG, (Closeable) tokenProvider);
}
HadoopExecutors.shutdown(executorService, LOG, 0, TimeUnit.SECONDS);
}
public String getFileSystem() {
@ -306,6 +327,83 @@ public AbfsRestOperation createPath(final String path, final boolean isFile, fin
return op;
}
public AbfsRestOperation acquireLease(final String path, int duration) throws AzureBlobFileSystemException {
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ACTION, ACQUIRE_LEASE_ACTION));
requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_DURATION, Integer.toString(duration)));
requestHeaders.add(new AbfsHttpHeader(X_MS_PROPOSED_LEASE_ID, UUID.randomUUID().toString()));
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
AbfsRestOperationType.LeasePath,
this,
HTTP_METHOD_POST,
url,
requestHeaders);
op.execute();
return op;
}
public AbfsRestOperation renewLease(final String path, final String leaseId) throws AzureBlobFileSystemException {
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ACTION, RENEW_LEASE_ACTION));
requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ID, leaseId));
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
AbfsRestOperationType.LeasePath,
this,
HTTP_METHOD_POST,
url,
requestHeaders);
op.execute();
return op;
}
public AbfsRestOperation releaseLease(final String path, final String leaseId) throws AzureBlobFileSystemException {
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ACTION, RELEASE_LEASE_ACTION));
requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ID, leaseId));
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
AbfsRestOperationType.LeasePath,
this,
HTTP_METHOD_POST,
url,
requestHeaders);
op.execute();
return op;
}
public AbfsRestOperation breakLease(final String path) throws AzureBlobFileSystemException {
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ACTION, BREAK_LEASE_ACTION));
requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_BREAK_PERIOD, DEFAULT_LEASE_BREAK_PERIOD));
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
AbfsRestOperationType.LeasePath,
this,
HTTP_METHOD_POST,
url,
requestHeaders);
op.execute();
return op;
}
public AbfsRestOperation renamePath(String source, final String destination, final String continuation)
throws AzureBlobFileSystemException {
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
@ -405,6 +503,9 @@ public AbfsRestOperation append(final String path, final byte[] buffer,
// PUT and specify the real method in the X-Http-Method-Override header.
requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE,
HTTP_METHOD_PATCH));
if (reqParams.getLeaseId() != null) {
requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ID, reqParams.getLeaseId()));
}
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, APPEND_ACTION);
@ -481,13 +582,16 @@ public boolean appendSuccessCheckOp(AbfsRestOperation op, final String path,
}
public AbfsRestOperation flush(final String path, final long position, boolean retainUncommittedData,
boolean isClose, final String cachedSasToken)
boolean isClose, final String cachedSasToken, final String leaseId)
throws AzureBlobFileSystemException {
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
// JDK7 does not support PATCH, so to workaround the issue we will use
// PUT and specify the real method in the X-Http-Method-Override header.
requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE,
HTTP_METHOD_PATCH));
if (leaseId != null) {
requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ID, leaseId));
}
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, FLUSH_ACTION);
@ -992,4 +1096,21 @@ public SASTokenProvider getSasTokenProvider() {
protected AbfsCounters getAbfsCounters() {
return abfsCounters;
}
public int getNumLeaseThreads() {
return abfsConfiguration.getNumLeaseThreads();
}
public <V> ListenableScheduledFuture<V> schedule(Callable<V> callable, long delay,
TimeUnit timeUnit) {
return executorService.schedule(callable, delay, timeUnit);
}
public ListenableFuture<?> submit(Runnable runnable) {
return executorService.submit(runnable);
}
public <V> void addCallback(ListenableFuture<V> future, FutureCallback<V> callback) {
Futures.addCallback(future, callback, executorService);
}
}

View File

@ -0,0 +1,53 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.services;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_LEASE_THREADS;
/**
* ABFS error constants.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public final class AbfsErrors {
public static final String ERR_WRITE_WITHOUT_LEASE = "Attempted to write to file without lease";
public static final String ERR_LEASE_EXPIRED = "A lease ID was specified, but the lease for the"
+ " resource has expired";
public static final String ERR_NO_LEASE_ID_SPECIFIED = "There is currently a lease on the "
+ "resource and no lease ID was specified in the request";
public static final String ERR_PARALLEL_ACCESS_DETECTED = "Parallel access to the create path "
+ "detected. Failing request to honor single writer semantics";
public static final String ERR_ACQUIRING_LEASE = "Unable to acquire lease";
public static final String ERR_LEASE_ALREADY_PRESENT = "There is already a lease present";
public static final String ERR_LEASE_NOT_PRESENT = "There is currently no lease on the resource";
public static final String ERR_LEASE_ID_NOT_PRESENT = "The lease ID is not present with the "
+ "specified lease operation";
public static final String ERR_LEASE_DID_NOT_MATCH = "The lease ID specified did not match the "
+ "lease ID for the resource with the specified lease operation";
public static final String ERR_LEASE_BROKEN = "The lease ID matched, but the lease has been "
+ "broken explicitly and cannot be renewed";
public static final String ERR_LEASE_FUTURE_EXISTS = "There is already an existing lease "
+ "operation";
public static final String ERR_NO_LEASE_THREADS = "Lease desired but no lease threads "
+ "configured, set " + FS_AZURE_LEASE_THREADS;
private AbfsErrors() {}
}

View File

@ -0,0 +1,188 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.services;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.FutureCallback;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListenableScheduledFuture;
import org.apache.hadoop.thirdparty.org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.INFINITE_LEASE_DURATION;
import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_ACQUIRING_LEASE;
import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_LEASE_FUTURE_EXISTS;
import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_NO_LEASE_THREADS;
/**
* AbfsLease manages an Azure blob lease. It acquires an infinite lease on instantiation and
* releases the lease when free() is called. Use it to prevent writes to the blob by other
* processes that don't have the lease.
*
* Creating a new Lease object blocks the caller until the Azure blob lease is acquired. It will
* retry a fixed number of times before failing if there is a problem acquiring the lease.
*
* Call free() to release the Lease. If the holder process dies, AzureBlobFileSystem breakLease
* will need to be called before another client will be able to write to the file.
*/
public final class AbfsLease {
private static final Logger LOG = LoggerFactory.getLogger(AbfsLease.class);
// Number of retries for acquiring lease
static final int DEFAULT_LEASE_ACQUIRE_MAX_RETRIES = 7;
// Retry interval for acquiring lease in secs
static final int DEFAULT_LEASE_ACQUIRE_RETRY_INTERVAL = 10;
private final AbfsClient client;
private final String path;
// Lease status variables
private volatile boolean leaseFreed;
private volatile String leaseID = null;
private volatile Throwable exception = null;
private volatile int acquireRetryCount = 0;
private volatile ListenableScheduledFuture<AbfsRestOperation> future = null;
public static class LeaseException extends AzureBlobFileSystemException {
public LeaseException(Throwable t) {
super(ERR_ACQUIRING_LEASE + ": " + t, t);
}
public LeaseException(String s) {
super(s);
}
}
public AbfsLease(AbfsClient client, String path) throws AzureBlobFileSystemException {
this(client, path, DEFAULT_LEASE_ACQUIRE_MAX_RETRIES, DEFAULT_LEASE_ACQUIRE_RETRY_INTERVAL);
}
@VisibleForTesting
public AbfsLease(AbfsClient client, String path, int acquireMaxRetries,
int acquireRetryInterval) throws AzureBlobFileSystemException {
this.leaseFreed = false;
this.client = client;
this.path = path;
if (client.getNumLeaseThreads() < 1) {
throw new LeaseException(ERR_NO_LEASE_THREADS);
}
// Try to get the lease a specified number of times, else throw an error
RetryPolicy retryPolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
acquireMaxRetries, acquireRetryInterval, TimeUnit.SECONDS);
acquireLease(retryPolicy, 0, acquireRetryInterval, 0);
while (leaseID == null && exception == null) {
try {
future.get();
} catch (Exception e) {
LOG.debug("Got exception waiting for acquire lease future. Checking if lease ID or "
+ "exception have been set", e);
}
}
if (exception != null) {
LOG.error("Failed to acquire lease on {}", path);
throw new LeaseException(exception);
}
LOG.debug("Acquired lease {} on {}", leaseID, path);
}
private void acquireLease(RetryPolicy retryPolicy, int numRetries, int retryInterval, long delay)
throws LeaseException {
LOG.debug("Attempting to acquire lease on {}, retry {}", path, numRetries);
if (future != null && !future.isDone()) {
throw new LeaseException(ERR_LEASE_FUTURE_EXISTS);
}
future = client.schedule(() -> client.acquireLease(path, INFINITE_LEASE_DURATION),
delay, TimeUnit.SECONDS);
client.addCallback(future, new FutureCallback<AbfsRestOperation>() {
@Override
public void onSuccess(@Nullable AbfsRestOperation op) {
leaseID = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_LEASE_ID);
LOG.debug("Acquired lease {} on {}", leaseID, path);
}
@Override
public void onFailure(Throwable throwable) {
try {
if (RetryPolicy.RetryAction.RetryDecision.RETRY
== retryPolicy.shouldRetry(null, numRetries, 0, true).action) {
LOG.debug("Failed to acquire lease on {}, retrying: {}", path, throwable);
acquireRetryCount++;
acquireLease(retryPolicy, numRetries + 1, retryInterval, retryInterval);
} else {
exception = throwable;
}
} catch (Exception e) {
exception = throwable;
}
}
});
}
/**
* Cancel future and free the lease. If an exception occurs while releasing the lease, the error
* will be logged. If the lease cannot be released, AzureBlobFileSystem breakLease will need to
* be called before another client will be able to write to the file.
*/
public void free() {
if (leaseFreed) {
return;
}
try {
LOG.debug("Freeing lease: path {}, lease id {}", path, leaseID);
if (future != null && !future.isDone()) {
future.cancel(true);
}
client.releaseLease(path, leaseID);
} catch (IOException e) {
LOG.warn("Exception when trying to release lease {} on {}. Lease will need to be broken: {}",
leaseID, path, e.getMessage());
} finally {
// Even if releasing the lease fails (e.g. because the file was deleted),
// make sure to record that we freed the lease
leaseFreed = true;
LOG.debug("Freed lease {} on {}", leaseID, path);
}
}
public boolean isFreed() {
return leaseFreed;
}
public String getLeaseID() {
return leaseID;
}
@VisibleForTesting
public int getAcquireRetryCount() {
return acquireRetryCount;
}
}

View File

@ -37,6 +37,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
@ -53,6 +54,7 @@
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.Syncable;
import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_WRITE_WITHOUT_LEASE;
import static org.apache.hadoop.fs.impl.StoreImplementationUtils.isProbeForSyncable;
import static org.apache.hadoop.io.IOUtils.wrapException;
import static org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters.Mode.APPEND_MODE;
@ -92,6 +94,9 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
// SAS tokens can be re-used until they expire
private CachedSASToken cachedSasToken;
private AbfsLease lease;
private String leaseId;
/**
* Queue storing buffers with the size of the Azure block ready for
* reuse. The pool allows reusing the blocks instead of allocating new
@ -142,6 +147,10 @@ public AbfsOutputStream(
}
this.maxRequestsThatCanBeQueued = abfsOutputStreamContext
.getMaxWriteRequestsToQueue();
this.lease = abfsOutputStreamContext.getLease();
this.leaseId = abfsOutputStreamContext.getLeaseId();
this.threadExecutor
= new ThreadPoolExecutor(maxConcurrentRequestCount,
maxConcurrentRequestCount,
@ -203,6 +212,10 @@ public synchronized void write(final byte[] data, final int off, final int lengt
throw new IndexOutOfBoundsException();
}
if (hasLease() && isLeaseFreed()) {
throw new PathIOException(path, ERR_WRITE_WITHOUT_LEASE);
}
int currentOffset = off;
int writableBytes = bufferSize - bufferIndex;
int numberOfBytesToWrite = length;
@ -306,6 +319,10 @@ public synchronized void close() throws IOException {
// See HADOOP-16785
throw wrapException(path, e.getMessage(), e);
} finally {
if (hasLease()) {
lease.free();
lease = null;
}
lastError = new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
buffer = null;
bufferIndex = 0;
@ -372,7 +389,7 @@ private void writeAppendBlobCurrentBufferToService() throws IOException {
try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker,
"writeCurrentBufferToService", "append")) {
AppendRequestParameters reqParams = new AppendRequestParameters(offset, 0,
bytesLength, APPEND_MODE, true);
bytesLength, APPEND_MODE, true, leaseId);
AbfsRestOperation op = client.append(path, bytes, reqParams, cachedSasToken.get());
cachedSasToken.update(op.getSasToken());
if (outputStreamStatistics != null) {
@ -448,7 +465,7 @@ private synchronized void writeCurrentBufferToService(boolean isFlush, boolean i
mode = FLUSH_MODE;
}
AppendRequestParameters reqParams = new AppendRequestParameters(
offset, 0, bytesLength, mode, false);
offset, 0, bytesLength, mode, false, leaseId);
AbfsRestOperation op = client.append(path, bytes, reqParams,
cachedSasToken.get());
cachedSasToken.update(op.getSasToken());
@ -517,7 +534,8 @@ private synchronized void flushWrittenBytesToServiceInternal(final long offset,
AbfsPerfTracker tracker = client.getAbfsPerfTracker();
try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker,
"flushWrittenBytesToServiceInternal", "flush")) {
AbfsRestOperation op = client.flush(path, offset, retainUncommitedData, isClose, cachedSasToken.get());
AbfsRestOperation op = client.flush(path, offset, retainUncommitedData, isClose,
cachedSasToken.get(), leaseId);
cachedSasToken.update(op.getSasToken());
perfInfo.registerResult(op.getResult()).registerSuccess(true);
} catch (AzureBlobFileSystemException ex) {
@ -637,6 +655,19 @@ public IOStatistics getIOStatistics() {
return ioStatistics;
}
@VisibleForTesting
public boolean isLeaseFreed() {
if (lease == null) {
return true;
}
return lease.isFreed();
}
@VisibleForTesting
public boolean hasLease() {
return lease != null;
}
/**
* Appending AbfsOutputStream statistics to base toString().
*

View File

@ -39,6 +39,8 @@ public class AbfsOutputStreamContext extends AbfsStreamContext {
private int maxWriteRequestsToQueue;
private AbfsLease lease;
public AbfsOutputStreamContext(final long sasTokenRenewPeriodForStreamsInSeconds) {
super(sasTokenRenewPeriodForStreamsInSeconds);
}
@ -94,6 +96,11 @@ public AbfsOutputStreamContext withMaxWriteRequestsToQueue(
return this;
}
public AbfsOutputStreamContext withLease(final AbfsLease lease) {
this.lease = lease;
return this;
}
public int getWriteBufferSize() {
return writeBufferSize;
}
@ -125,4 +132,15 @@ public int getMaxWriteRequestsToQueue() {
public boolean isEnableSmallWriteOptimization() {
return this.enableSmallWriteOptimization;
}
public AbfsLease getLease() {
return this.lease;
}
public String getLeaseId() {
if (this.lease == null) {
return null;
}
return this.lease.getLeaseID();
}
}

View File

@ -131,6 +131,7 @@ String getSasToken() {
this.url = url;
this.requestHeaders = requestHeaders;
this.hasRequestBody = (AbfsHttpConstants.HTTP_METHOD_PUT.equals(method)
|| AbfsHttpConstants.HTTP_METHOD_POST.equals(method)
|| AbfsHttpConstants.HTTP_METHOD_PATCH.equals(method));
this.sasToken = sasToken;
this.abfsCounters = client.getAbfsCounters();

View File

@ -40,5 +40,6 @@ public enum AbfsRestOperationType {
Flush,
ReadFile,
DeletePath,
CheckAccess
CheckAccess,
LeasePath,
}

View File

@ -887,6 +887,22 @@ enabled for your Azure Storage account."
The directories can be specified as comma separated values. By default the value
is "/hbase"
### <a name="infiniteleaseoptions"></a> Infinite Lease Options
`fs.azure.infinite-lease.directories`: Directories for infinite lease support
can be specified comma separated in this config. By default, multiple
clients will be able to write to the same file simultaneously. When writing
to files contained within the directories specified in this config, the
client will obtain a lease on the file that will prevent any other clients
from writing to the file. When the output stream is closed, the lease will be
released. To revoke a client's write access for a file, the
AzureBlobFilesystem breakLease method may be called. If the client dies
before the file can be closed and the lease released, breakLease will need to
be called before another client will be able to write to the file.
`fs.azure.lease.threads`: This is the size of the thread pool that will be
used for lease operations for infinite lease directories. By default the value
is 0, so it must be set to at least 1 to support infinite lease directories.
### <a name="perfoptions"></a> Perf Options
#### <a name="abfstracklatencyoptions"></a> 1. HTTP Request Tracking Options

View File

@ -0,0 +1,336 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs;
import java.io.IOException;
import java.util.concurrent.RejectedExecutionException;
import org.junit.Assert;
import org.junit.Test;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsLease;
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.LambdaTestUtils;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.spy;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_INFINITE_LEASE_KEY;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_LEASE_THREADS;
import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_TEST_NAMESPACE_ENABLED_ACCOUNT;
import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_ACQUIRING_LEASE;
import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_LEASE_EXPIRED;
import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_LEASE_NOT_PRESENT;
import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_NO_LEASE_ID_SPECIFIED;
import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_NO_LEASE_THREADS;
import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_PARALLEL_ACCESS_DETECTED;
/**
* Test lease operations.
*/
public class ITestAzureBlobFileSystemLease extends AbstractAbfsIntegrationTest {
private static final int TEST_EXECUTION_TIMEOUT = 30 * 1000;
private static final int LONG_TEST_EXECUTION_TIMEOUT = 90 * 1000;
private static final String TEST_FILE = "testfile";
private final boolean isHNSEnabled;
public ITestAzureBlobFileSystemLease() throws Exception {
super();
this.isHNSEnabled = getConfiguration()
.getBoolean(FS_AZURE_TEST_NAMESPACE_ENABLED_ACCOUNT, false);
}
private AzureBlobFileSystem getCustomFileSystem(Path infiniteLeaseDirs, int numLeaseThreads) throws Exception {
Configuration conf = getRawConfiguration();
conf.setBoolean(String.format("fs.%s.impl.disable.cache", getAbfsScheme()), true);
conf.set(FS_AZURE_INFINITE_LEASE_KEY, infiniteLeaseDirs.toUri().getPath());
conf.setInt(FS_AZURE_LEASE_THREADS, numLeaseThreads);
return getFileSystem(conf);
}
@Test(timeout = TEST_EXECUTION_TIMEOUT)
public void testNoInfiniteLease() throws IOException {
final Path testFilePath = new Path(path(methodName.getMethodName()), TEST_FILE);
final AzureBlobFileSystem fs = getFileSystem();
fs.mkdirs(testFilePath.getParent());
try (FSDataOutputStream out = fs.create(testFilePath)) {
Assert.assertFalse("Output stream should not have lease",
((AbfsOutputStream) out.getWrappedStream()).hasLease());
}
Assert.assertTrue("Store leases were not freed", fs.getAbfsStore().areLeasesFreed());
}
@Test(timeout = TEST_EXECUTION_TIMEOUT)
public void testNoLeaseThreads() throws Exception {
final Path testFilePath = new Path(path(methodName.getMethodName()), TEST_FILE);
final AzureBlobFileSystem fs = getCustomFileSystem(testFilePath.getParent(), 0);
fs.mkdirs(testFilePath.getParent());
LambdaTestUtils.intercept(IOException.class, ERR_NO_LEASE_THREADS, () -> {
try (FSDataOutputStream out = fs.create(testFilePath)) {
}
return "No failure when lease requested with 0 lease threads";
});
}
@Test(timeout = TEST_EXECUTION_TIMEOUT)
public void testOneWriter() throws Exception {
final Path testFilePath = new Path(path(methodName.getMethodName()), TEST_FILE);
final AzureBlobFileSystem fs = getCustomFileSystem(testFilePath.getParent(), 1);
fs.mkdirs(testFilePath.getParent());
FSDataOutputStream out = fs.create(testFilePath);
Assert.assertTrue("Output stream should have lease",
((AbfsOutputStream) out.getWrappedStream()).hasLease());
out.close();
Assert.assertFalse("Output stream should not have lease",
((AbfsOutputStream) out.getWrappedStream()).hasLease());
Assert.assertTrue("Store leases were not freed", fs.getAbfsStore().areLeasesFreed());
}
@Test(timeout = TEST_EXECUTION_TIMEOUT)
public void testSubDir() throws Exception {
final Path testFilePath = new Path(new Path(path(methodName.getMethodName()), "subdir"),
TEST_FILE);
final AzureBlobFileSystem fs =
getCustomFileSystem(testFilePath.getParent().getParent(), 1);
fs.mkdirs(testFilePath.getParent().getParent());
FSDataOutputStream out = fs.create(testFilePath);
Assert.assertTrue("Output stream should have lease",
((AbfsOutputStream) out.getWrappedStream()).hasLease());
out.close();
Assert.assertFalse("Output stream should not have lease",
((AbfsOutputStream) out.getWrappedStream()).hasLease());
Assert.assertTrue("Store leases were not freed", fs.getAbfsStore().areLeasesFreed());
}
@Test(timeout = TEST_EXECUTION_TIMEOUT)
public void testTwoCreate() throws Exception {
final Path testFilePath = new Path(path(methodName.getMethodName()), TEST_FILE);
final AzureBlobFileSystem fs = getCustomFileSystem(testFilePath.getParent(), 1);
fs.mkdirs(testFilePath.getParent());
try (FSDataOutputStream out = fs.create(testFilePath)) {
LambdaTestUtils.intercept(IOException.class, isHNSEnabled ? ERR_PARALLEL_ACCESS_DETECTED
: ERR_NO_LEASE_ID_SPECIFIED, () -> {
try (FSDataOutputStream out2 = fs.create(testFilePath)) {
}
return "Expected second create on infinite lease dir to fail";
});
}
Assert.assertTrue("Store leases were not freed", fs.getAbfsStore().areLeasesFreed());
}
private void twoWriters(AzureBlobFileSystem fs, Path testFilePath, boolean expectException) throws Exception {
try (FSDataOutputStream out = fs.create(testFilePath)) {
try (FSDataOutputStream out2 = fs.append(testFilePath)) {
out2.writeInt(2);
out2.hsync();
} catch (IOException e) {
if (expectException) {
GenericTestUtils.assertExceptionContains(ERR_ACQUIRING_LEASE, e);
} else {
throw e;
}
}
out.writeInt(1);
out.hsync();
}
Assert.assertTrue("Store leases were not freed", fs.getAbfsStore().areLeasesFreed());
}
@Test(timeout = TEST_EXECUTION_TIMEOUT)
public void testTwoWritersCreateAppendNoInfiniteLease() throws Exception {
final Path testFilePath = new Path(path(methodName.getMethodName()), TEST_FILE);
final AzureBlobFileSystem fs = getFileSystem();
fs.mkdirs(testFilePath.getParent());
twoWriters(fs, testFilePath, false);
}
@Test(timeout = LONG_TEST_EXECUTION_TIMEOUT)
public void testTwoWritersCreateAppendWithInfiniteLeaseEnabled() throws Exception {
final Path testFilePath = new Path(path(methodName.getMethodName()), TEST_FILE);
final AzureBlobFileSystem fs = getCustomFileSystem(testFilePath.getParent(), 1);
fs.mkdirs(testFilePath.getParent());
twoWriters(fs, testFilePath, true);
}
@Test(timeout = TEST_EXECUTION_TIMEOUT)
public void testLeaseFreedOnClose() throws Exception {
final Path testFilePath = new Path(path(methodName.getMethodName()), TEST_FILE);
final AzureBlobFileSystem fs = getCustomFileSystem(testFilePath.getParent(), 1);
fs.mkdirs(testFilePath.getParent());
FSDataOutputStream out;
out = fs.create(testFilePath);
out.write(0);
Assert.assertTrue("Output stream should have lease",
((AbfsOutputStream) out.getWrappedStream()).hasLease());
out.close();
Assert.assertFalse("Output stream should not have lease after close",
((AbfsOutputStream) out.getWrappedStream()).hasLease());
Assert.assertTrue("Store leases were not freed", fs.getAbfsStore().areLeasesFreed());
}
@Test(timeout = TEST_EXECUTION_TIMEOUT)
public void testWriteAfterBreakLease() throws Exception {
final Path testFilePath = new Path(path(methodName.getMethodName()), TEST_FILE);
final AzureBlobFileSystem fs = getCustomFileSystem(testFilePath.getParent(), 1);
fs.mkdirs(testFilePath.getParent());
FSDataOutputStream out;
out = fs.create(testFilePath);
out.write(0);
out.hsync();
fs.breakLease(testFilePath);
LambdaTestUtils.intercept(IOException.class, ERR_LEASE_EXPIRED, () -> {
out.write(1);
out.hsync();
return "Expected exception on write after lease break but got " + out;
});
LambdaTestUtils.intercept(IOException.class, ERR_LEASE_EXPIRED, () -> {
out.close();
return "Expected exception on close after lease break but got " + out;
});
Assert.assertTrue("Output stream lease should be freed",
((AbfsOutputStream) out.getWrappedStream()).isLeaseFreed());
try (FSDataOutputStream out2 = fs.append(testFilePath)) {
out2.write(2);
out2.hsync();
}
Assert.assertTrue("Store leases were not freed", fs.getAbfsStore().areLeasesFreed());
}
@Test(timeout = LONG_TEST_EXECUTION_TIMEOUT)
public void testLeaseFreedAfterBreak() throws Exception {
final Path testFilePath = new Path(path(methodName.getMethodName()), TEST_FILE);
final AzureBlobFileSystem fs = getCustomFileSystem(testFilePath.getParent(), 1);
fs.mkdirs(testFilePath.getParent());
FSDataOutputStream out = fs.create(testFilePath);
out.write(0);
fs.breakLease(testFilePath);
LambdaTestUtils.intercept(IOException.class, ERR_LEASE_EXPIRED, () -> {
out.close();
return "Expected exception on close after lease break but got " + out;
});
Assert.assertTrue("Output stream lease should be freed",
((AbfsOutputStream) out.getWrappedStream()).isLeaseFreed());
Assert.assertTrue("Store leases were not freed", fs.getAbfsStore().areLeasesFreed());
}
@Test(timeout = TEST_EXECUTION_TIMEOUT)
public void testInfiniteLease() throws Exception {
final Path testFilePath = new Path(path(methodName.getMethodName()), TEST_FILE);
final AzureBlobFileSystem fs = getCustomFileSystem(testFilePath.getParent(), 1);
fs.mkdirs(testFilePath.getParent());
try (FSDataOutputStream out = fs.create(testFilePath)) {
Assert.assertTrue("Output stream should have lease",
((AbfsOutputStream) out.getWrappedStream()).hasLease());
out.write(0);
}
Assert.assertTrue(fs.getAbfsStore().areLeasesFreed());
try (FSDataOutputStream out = fs.append(testFilePath)) {
Assert.assertTrue("Output stream should have lease",
((AbfsOutputStream) out.getWrappedStream()).hasLease());
out.write(1);
}
Assert.assertTrue("Store leases were not freed", fs.getAbfsStore().areLeasesFreed());
}
@Test(timeout = TEST_EXECUTION_TIMEOUT)
public void testFileSystemClose() throws Exception {
final Path testFilePath = new Path(path(methodName.getMethodName()), TEST_FILE);
final AzureBlobFileSystem fs = getCustomFileSystem(testFilePath.getParent(), 1);
fs.mkdirs(testFilePath.getParent());
FSDataOutputStream out = fs.create(testFilePath);
out.write(0);
Assert.assertFalse("Store leases should exist", fs.getAbfsStore().areLeasesFreed());
fs.close();
Assert.assertTrue("Store leases were not freed", fs.getAbfsStore().areLeasesFreed());
LambdaTestUtils.intercept(IOException.class, isHNSEnabled ? ERR_LEASE_NOT_PRESENT
: ERR_LEASE_EXPIRED, () -> {
out.close();
return "Expected exception on close after closed FS but got " + out;
});
LambdaTestUtils.intercept(RejectedExecutionException.class, () -> {
try (FSDataOutputStream out2 = fs.append(testFilePath)) {
}
return "Expected exception on new append after closed FS";
});
}
@Test(timeout = TEST_EXECUTION_TIMEOUT)
public void testAcquireRetry() throws Exception {
final Path testFilePath = new Path(path(methodName.getMethodName()), TEST_FILE);
final AzureBlobFileSystem fs = getCustomFileSystem(testFilePath.getParent(), 1);
fs.mkdirs(testFilePath.getParent());
fs.createNewFile(testFilePath);
AbfsLease lease = new AbfsLease(fs.getAbfsClient(), testFilePath.toUri().getPath());
Assert.assertNotNull("Did not successfully lease file", lease.getLeaseID());
lease.free();
Assert.assertEquals("Unexpected acquire retry count", 0, lease.getAcquireRetryCount());
AbfsClient mockClient = spy(fs.getAbfsClient());
doThrow(new AbfsLease.LeaseException("failed to acquire 1"))
.doThrow(new AbfsLease.LeaseException("failed to acquire 2"))
.doCallRealMethod()
.when(mockClient).acquireLease(anyString(), anyInt());
lease = new AbfsLease(mockClient, testFilePath.toUri().getPath(), 5, 1);
Assert.assertNotNull("Acquire lease should have retried", lease.getLeaseID());
lease.free();
Assert.assertEquals("Unexpected acquire retry count", 2, lease.getAcquireRetryCount());
doThrow(new AbfsLease.LeaseException("failed to acquire"))
.when(mockClient).acquireLease(anyString(), anyInt());
LambdaTestUtils.intercept(AzureBlobFileSystemException.class, () -> {
new AbfsLease(mockClient, testFilePath.toUri().getPath(), 5, 1);
});
}
}

View File

@ -24,11 +24,14 @@
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidConfigurationValueException;
import org.apache.hadoop.fs.azurebfs.utils.Base64;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MIN_BUFFER_SIZE;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MAX_BUFFER_SIZE;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_LEASE_DURATION;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_READ_BUFFER_SIZE;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_WRITE_BUFFER_SIZE;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.INFINITE_LEASE_DURATION;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MAX_BUFFER_SIZE;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MAX_LEASE_DURATION;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MIN_BUFFER_SIZE;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MIN_LEASE_DURATION;
/**
* Test configuration validators.
@ -58,6 +61,26 @@ public void testIntegerConfigValidatorThrowsIfMissingValidValue() throws Excepti
integerConfigurationValidator.validate("3072");
}
@Test
public void testIntegerWithOutlierConfigValidator() throws Exception {
IntegerConfigurationBasicValidator integerConfigurationValidator = new IntegerConfigurationBasicValidator(
INFINITE_LEASE_DURATION, MIN_LEASE_DURATION, MAX_LEASE_DURATION, DEFAULT_LEASE_DURATION, FAKE_KEY,
false);
assertEquals(INFINITE_LEASE_DURATION, (int) integerConfigurationValidator.validate("-1"));
assertEquals(DEFAULT_LEASE_DURATION, (int) integerConfigurationValidator.validate(null));
assertEquals(MIN_LEASE_DURATION, (int) integerConfigurationValidator.validate("15"));
assertEquals(MAX_LEASE_DURATION, (int) integerConfigurationValidator.validate("60"));
}
@Test(expected = InvalidConfigurationValueException.class)
public void testIntegerWithOutlierConfigValidatorThrowsIfMissingValidValue() throws Exception {
IntegerConfigurationBasicValidator integerConfigurationValidator = new IntegerConfigurationBasicValidator(
INFINITE_LEASE_DURATION, MIN_LEASE_DURATION, MAX_LEASE_DURATION, DEFAULT_LEASE_DURATION, FAKE_KEY,
true);
integerConfigurationValidator.validate("14");
}
@Test
public void testLongConfigValidator() throws Exception {
LongConfigurationBasicValidator longConfigurationValidator = new LongConfigurationBasicValidator(

View File

@ -31,6 +31,7 @@
import org.apache.hadoop.conf.Configuration;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.ArgumentMatchers.refEq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ -86,7 +87,7 @@ public void verifyShortWriteRequest() throws Exception {
AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf);
when(client.getAbfsPerfTracker()).thenReturn(tracker);
when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any())).thenReturn(op);
when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any())).thenReturn(op);
when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any(), isNull())).thenReturn(op);
AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0,
populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false));
@ -104,9 +105,9 @@ public void verifyShortWriteRequest() throws Exception {
out.hsync();
AppendRequestParameters firstReqParameters = new AppendRequestParameters(
0, 0, WRITE_SIZE, APPEND_MODE, false);
0, 0, WRITE_SIZE, APPEND_MODE, false, null);
AppendRequestParameters secondReqParameters = new AppendRequestParameters(
WRITE_SIZE, 0, 2 * WRITE_SIZE, APPEND_MODE, false);
WRITE_SIZE, 0, 2 * WRITE_SIZE, APPEND_MODE, false, null);
verify(client, times(1)).append(
eq(PATH), any(byte[].class), refEq(firstReqParameters), any());
@ -133,7 +134,7 @@ public void verifyWriteRequest() throws Exception {
when(client.getAbfsPerfTracker()).thenReturn(tracker);
when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any())).thenReturn(op);
when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any())).thenReturn(op);
when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any(), isNull())).thenReturn(op);
AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0,
populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false));
@ -146,9 +147,9 @@ public void verifyWriteRequest() throws Exception {
out.close();
AppendRequestParameters firstReqParameters = new AppendRequestParameters(
0, 0, BUFFER_SIZE, APPEND_MODE, false);
0, 0, BUFFER_SIZE, APPEND_MODE, false, null);
AppendRequestParameters secondReqParameters = new AppendRequestParameters(
BUFFER_SIZE, 0, 5*WRITE_SIZE-BUFFER_SIZE, APPEND_MODE, false);
BUFFER_SIZE, 0, 5*WRITE_SIZE-BUFFER_SIZE, APPEND_MODE, false, null);
verify(client, times(1)).append(
eq(PATH), any(byte[].class), refEq(firstReqParameters), any());
@ -165,7 +166,7 @@ public void verifyWriteRequest() throws Exception {
ArgumentCaptor<String> acFlushSASToken = ArgumentCaptor.forClass(String.class);
verify(client, times(1)).flush(acFlushPath.capture(), acFlushPosition.capture(), acFlushRetainUnCommittedData.capture(), acFlushClose.capture(),
acFlushSASToken.capture());
acFlushSASToken.capture(), isNull());
assertThat(Arrays.asList(PATH)).describedAs("path").isEqualTo(acFlushPath.getAllValues());
assertThat(Arrays.asList(Long.valueOf(5*WRITE_SIZE))).describedAs("position").isEqualTo(acFlushPosition.getAllValues());
assertThat(Arrays.asList(false)).describedAs("RetainUnCommittedData flag").isEqualTo(acFlushRetainUnCommittedData.getAllValues());
@ -189,7 +190,7 @@ public void verifyWriteRequestOfBufferSizeAndClose() throws Exception {
when(client.getAbfsPerfTracker()).thenReturn(tracker);
when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any())).thenReturn(op);
when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any())).thenReturn(op);
when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any(), isNull())).thenReturn(op);
when(op.getSasToken()).thenReturn("testToken");
when(op.getResult()).thenReturn(httpOp);
@ -204,9 +205,9 @@ public void verifyWriteRequestOfBufferSizeAndClose() throws Exception {
out.close();
AppendRequestParameters firstReqParameters = new AppendRequestParameters(
0, 0, BUFFER_SIZE, APPEND_MODE, false);
0, 0, BUFFER_SIZE, APPEND_MODE, false, null);
AppendRequestParameters secondReqParameters = new AppendRequestParameters(
BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false);
BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false, null);
verify(client, times(1)).append(
eq(PATH), any(byte[].class), refEq(firstReqParameters), any());
@ -223,7 +224,7 @@ public void verifyWriteRequestOfBufferSizeAndClose() throws Exception {
ArgumentCaptor<String> acFlushSASToken = ArgumentCaptor.forClass(String.class);
verify(client, times(1)).flush(acFlushPath.capture(), acFlushPosition.capture(), acFlushRetainUnCommittedData.capture(), acFlushClose.capture(),
acFlushSASToken.capture());
acFlushSASToken.capture(), isNull());
assertThat(Arrays.asList(PATH)).describedAs("path").isEqualTo(acFlushPath.getAllValues());
assertThat(Arrays.asList(Long.valueOf(2*BUFFER_SIZE))).describedAs("position").isEqualTo(acFlushPosition.getAllValues());
assertThat(Arrays.asList(false)).describedAs("RetainUnCommittedData flag").isEqualTo(acFlushRetainUnCommittedData.getAllValues());
@ -247,7 +248,7 @@ public void verifyWriteRequestOfBufferSize() throws Exception {
when(client.getAbfsPerfTracker()).thenReturn(tracker);
when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any())).thenReturn(op);
when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any())).thenReturn(op);
when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any(), isNull())).thenReturn(op);
when(op.getSasToken()).thenReturn("testToken");
when(op.getResult()).thenReturn(httpOp);
@ -262,9 +263,9 @@ public void verifyWriteRequestOfBufferSize() throws Exception {
Thread.sleep(1000);
AppendRequestParameters firstReqParameters = new AppendRequestParameters(
0, 0, BUFFER_SIZE, APPEND_MODE, false);
0, 0, BUFFER_SIZE, APPEND_MODE, false, null);
AppendRequestParameters secondReqParameters = new AppendRequestParameters(
BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false);
BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false, null);
verify(client, times(1)).append(
eq(PATH), any(byte[].class), refEq(firstReqParameters), any());
@ -291,7 +292,7 @@ public void verifyWriteRequestOfBufferSizeWithAppendBlob() throws Exception {
when(client.getAbfsPerfTracker()).thenReturn(tracker);
when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any())).thenReturn(op);
when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any())).thenReturn(op);
when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any(), isNull())).thenReturn(op);
AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0,
populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, true));
@ -304,9 +305,9 @@ public void verifyWriteRequestOfBufferSizeWithAppendBlob() throws Exception {
Thread.sleep(1000);
AppendRequestParameters firstReqParameters = new AppendRequestParameters(
0, 0, BUFFER_SIZE, APPEND_MODE, true);
0, 0, BUFFER_SIZE, APPEND_MODE, true, null);
AppendRequestParameters secondReqParameters = new AppendRequestParameters(
BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, true);
BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, true, null);
verify(client, times(1)).append(
eq(PATH), any(byte[].class), refEq(firstReqParameters), any());
@ -334,7 +335,7 @@ public void verifyWriteRequestOfBufferSizeAndHFlush() throws Exception {
when(client.getAbfsPerfTracker()).thenReturn(tracker);
when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any())).thenReturn(op);
when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any())).thenReturn(op);
when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any(), isNull())).thenReturn(op);
AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0,
populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false));
@ -347,9 +348,9 @@ public void verifyWriteRequestOfBufferSizeAndHFlush() throws Exception {
out.hflush();
AppendRequestParameters firstReqParameters = new AppendRequestParameters(
0, 0, BUFFER_SIZE, APPEND_MODE, false);
0, 0, BUFFER_SIZE, APPEND_MODE, false, null);
AppendRequestParameters secondReqParameters = new AppendRequestParameters(
BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false);
BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false, null);
verify(client, times(1)).append(
eq(PATH), any(byte[].class), refEq(firstReqParameters), any());
@ -366,7 +367,7 @@ public void verifyWriteRequestOfBufferSizeAndHFlush() throws Exception {
ArgumentCaptor<String> acFlushSASToken = ArgumentCaptor.forClass(String.class);
verify(client, times(1)).flush(acFlushPath.capture(), acFlushPosition.capture(), acFlushRetainUnCommittedData.capture(), acFlushClose.capture(),
acFlushSASToken.capture());
acFlushSASToken.capture(), isNull());
assertThat(Arrays.asList(PATH)).describedAs("path").isEqualTo(acFlushPath.getAllValues());
assertThat(Arrays.asList(Long.valueOf(2*BUFFER_SIZE))).describedAs("position").isEqualTo(acFlushPosition.getAllValues());
assertThat(Arrays.asList(false)).describedAs("RetainUnCommittedData flag").isEqualTo(acFlushRetainUnCommittedData.getAllValues());
@ -388,7 +389,7 @@ public void verifyWriteRequestOfBufferSizeAndFlush() throws Exception {
AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf);
when(client.getAbfsPerfTracker()).thenReturn(tracker);
when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any())).thenReturn(op);
when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any())).thenReturn(op);
when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any(), isNull())).thenReturn(op);
AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0,
populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false));
@ -403,9 +404,9 @@ public void verifyWriteRequestOfBufferSizeAndFlush() throws Exception {
Thread.sleep(1000);
AppendRequestParameters firstReqParameters = new AppendRequestParameters(
0, 0, BUFFER_SIZE, APPEND_MODE, false);
0, 0, BUFFER_SIZE, APPEND_MODE, false, null);
AppendRequestParameters secondReqParameters = new AppendRequestParameters(
BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false);
BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false, null);
verify(client, times(1)).append(
eq(PATH), any(byte[].class), refEq(firstReqParameters), any());