HADOOP-16948. Support infinite lease dirs. (#1925)

* HADOOP-16948. Support single writer dirs.

* HADOOP-16948. Fix findbugs and checkstyle problems.

* HADOOP-16948. Fix remaining checkstyle problems.

* HADOOP-16948. Add DurationInfo, retry policy for acquiring lease, and javadocs

* HADOOP-16948. Convert ABFS client to use an executor for lease ops

* HADOOP-16948. Fix ABFS lease test for non-HNS

* HADOOP-16948. Fix checkstyle and javadoc

* HADOOP-16948. Address review comments

* HADOOP-16948. Use daemon threads for ABFS lease ops

* HADOOP-16948. Make lease duration configurable

* HADOOP-16948. Add error messages to test assertions

* HADOOP-16948. Remove extra isSingleWriterKey call

* HADOOP-16948. Use only infinite lease duration due to cost of renewal ops

* HADOOP-16948. Remove acquire/renew/release lease methods

* HADOOP-16948. Rename single writer dirs to infinite lease dirs

* HADOOP-16948. Fix checkstyle

* HADOOP-16948. Wait for acquire lease future

* HADOOP-16948. Add unit test for acquire lease failure
This commit is contained in:
billierinaldi 2021-04-12 19:47:59 -04:00 committed by GitHub
parent cb3ed32fe0
commit c1fde4fe94
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 1032 additions and 42 deletions

View File

@ -31,6 +31,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
import org.apache.hadoop.fs.azurebfs.constants.AuthConfigurations;
import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.IntegerConfigurationValidatorAnnotation;
import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.IntegerWithOutlierConfigurationValidatorAnnotation;
import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.LongConfigurationValidatorAnnotation;
import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.StringConfigurationValidatorAnnotation;
import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.Base64StringConfigurationValidatorAnnotation;
@ -208,6 +209,15 @@ public class AbfsConfiguration{
DefaultValue = DEFAULT_FS_AZURE_APPEND_BLOB_DIRECTORIES)
private String azureAppendBlobDirs;
@StringConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_INFINITE_LEASE_KEY,
DefaultValue = DEFAULT_FS_AZURE_INFINITE_LEASE_DIRECTORIES)
private String azureInfiniteLeaseDirs;
@IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_LEASE_THREADS,
MinValue = MIN_LEASE_THREADS,
DefaultValue = DEFAULT_LEASE_THREADS)
private int numLeaseThreads;
@BooleanConfigurationValidatorAnnotation(ConfigurationKey = AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION,
DefaultValue = DEFAULT_AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION)
private boolean createRemoteFileSystemDuringInitialization;
@ -296,6 +306,8 @@ public class AbfsConfiguration{
field.setAccessible(true);
if (field.isAnnotationPresent(IntegerConfigurationValidatorAnnotation.class)) {
field.set(this, validateInt(field));
} else if (field.isAnnotationPresent(IntegerWithOutlierConfigurationValidatorAnnotation.class)) {
field.set(this, validateIntWithOutlier(field));
} else if (field.isAnnotationPresent(LongConfigurationValidatorAnnotation.class)) {
field.set(this, validateLong(field));
} else if (field.isAnnotationPresent(StringConfigurationValidatorAnnotation.class)) {
@ -634,6 +646,14 @@ public class AbfsConfiguration{
return this.azureAppendBlobDirs;
}
public String getAzureInfiniteLeaseDirs() {
return this.azureInfiniteLeaseDirs;
}
public int getNumLeaseThreads() {
return this.numLeaseThreads;
}
public boolean getCreateRemoteFileSystemDuringInitialization() {
// we do not support creating the filesystem when AuthType is SAS
return this.createRemoteFileSystemDuringInitialization
@ -843,6 +863,21 @@ public class AbfsConfiguration{
validator.ThrowIfInvalid()).validate(value);
}
int validateIntWithOutlier(Field field) throws IllegalAccessException, InvalidConfigurationValueException {
IntegerWithOutlierConfigurationValidatorAnnotation validator =
field.getAnnotation(IntegerWithOutlierConfigurationValidatorAnnotation.class);
String value = get(validator.ConfigurationKey());
// validate
return new IntegerConfigurationBasicValidator(
validator.OutlierValue(),
validator.MinValue(),
validator.MaxValue(),
validator.DefaultValue(),
validator.ConfigurationKey(),
validator.ThrowIfInvalid()).validate(value);
}
long validateLong(Field field) throws IllegalAccessException, InvalidConfigurationValueException {
LongConfigurationValidatorAnnotation validator = field.getAnnotation(LongConfigurationValidatorAnnotation.class);
String value = rawConfig.get(validator.ConfigurationKey());

View File

@ -87,6 +87,7 @@ import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.functional.RemoteIterators;
import org.apache.hadoop.util.DurationInfo;
import org.apache.hadoop.util.LambdaUtils;
import org.apache.hadoop.util.Progressable;
@ -505,6 +506,26 @@ public class AzureBlobFileSystem extends FileSystem {
}
}
/**
* Break the current lease on an ABFS file if it exists. A lease that is broken cannot be
* renewed. A new lease may be obtained on the file immediately.
*
* @param f file name
* @throws IOException on any exception while breaking the lease
*/
public void breakLease(final Path f) throws IOException {
LOG.debug("AzureBlobFileSystem.breakLease path: {}", f);
Path qualifiedPath = makeQualified(f);
try (DurationInfo ignored = new DurationInfo(LOG, false, "Break lease for %s",
qualifiedPath)) {
abfsStore.breakLease(qualifiedPath);
} catch(AzureBlobFileSystemException ex) {
checkException(f, ex);
}
}
/**
* Qualify a path to one which uses this FileSystem and, if relative,
* made absolute.

View File

@ -39,6 +39,7 @@ import java.text.SimpleDateFormat;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
@ -48,10 +49,14 @@ import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.WeakHashMap;
import java.util.concurrent.ExecutionException;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
import org.apache.hadoop.thirdparty.com.google.common.base.Strings;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.Futures;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListenableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -100,6 +105,7 @@ import org.apache.hadoop.fs.azurebfs.services.AbfsPermission;
import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
import org.apache.hadoop.fs.azurebfs.services.AuthType;
import org.apache.hadoop.fs.azurebfs.services.ExponentialRetryPolicy;
import org.apache.hadoop.fs.azurebfs.services.AbfsLease;
import org.apache.hadoop.fs.azurebfs.services.SharedKeyCredentials;
import org.apache.hadoop.fs.azurebfs.services.AbfsPerfTracker;
import org.apache.hadoop.fs.azurebfs.services.AbfsPerfInfo;
@ -145,8 +151,11 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
private static final String XMS_PROPERTIES_ENCODING = "ISO-8859-1";
private static final int GET_SET_AGGREGATE_COUNT = 2;
private final Map<AbfsLease, Object> leaseRefs;
private final AbfsConfiguration abfsConfiguration;
private final Set<String> azureAtomicRenameDirSet;
private Set<String> azureInfiniteLeaseDirSet;
private Trilean isNamespaceEnabled;
private final AuthType authType;
private final UserGroupInformation userGroupInformation;
@ -167,6 +176,8 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
final String fileSystemName = authorityParts[0];
final String accountName = authorityParts[1];
leaseRefs = Collections.synchronizedMap(new WeakHashMap<>());
try {
this.abfsConfiguration = new AbfsConfiguration(configuration, accountName);
} catch (IllegalAccessException exception) {
@ -195,6 +206,7 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
this.azureAtomicRenameDirSet = new HashSet<>(Arrays.asList(
abfsConfiguration.getAzureAtomicRenameDirs().split(AbfsHttpConstants.COMMA)));
updateInfiniteLeaseDirs();
this.authType = abfsConfiguration.getAuthType(accountName);
boolean usingOauth = (authType == AuthType.OAuth);
boolean useHttps = (usingOauth || abfsConfiguration.isHttpsAlwaysUsed()) ? true : isSecureScheme;
@ -246,8 +258,25 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
@Override
public void close() throws IOException {
List<ListenableFuture<?>> futures = new ArrayList<>();
for (AbfsLease lease : leaseRefs.keySet()) {
if (lease == null) {
continue;
}
ListenableFuture<?> future = client.submit(() -> lease.free());
futures.add(future);
}
try {
Futures.allAsList(futures).get();
} catch (InterruptedException e) {
LOG.error("Interrupted freeing leases", e);
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
LOG.error("Error freeing leases", e);
} finally {
IOUtils.cleanupWithLogger(LOG, client);
}
}
byte[] encodeAttribute(String value) throws UnsupportedEncodingException {
return value.getBytes(XMS_PROPERTIES_ENCODING);
@ -496,12 +525,14 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
}
perfInfo.registerResult(op.getResult()).registerSuccess(true);
AbfsLease lease = maybeCreateLease(relativePath);
return new AbfsOutputStream(
client,
statistics,
relativePath,
0,
populateAbfsOutputStreamContext(isAppendBlob));
populateAbfsOutputStreamContext(isAppendBlob, lease));
}
}
@ -573,7 +604,8 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
return op;
}
private AbfsOutputStreamContext populateAbfsOutputStreamContext(boolean isAppendBlob) {
private AbfsOutputStreamContext populateAbfsOutputStreamContext(boolean isAppendBlob,
AbfsLease lease) {
int bufferSize = abfsConfiguration.getWriteBufferSize();
if (isAppendBlob && bufferSize > FileSystemConfigurations.APPENDBLOB_MAX_WRITE_BUFFER_SIZE) {
bufferSize = FileSystemConfigurations.APPENDBLOB_MAX_WRITE_BUFFER_SIZE;
@ -587,6 +619,7 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
.withAppendBlob(isAppendBlob)
.withWriteMaxConcurrentRequestCount(abfsConfiguration.getWriteMaxConcurrentRequestCount())
.withMaxWriteRequestsToQueue(abfsConfiguration.getMaxWriteRequestsToQueue())
.withLease(lease)
.build();
}
@ -705,15 +738,29 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
isAppendBlob = true;
}
AbfsLease lease = maybeCreateLease(relativePath);
return new AbfsOutputStream(
client,
statistics,
relativePath,
offset,
populateAbfsOutputStreamContext(isAppendBlob));
populateAbfsOutputStreamContext(isAppendBlob, lease));
}
}
/**
* Break any current lease on an ABFS file.
*
* @param path file name
* @throws AzureBlobFileSystemException on any exception while breaking the lease
*/
public void breakLease(final Path path) throws AzureBlobFileSystemException {
LOG.debug("lease path: {}", path);
client.breakLease(getRelativePath(path));
}
public void rename(final Path source, final Path destination) throws
AzureBlobFileSystemException {
final Instant startAggregate = abfsPerfTracker.getLatencyInstant();
@ -1347,6 +1394,13 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
return isKeyForDirectorySet(key, azureAtomicRenameDirSet);
}
public boolean isInfiniteLeaseKey(String key) {
if (azureInfiniteLeaseDirSet.isEmpty()) {
return false;
}
return isKeyForDirectorySet(key, azureInfiniteLeaseDirSet);
}
/**
* A on-off operation to initialize AbfsClient for AzureBlobFileSystem
* Operations.
@ -1636,4 +1690,32 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
this.isNamespaceEnabled = isNamespaceEnabled;
}
private void updateInfiniteLeaseDirs() {
this.azureInfiniteLeaseDirSet = new HashSet<>(Arrays.asList(
abfsConfiguration.getAzureInfiniteLeaseDirs().split(AbfsHttpConstants.COMMA)));
// remove the empty string, since isKeyForDirectory returns true for empty strings
// and we don't want to default to enabling infinite lease dirs
this.azureInfiniteLeaseDirSet.remove("");
}
private AbfsLease maybeCreateLease(String relativePath)
throws AzureBlobFileSystemException {
boolean enableInfiniteLease = isInfiniteLeaseKey(relativePath);
if (!enableInfiniteLease) {
return null;
}
AbfsLease lease = new AbfsLease(client, relativePath);
leaseRefs.put(lease, null);
return lease;
}
@VisibleForTesting
boolean areLeasesFreed() {
for (AbfsLease lease : leaseRefs.keySet()) {
if (lease != null && !lease.isFreed()) {
return false;
}
}
return true;
}
}

View File

@ -39,6 +39,11 @@ public final class AbfsHttpConstants {
public static final String GET_ACCESS_CONTROL = "getAccessControl";
public static final String CHECK_ACCESS = "checkAccess";
public static final String GET_STATUS = "getStatus";
public static final String ACQUIRE_LEASE_ACTION = "acquire";
public static final String BREAK_LEASE_ACTION = "break";
public static final String RELEASE_LEASE_ACTION = "release";
public static final String RENEW_LEASE_ACTION = "renew";
public static final String DEFAULT_LEASE_BREAK_PERIOD = "0";
public static final String DEFAULT_TIMEOUT = "90";
public static final String APPEND_BLOB_TYPE = "appendblob";
public static final String TOKEN_VERSION = "2";

View File

@ -87,6 +87,15 @@ public final class ConfigurationKeys {
/** Provides a config to provide comma separated path prefixes on which Appendblob based files are created
* Default is empty. **/
public static final String FS_AZURE_APPEND_BLOB_KEY = "fs.azure.appendblob.directories";
/** Provides a config to provide comma separated path prefixes which support infinite leases.
* Files under these paths will be leased when created or opened for writing and the lease will
* be released when the file is closed. The lease may be broken with the breakLease method on
* AzureBlobFileSystem. Default is empty.
* **/
public static final String FS_AZURE_INFINITE_LEASE_KEY = "fs.azure.infinite-lease.directories";
/** Provides a number of threads to use for lease operations for infinite lease directories.
* Must be set to a minimum of 1 if infinite lease directories are to be used. Default is 0. **/
public static final String FS_AZURE_LEASE_THREADS = "fs.azure.lease.threads";
public static final String FS_AZURE_READ_AHEAD_QUEUE_DEPTH = "fs.azure.readaheadqueue.depth";
public static final String FS_AZURE_ALWAYS_READ_BUFFER_SIZE = "fs.azure.read.alwaysReadBufferSize";
public static final String FS_AZURE_READ_AHEAD_BLOCK_SIZE = "fs.azure.read.readahead.blocksize";

View File

@ -78,6 +78,13 @@ public final class FileSystemConfigurations {
public static final boolean DEFAULT_FS_AZURE_ENABLE_CONDITIONAL_CREATE_OVERWRITE = true;
public static final boolean DEFAULT_FS_AZURE_ENABLE_MKDIR_OVERWRITE = true;
public static final String DEFAULT_FS_AZURE_APPEND_BLOB_DIRECTORIES = "";
public static final String DEFAULT_FS_AZURE_INFINITE_LEASE_DIRECTORIES = "";
public static final int DEFAULT_LEASE_THREADS = 0;
public static final int MIN_LEASE_THREADS = 0;
public static final int DEFAULT_LEASE_DURATION = -1;
public static final int INFINITE_LEASE_DURATION = -1;
public static final int MIN_LEASE_DURATION = 15;
public static final int MAX_LEASE_DURATION = 60;
public static final int DEFAULT_READ_AHEAD_QUEUE_DEPTH = -1;

View File

@ -60,6 +60,11 @@ public final class HttpHeaderConfigurations {
public static final String X_MS_UMASK = "x-ms-umask";
public static final String X_MS_NAMESPACE_ENABLED = "x-ms-namespace-enabled";
public static final String X_MS_ABFS_CLIENT_LATENCY = "x-ms-abfs-client-latency";
public static final String X_MS_LEASE_ACTION = "x-ms-lease-action";
public static final String X_MS_LEASE_DURATION = "x-ms-lease-duration";
public static final String X_MS_LEASE_ID = "x-ms-lease-id";
public static final String X_MS_PROPOSED_LEASE_ID = "x-ms-proposed-lease-id";
public static final String X_MS_LEASE_BREAK_PERIOD = "x-ms-lease-break-period";
private HttpHeaderConfigurations() {}
}

View File

@ -46,6 +46,22 @@ public class ConfigurationValidationAnnotations {
boolean ThrowIfInvalid() default false;
}
@Target({ ElementType.FIELD })
@Retention(RetentionPolicy.RUNTIME)
public @interface IntegerWithOutlierConfigurationValidatorAnnotation {
String ConfigurationKey();
int MaxValue() default Integer.MAX_VALUE;
int MinValue() default Integer.MIN_VALUE;
int OutlierValue() default Integer.MIN_VALUE;
int DefaultValue();
boolean ThrowIfInvalid() default false;
}
/**
* Describes the requirements when validating the annotated long field.
*/

View File

@ -37,6 +37,10 @@ public abstract class AzureBlobFileSystemException extends IOException {
super(message, innerException);
}
public AzureBlobFileSystemException(final String message, final Throwable innerThrowable) {
super(message, innerThrowable);
}
@Override
public String toString() {
if (this.getMessage() == null && this.getCause() == null) {

View File

@ -33,17 +33,20 @@ public class AppendRequestParameters {
private final int length;
private final Mode mode;
private final boolean isAppendBlob;
private final String leaseId;
public AppendRequestParameters(final long position,
final int offset,
final int length,
final Mode mode,
final boolean isAppendBlob) {
final boolean isAppendBlob,
final String leaseId) {
this.position = position;
this.offset = offset;
this.length = length;
this.mode = mode;
this.isAppendBlob = isAppendBlob;
this.leaseId = leaseId;
}
public long getPosition() {
@ -66,4 +69,7 @@ public class AppendRequestParameters {
return this.isAppendBlob;
}
public String getLeaseId() {
return this.leaseId;
}
}

View File

@ -31,11 +31,18 @@ import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidConfigurationVa
public class IntegerConfigurationBasicValidator extends ConfigurationBasicValidator<Integer> implements ConfigurationValidator {
private final int min;
private final int max;
private final int outlier;
public IntegerConfigurationBasicValidator(final int min, final int max, final int defaultVal, final String configKey, final boolean throwIfInvalid) {
this(min, min, max, defaultVal, configKey, throwIfInvalid);
}
public IntegerConfigurationBasicValidator(final int outlier, final int min, final int max,
final int defaultVal, final String configKey, final boolean throwIfInvalid) {
super(configKey, defaultVal, throwIfInvalid);
this.min = min;
this.max = max;
this.outlier = outlier;
}
public Integer validate(final String configValue) throws InvalidConfigurationValueException {
@ -47,10 +54,14 @@ public class IntegerConfigurationBasicValidator extends ConfigurationBasicValida
try {
result = Integer.parseInt(configValue);
// throw an exception if a 'within bounds' value is missing
if (getThrowIfInvalid() && (result < this.min || result > this.max)) {
if (getThrowIfInvalid() && (result != outlier) && (result < this.min || result > this.max)) {
throw new InvalidConfigurationValueException(getConfigKey());
}
if (result == outlier) {
return result;
}
// set the value to the nearest bound if it's out of bounds
if (result < this.min) {
return this.min;

View File

@ -29,16 +29,27 @@ import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.base.Strings;
import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
import org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.FutureCallback;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.Futures;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListenableFuture;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListenableScheduledFuture;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningScheduledExecutorService;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
import org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.SASTokenProviderException;
@ -49,6 +60,8 @@ import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider;
import org.apache.hadoop.fs.azurebfs.utils.DateTimeUtils;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.*;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_DELETE_CONSIDERED_IDEMPOTENT;
@ -76,6 +89,8 @@ public class AbfsClient implements Closeable {
private SASTokenProvider sasTokenProvider;
private final AbfsCounters abfsCounters;
private final ListeningScheduledExecutorService executorService;
private AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials,
final AbfsConfiguration abfsConfiguration,
final AbfsClientContext abfsClientContext) {
@ -106,6 +121,11 @@ public class AbfsClient implements Closeable {
this.userAgent = initializeUserAgent(abfsConfiguration, sslProviderName);
this.abfsPerfTracker = abfsClientContext.getAbfsPerfTracker();
this.abfsCounters = abfsClientContext.getAbfsCounters();
ThreadFactory tf =
new ThreadFactoryBuilder().setNameFormat("AbfsClient Lease Ops").setDaemon(true).build();
this.executorService = MoreExecutors.listeningDecorator(
HadoopExecutors.newScheduledThreadPool(this.abfsConfiguration.getNumLeaseThreads(), tf));
}
public AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials,
@ -129,6 +149,7 @@ public class AbfsClient implements Closeable {
if (tokenProvider instanceof Closeable) {
IOUtils.cleanupWithLogger(LOG, (Closeable) tokenProvider);
}
HadoopExecutors.shutdown(executorService, LOG, 0, TimeUnit.SECONDS);
}
public String getFileSystem() {
@ -317,6 +338,83 @@ public class AbfsClient implements Closeable {
return op;
}
public AbfsRestOperation acquireLease(final String path, int duration) throws AzureBlobFileSystemException {
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ACTION, ACQUIRE_LEASE_ACTION));
requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_DURATION, Integer.toString(duration)));
requestHeaders.add(new AbfsHttpHeader(X_MS_PROPOSED_LEASE_ID, UUID.randomUUID().toString()));
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
AbfsRestOperationType.LeasePath,
this,
HTTP_METHOD_POST,
url,
requestHeaders);
op.execute();
return op;
}
public AbfsRestOperation renewLease(final String path, final String leaseId) throws AzureBlobFileSystemException {
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ACTION, RENEW_LEASE_ACTION));
requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ID, leaseId));
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
AbfsRestOperationType.LeasePath,
this,
HTTP_METHOD_POST,
url,
requestHeaders);
op.execute();
return op;
}
public AbfsRestOperation releaseLease(final String path, final String leaseId) throws AzureBlobFileSystemException {
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ACTION, RELEASE_LEASE_ACTION));
requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ID, leaseId));
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
AbfsRestOperationType.LeasePath,
this,
HTTP_METHOD_POST,
url,
requestHeaders);
op.execute();
return op;
}
public AbfsRestOperation breakLease(final String path) throws AzureBlobFileSystemException {
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ACTION, BREAK_LEASE_ACTION));
requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_BREAK_PERIOD, DEFAULT_LEASE_BREAK_PERIOD));
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
AbfsRestOperationType.LeasePath,
this,
HTTP_METHOD_POST,
url,
requestHeaders);
op.execute();
return op;
}
public AbfsRestOperation renamePath(String source, final String destination, final String continuation)
throws AzureBlobFileSystemException {
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
@ -416,6 +514,9 @@ public class AbfsClient implements Closeable {
// PUT and specify the real method in the X-Http-Method-Override header.
requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE,
HTTP_METHOD_PATCH));
if (reqParams.getLeaseId() != null) {
requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ID, reqParams.getLeaseId()));
}
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, APPEND_ACTION);
@ -492,13 +593,16 @@ public class AbfsClient implements Closeable {
}
public AbfsRestOperation flush(final String path, final long position, boolean retainUncommittedData,
boolean isClose, final String cachedSasToken)
boolean isClose, final String cachedSasToken, final String leaseId)
throws AzureBlobFileSystemException {
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
// JDK7 does not support PATCH, so to workaround the issue we will use
// PUT and specify the real method in the X-Http-Method-Override header.
requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE,
HTTP_METHOD_PATCH));
if (leaseId != null) {
requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ID, leaseId));
}
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, FLUSH_ACTION);
@ -1003,4 +1107,21 @@ public class AbfsClient implements Closeable {
protected AbfsCounters getAbfsCounters() {
return abfsCounters;
}
public int getNumLeaseThreads() {
return abfsConfiguration.getNumLeaseThreads();
}
public <V> ListenableScheduledFuture<V> schedule(Callable<V> callable, long delay,
TimeUnit timeUnit) {
return executorService.schedule(callable, delay, timeUnit);
}
public ListenableFuture<?> submit(Runnable runnable) {
return executorService.submit(runnable);
}
public <V> void addCallback(ListenableFuture<V> future, FutureCallback<V> callback) {
Futures.addCallback(future, callback, executorService);
}
}

View File

@ -0,0 +1,53 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.services;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_LEASE_THREADS;
/**
* ABFS error constants.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public final class AbfsErrors {
public static final String ERR_WRITE_WITHOUT_LEASE = "Attempted to write to file without lease";
public static final String ERR_LEASE_EXPIRED = "A lease ID was specified, but the lease for the"
+ " resource has expired";
public static final String ERR_NO_LEASE_ID_SPECIFIED = "There is currently a lease on the "
+ "resource and no lease ID was specified in the request";
public static final String ERR_PARALLEL_ACCESS_DETECTED = "Parallel access to the create path "
+ "detected. Failing request to honor single writer semantics";
public static final String ERR_ACQUIRING_LEASE = "Unable to acquire lease";
public static final String ERR_LEASE_ALREADY_PRESENT = "There is already a lease present";
public static final String ERR_LEASE_NOT_PRESENT = "There is currently no lease on the resource";
public static final String ERR_LEASE_ID_NOT_PRESENT = "The lease ID is not present with the "
+ "specified lease operation";
public static final String ERR_LEASE_DID_NOT_MATCH = "The lease ID specified did not match the "
+ "lease ID for the resource with the specified lease operation";
public static final String ERR_LEASE_BROKEN = "The lease ID matched, but the lease has been "
+ "broken explicitly and cannot be renewed";
public static final String ERR_LEASE_FUTURE_EXISTS = "There is already an existing lease "
+ "operation";
public static final String ERR_NO_LEASE_THREADS = "Lease desired but no lease threads "
+ "configured, set " + FS_AZURE_LEASE_THREADS;
private AbfsErrors() {}
}

View File

@ -0,0 +1,188 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.services;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.FutureCallback;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListenableScheduledFuture;
import org.apache.hadoop.thirdparty.org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.INFINITE_LEASE_DURATION;
import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_ACQUIRING_LEASE;
import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_LEASE_FUTURE_EXISTS;
import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_NO_LEASE_THREADS;
/**
* AbfsLease manages an Azure blob lease. It acquires an infinite lease on instantiation and
* releases the lease when free() is called. Use it to prevent writes to the blob by other
* processes that don't have the lease.
*
* Creating a new Lease object blocks the caller until the Azure blob lease is acquired. It will
* retry a fixed number of times before failing if there is a problem acquiring the lease.
*
* Call free() to release the Lease. If the holder process dies, AzureBlobFileSystem breakLease
* will need to be called before another client will be able to write to the file.
*/
public final class AbfsLease {
private static final Logger LOG = LoggerFactory.getLogger(AbfsLease.class);
// Number of retries for acquiring lease
static final int DEFAULT_LEASE_ACQUIRE_MAX_RETRIES = 7;
// Retry interval for acquiring lease in secs
static final int DEFAULT_LEASE_ACQUIRE_RETRY_INTERVAL = 10;
private final AbfsClient client;
private final String path;
// Lease status variables
private volatile boolean leaseFreed;
private volatile String leaseID = null;
private volatile Throwable exception = null;
private volatile int acquireRetryCount = 0;
private volatile ListenableScheduledFuture<AbfsRestOperation> future = null;
public static class LeaseException extends AzureBlobFileSystemException {
public LeaseException(Throwable t) {
super(ERR_ACQUIRING_LEASE + ": " + t, t);
}
public LeaseException(String s) {
super(s);
}
}
public AbfsLease(AbfsClient client, String path) throws AzureBlobFileSystemException {
this(client, path, DEFAULT_LEASE_ACQUIRE_MAX_RETRIES, DEFAULT_LEASE_ACQUIRE_RETRY_INTERVAL);
}
@VisibleForTesting
public AbfsLease(AbfsClient client, String path, int acquireMaxRetries,
int acquireRetryInterval) throws AzureBlobFileSystemException {
this.leaseFreed = false;
this.client = client;
this.path = path;
if (client.getNumLeaseThreads() < 1) {
throw new LeaseException(ERR_NO_LEASE_THREADS);
}
// Try to get the lease a specified number of times, else throw an error
RetryPolicy retryPolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
acquireMaxRetries, acquireRetryInterval, TimeUnit.SECONDS);
acquireLease(retryPolicy, 0, acquireRetryInterval, 0);
while (leaseID == null && exception == null) {
try {
future.get();
} catch (Exception e) {
LOG.debug("Got exception waiting for acquire lease future. Checking if lease ID or "
+ "exception have been set", e);
}
}
if (exception != null) {
LOG.error("Failed to acquire lease on {}", path);
throw new LeaseException(exception);
}
LOG.debug("Acquired lease {} on {}", leaseID, path);
}
private void acquireLease(RetryPolicy retryPolicy, int numRetries, int retryInterval, long delay)
throws LeaseException {
LOG.debug("Attempting to acquire lease on {}, retry {}", path, numRetries);
if (future != null && !future.isDone()) {
throw new LeaseException(ERR_LEASE_FUTURE_EXISTS);
}
future = client.schedule(() -> client.acquireLease(path, INFINITE_LEASE_DURATION),
delay, TimeUnit.SECONDS);
client.addCallback(future, new FutureCallback<AbfsRestOperation>() {
@Override
public void onSuccess(@Nullable AbfsRestOperation op) {
leaseID = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_LEASE_ID);
LOG.debug("Acquired lease {} on {}", leaseID, path);
}
@Override
public void onFailure(Throwable throwable) {
try {
if (RetryPolicy.RetryAction.RetryDecision.RETRY
== retryPolicy.shouldRetry(null, numRetries, 0, true).action) {
LOG.debug("Failed to acquire lease on {}, retrying: {}", path, throwable);
acquireRetryCount++;
acquireLease(retryPolicy, numRetries + 1, retryInterval, retryInterval);
} else {
exception = throwable;
}
} catch (Exception e) {
exception = throwable;
}
}
});
}
/**
* Cancel future and free the lease. If an exception occurs while releasing the lease, the error
* will be logged. If the lease cannot be released, AzureBlobFileSystem breakLease will need to
* be called before another client will be able to write to the file.
*/
public void free() {
if (leaseFreed) {
return;
}
try {
LOG.debug("Freeing lease: path {}, lease id {}", path, leaseID);
if (future != null && !future.isDone()) {
future.cancel(true);
}
client.releaseLease(path, leaseID);
} catch (IOException e) {
LOG.warn("Exception when trying to release lease {} on {}. Lease will need to be broken: {}",
leaseID, path, e.getMessage());
} finally {
// Even if releasing the lease fails (e.g. because the file was deleted),
// make sure to record that we freed the lease
leaseFreed = true;
LOG.debug("Freed lease {} on {}", leaseID, path);
}
}
public boolean isFreed() {
return leaseFreed;
}
public String getLeaseID() {
return leaseID;
}
@VisibleForTesting
public int getAcquireRetryCount() {
return acquireRetryCount;
}
}

View File

@ -37,6 +37,7 @@ import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
@ -53,6 +54,7 @@ import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.Syncable;
import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_WRITE_WITHOUT_LEASE;
import static org.apache.hadoop.fs.impl.StoreImplementationUtils.isProbeForSyncable;
import static org.apache.hadoop.io.IOUtils.wrapException;
import static org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters.Mode.APPEND_MODE;
@ -92,6 +94,9 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
// SAS tokens can be re-used until they expire
private CachedSASToken cachedSasToken;
private AbfsLease lease;
private String leaseId;
/**
* Queue storing buffers with the size of the Azure block ready for
* reuse. The pool allows reusing the blocks instead of allocating new
@ -142,6 +147,10 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
}
this.maxRequestsThatCanBeQueued = abfsOutputStreamContext
.getMaxWriteRequestsToQueue();
this.lease = abfsOutputStreamContext.getLease();
this.leaseId = abfsOutputStreamContext.getLeaseId();
this.threadExecutor
= new ThreadPoolExecutor(maxConcurrentRequestCount,
maxConcurrentRequestCount,
@ -203,6 +212,10 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
throw new IndexOutOfBoundsException();
}
if (hasLease() && isLeaseFreed()) {
throw new PathIOException(path, ERR_WRITE_WITHOUT_LEASE);
}
int currentOffset = off;
int writableBytes = bufferSize - bufferIndex;
int numberOfBytesToWrite = length;
@ -306,6 +319,10 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
// See HADOOP-16785
throw wrapException(path, e.getMessage(), e);
} finally {
if (hasLease()) {
lease.free();
lease = null;
}
lastError = new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
buffer = null;
bufferIndex = 0;
@ -372,7 +389,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker,
"writeCurrentBufferToService", "append")) {
AppendRequestParameters reqParams = new AppendRequestParameters(offset, 0,
bytesLength, APPEND_MODE, true);
bytesLength, APPEND_MODE, true, leaseId);
AbfsRestOperation op = client.append(path, bytes, reqParams, cachedSasToken.get());
cachedSasToken.update(op.getSasToken());
if (outputStreamStatistics != null) {
@ -448,7 +465,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
mode = FLUSH_MODE;
}
AppendRequestParameters reqParams = new AppendRequestParameters(
offset, 0, bytesLength, mode, false);
offset, 0, bytesLength, mode, false, leaseId);
AbfsRestOperation op = client.append(path, bytes, reqParams,
cachedSasToken.get());
cachedSasToken.update(op.getSasToken());
@ -517,7 +534,8 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
AbfsPerfTracker tracker = client.getAbfsPerfTracker();
try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker,
"flushWrittenBytesToServiceInternal", "flush")) {
AbfsRestOperation op = client.flush(path, offset, retainUncommitedData, isClose, cachedSasToken.get());
AbfsRestOperation op = client.flush(path, offset, retainUncommitedData, isClose,
cachedSasToken.get(), leaseId);
cachedSasToken.update(op.getSasToken());
perfInfo.registerResult(op.getResult()).registerSuccess(true);
} catch (AzureBlobFileSystemException ex) {
@ -637,6 +655,19 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
return ioStatistics;
}
@VisibleForTesting
public boolean isLeaseFreed() {
if (lease == null) {
return true;
}
return lease.isFreed();
}
@VisibleForTesting
public boolean hasLease() {
return lease != null;
}
/**
* Appending AbfsOutputStream statistics to base toString().
*

View File

@ -39,6 +39,8 @@ public class AbfsOutputStreamContext extends AbfsStreamContext {
private int maxWriteRequestsToQueue;
private AbfsLease lease;
public AbfsOutputStreamContext(final long sasTokenRenewPeriodForStreamsInSeconds) {
super(sasTokenRenewPeriodForStreamsInSeconds);
}
@ -94,6 +96,11 @@ public class AbfsOutputStreamContext extends AbfsStreamContext {
return this;
}
public AbfsOutputStreamContext withLease(final AbfsLease lease) {
this.lease = lease;
return this;
}
public int getWriteBufferSize() {
return writeBufferSize;
}
@ -125,4 +132,15 @@ public class AbfsOutputStreamContext extends AbfsStreamContext {
public boolean isEnableSmallWriteOptimization() {
return this.enableSmallWriteOptimization;
}
public AbfsLease getLease() {
return this.lease;
}
public String getLeaseId() {
if (this.lease == null) {
return null;
}
return this.lease.getLeaseID();
}
}

View File

@ -131,6 +131,7 @@ public class AbfsRestOperation {
this.url = url;
this.requestHeaders = requestHeaders;
this.hasRequestBody = (AbfsHttpConstants.HTTP_METHOD_PUT.equals(method)
|| AbfsHttpConstants.HTTP_METHOD_POST.equals(method)
|| AbfsHttpConstants.HTTP_METHOD_PATCH.equals(method));
this.sasToken = sasToken;
this.abfsCounters = client.getAbfsCounters();

View File

@ -40,5 +40,6 @@ public enum AbfsRestOperationType {
Flush,
ReadFile,
DeletePath,
CheckAccess
CheckAccess,
LeasePath,
}

View File

@ -887,6 +887,22 @@ enabled for your Azure Storage account."
The directories can be specified as comma separated values. By default the value
is "/hbase"
### <a name="infiniteleaseoptions"></a> Infinite Lease Options
`fs.azure.infinite-lease.directories`: Directories for infinite lease support
can be specified comma separated in this config. By default, multiple
clients will be able to write to the same file simultaneously. When writing
to files contained within the directories specified in this config, the
client will obtain a lease on the file that will prevent any other clients
from writing to the file. When the output stream is closed, the lease will be
released. To revoke a client's write access for a file, the
AzureBlobFilesystem breakLease method may be called. If the client dies
before the file can be closed and the lease released, breakLease will need to
be called before another client will be able to write to the file.
`fs.azure.lease.threads`: This is the size of the thread pool that will be
used for lease operations for infinite lease directories. By default the value
is 0, so it must be set to at least 1 to support infinite lease directories.
### <a name="perfoptions"></a> Perf Options
#### <a name="abfstracklatencyoptions"></a> 1. HTTP Request Tracking Options

View File

@ -0,0 +1,336 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs;
import java.io.IOException;
import java.util.concurrent.RejectedExecutionException;
import org.junit.Assert;
import org.junit.Test;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsLease;
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.LambdaTestUtils;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.spy;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_INFINITE_LEASE_KEY;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_LEASE_THREADS;
import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_TEST_NAMESPACE_ENABLED_ACCOUNT;
import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_ACQUIRING_LEASE;
import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_LEASE_EXPIRED;
import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_LEASE_NOT_PRESENT;
import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_NO_LEASE_ID_SPECIFIED;
import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_NO_LEASE_THREADS;
import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_PARALLEL_ACCESS_DETECTED;
/**
* Test lease operations.
*/
public class ITestAzureBlobFileSystemLease extends AbstractAbfsIntegrationTest {
private static final int TEST_EXECUTION_TIMEOUT = 30 * 1000;
private static final int LONG_TEST_EXECUTION_TIMEOUT = 90 * 1000;
private static final String TEST_FILE = "testfile";
private final boolean isHNSEnabled;
public ITestAzureBlobFileSystemLease() throws Exception {
super();
this.isHNSEnabled = getConfiguration()
.getBoolean(FS_AZURE_TEST_NAMESPACE_ENABLED_ACCOUNT, false);
}
private AzureBlobFileSystem getCustomFileSystem(Path infiniteLeaseDirs, int numLeaseThreads) throws Exception {
Configuration conf = getRawConfiguration();
conf.setBoolean(String.format("fs.%s.impl.disable.cache", getAbfsScheme()), true);
conf.set(FS_AZURE_INFINITE_LEASE_KEY, infiniteLeaseDirs.toUri().getPath());
conf.setInt(FS_AZURE_LEASE_THREADS, numLeaseThreads);
return getFileSystem(conf);
}
@Test(timeout = TEST_EXECUTION_TIMEOUT)
public void testNoInfiniteLease() throws IOException {
final Path testFilePath = new Path(path(methodName.getMethodName()), TEST_FILE);
final AzureBlobFileSystem fs = getFileSystem();
fs.mkdirs(testFilePath.getParent());
try (FSDataOutputStream out = fs.create(testFilePath)) {
Assert.assertFalse("Output stream should not have lease",
((AbfsOutputStream) out.getWrappedStream()).hasLease());
}
Assert.assertTrue("Store leases were not freed", fs.getAbfsStore().areLeasesFreed());
}
@Test(timeout = TEST_EXECUTION_TIMEOUT)
public void testNoLeaseThreads() throws Exception {
final Path testFilePath = new Path(path(methodName.getMethodName()), TEST_FILE);
final AzureBlobFileSystem fs = getCustomFileSystem(testFilePath.getParent(), 0);
fs.mkdirs(testFilePath.getParent());
LambdaTestUtils.intercept(IOException.class, ERR_NO_LEASE_THREADS, () -> {
try (FSDataOutputStream out = fs.create(testFilePath)) {
}
return "No failure when lease requested with 0 lease threads";
});
}
@Test(timeout = TEST_EXECUTION_TIMEOUT)
public void testOneWriter() throws Exception {
final Path testFilePath = new Path(path(methodName.getMethodName()), TEST_FILE);
final AzureBlobFileSystem fs = getCustomFileSystem(testFilePath.getParent(), 1);
fs.mkdirs(testFilePath.getParent());
FSDataOutputStream out = fs.create(testFilePath);
Assert.assertTrue("Output stream should have lease",
((AbfsOutputStream) out.getWrappedStream()).hasLease());
out.close();
Assert.assertFalse("Output stream should not have lease",
((AbfsOutputStream) out.getWrappedStream()).hasLease());
Assert.assertTrue("Store leases were not freed", fs.getAbfsStore().areLeasesFreed());
}
@Test(timeout = TEST_EXECUTION_TIMEOUT)
public void testSubDir() throws Exception {
final Path testFilePath = new Path(new Path(path(methodName.getMethodName()), "subdir"),
TEST_FILE);
final AzureBlobFileSystem fs =
getCustomFileSystem(testFilePath.getParent().getParent(), 1);
fs.mkdirs(testFilePath.getParent().getParent());
FSDataOutputStream out = fs.create(testFilePath);
Assert.assertTrue("Output stream should have lease",
((AbfsOutputStream) out.getWrappedStream()).hasLease());
out.close();
Assert.assertFalse("Output stream should not have lease",
((AbfsOutputStream) out.getWrappedStream()).hasLease());
Assert.assertTrue("Store leases were not freed", fs.getAbfsStore().areLeasesFreed());
}
@Test(timeout = TEST_EXECUTION_TIMEOUT)
public void testTwoCreate() throws Exception {
final Path testFilePath = new Path(path(methodName.getMethodName()), TEST_FILE);
final AzureBlobFileSystem fs = getCustomFileSystem(testFilePath.getParent(), 1);
fs.mkdirs(testFilePath.getParent());
try (FSDataOutputStream out = fs.create(testFilePath)) {
LambdaTestUtils.intercept(IOException.class, isHNSEnabled ? ERR_PARALLEL_ACCESS_DETECTED
: ERR_NO_LEASE_ID_SPECIFIED, () -> {
try (FSDataOutputStream out2 = fs.create(testFilePath)) {
}
return "Expected second create on infinite lease dir to fail";
});
}
Assert.assertTrue("Store leases were not freed", fs.getAbfsStore().areLeasesFreed());
}
private void twoWriters(AzureBlobFileSystem fs, Path testFilePath, boolean expectException) throws Exception {
try (FSDataOutputStream out = fs.create(testFilePath)) {
try (FSDataOutputStream out2 = fs.append(testFilePath)) {
out2.writeInt(2);
out2.hsync();
} catch (IOException e) {
if (expectException) {
GenericTestUtils.assertExceptionContains(ERR_ACQUIRING_LEASE, e);
} else {
throw e;
}
}
out.writeInt(1);
out.hsync();
}
Assert.assertTrue("Store leases were not freed", fs.getAbfsStore().areLeasesFreed());
}
@Test(timeout = TEST_EXECUTION_TIMEOUT)
public void testTwoWritersCreateAppendNoInfiniteLease() throws Exception {
final Path testFilePath = new Path(path(methodName.getMethodName()), TEST_FILE);
final AzureBlobFileSystem fs = getFileSystem();
fs.mkdirs(testFilePath.getParent());
twoWriters(fs, testFilePath, false);
}
@Test(timeout = LONG_TEST_EXECUTION_TIMEOUT)
public void testTwoWritersCreateAppendWithInfiniteLeaseEnabled() throws Exception {
final Path testFilePath = new Path(path(methodName.getMethodName()), TEST_FILE);
final AzureBlobFileSystem fs = getCustomFileSystem(testFilePath.getParent(), 1);
fs.mkdirs(testFilePath.getParent());
twoWriters(fs, testFilePath, true);
}
@Test(timeout = TEST_EXECUTION_TIMEOUT)
public void testLeaseFreedOnClose() throws Exception {
final Path testFilePath = new Path(path(methodName.getMethodName()), TEST_FILE);
final AzureBlobFileSystem fs = getCustomFileSystem(testFilePath.getParent(), 1);
fs.mkdirs(testFilePath.getParent());
FSDataOutputStream out;
out = fs.create(testFilePath);
out.write(0);
Assert.assertTrue("Output stream should have lease",
((AbfsOutputStream) out.getWrappedStream()).hasLease());
out.close();
Assert.assertFalse("Output stream should not have lease after close",
((AbfsOutputStream) out.getWrappedStream()).hasLease());
Assert.assertTrue("Store leases were not freed", fs.getAbfsStore().areLeasesFreed());
}
@Test(timeout = TEST_EXECUTION_TIMEOUT)
public void testWriteAfterBreakLease() throws Exception {
final Path testFilePath = new Path(path(methodName.getMethodName()), TEST_FILE);
final AzureBlobFileSystem fs = getCustomFileSystem(testFilePath.getParent(), 1);
fs.mkdirs(testFilePath.getParent());
FSDataOutputStream out;
out = fs.create(testFilePath);
out.write(0);
out.hsync();
fs.breakLease(testFilePath);
LambdaTestUtils.intercept(IOException.class, ERR_LEASE_EXPIRED, () -> {
out.write(1);
out.hsync();
return "Expected exception on write after lease break but got " + out;
});
LambdaTestUtils.intercept(IOException.class, ERR_LEASE_EXPIRED, () -> {
out.close();
return "Expected exception on close after lease break but got " + out;
});
Assert.assertTrue("Output stream lease should be freed",
((AbfsOutputStream) out.getWrappedStream()).isLeaseFreed());
try (FSDataOutputStream out2 = fs.append(testFilePath)) {
out2.write(2);
out2.hsync();
}
Assert.assertTrue("Store leases were not freed", fs.getAbfsStore().areLeasesFreed());
}
@Test(timeout = LONG_TEST_EXECUTION_TIMEOUT)
public void testLeaseFreedAfterBreak() throws Exception {
final Path testFilePath = new Path(path(methodName.getMethodName()), TEST_FILE);
final AzureBlobFileSystem fs = getCustomFileSystem(testFilePath.getParent(), 1);
fs.mkdirs(testFilePath.getParent());
FSDataOutputStream out = fs.create(testFilePath);
out.write(0);
fs.breakLease(testFilePath);
LambdaTestUtils.intercept(IOException.class, ERR_LEASE_EXPIRED, () -> {
out.close();
return "Expected exception on close after lease break but got " + out;
});
Assert.assertTrue("Output stream lease should be freed",
((AbfsOutputStream) out.getWrappedStream()).isLeaseFreed());
Assert.assertTrue("Store leases were not freed", fs.getAbfsStore().areLeasesFreed());
}
@Test(timeout = TEST_EXECUTION_TIMEOUT)
public void testInfiniteLease() throws Exception {
final Path testFilePath = new Path(path(methodName.getMethodName()), TEST_FILE);
final AzureBlobFileSystem fs = getCustomFileSystem(testFilePath.getParent(), 1);
fs.mkdirs(testFilePath.getParent());
try (FSDataOutputStream out = fs.create(testFilePath)) {
Assert.assertTrue("Output stream should have lease",
((AbfsOutputStream) out.getWrappedStream()).hasLease());
out.write(0);
}
Assert.assertTrue(fs.getAbfsStore().areLeasesFreed());
try (FSDataOutputStream out = fs.append(testFilePath)) {
Assert.assertTrue("Output stream should have lease",
((AbfsOutputStream) out.getWrappedStream()).hasLease());
out.write(1);
}
Assert.assertTrue("Store leases were not freed", fs.getAbfsStore().areLeasesFreed());
}
@Test(timeout = TEST_EXECUTION_TIMEOUT)
public void testFileSystemClose() throws Exception {
final Path testFilePath = new Path(path(methodName.getMethodName()), TEST_FILE);
final AzureBlobFileSystem fs = getCustomFileSystem(testFilePath.getParent(), 1);
fs.mkdirs(testFilePath.getParent());
FSDataOutputStream out = fs.create(testFilePath);
out.write(0);
Assert.assertFalse("Store leases should exist", fs.getAbfsStore().areLeasesFreed());
fs.close();
Assert.assertTrue("Store leases were not freed", fs.getAbfsStore().areLeasesFreed());
LambdaTestUtils.intercept(IOException.class, isHNSEnabled ? ERR_LEASE_NOT_PRESENT
: ERR_LEASE_EXPIRED, () -> {
out.close();
return "Expected exception on close after closed FS but got " + out;
});
LambdaTestUtils.intercept(RejectedExecutionException.class, () -> {
try (FSDataOutputStream out2 = fs.append(testFilePath)) {
}
return "Expected exception on new append after closed FS";
});
}
@Test(timeout = TEST_EXECUTION_TIMEOUT)
public void testAcquireRetry() throws Exception {
final Path testFilePath = new Path(path(methodName.getMethodName()), TEST_FILE);
final AzureBlobFileSystem fs = getCustomFileSystem(testFilePath.getParent(), 1);
fs.mkdirs(testFilePath.getParent());
fs.createNewFile(testFilePath);
AbfsLease lease = new AbfsLease(fs.getAbfsClient(), testFilePath.toUri().getPath());
Assert.assertNotNull("Did not successfully lease file", lease.getLeaseID());
lease.free();
Assert.assertEquals("Unexpected acquire retry count", 0, lease.getAcquireRetryCount());
AbfsClient mockClient = spy(fs.getAbfsClient());
doThrow(new AbfsLease.LeaseException("failed to acquire 1"))
.doThrow(new AbfsLease.LeaseException("failed to acquire 2"))
.doCallRealMethod()
.when(mockClient).acquireLease(anyString(), anyInt());
lease = new AbfsLease(mockClient, testFilePath.toUri().getPath(), 5, 1);
Assert.assertNotNull("Acquire lease should have retried", lease.getLeaseID());
lease.free();
Assert.assertEquals("Unexpected acquire retry count", 2, lease.getAcquireRetryCount());
doThrow(new AbfsLease.LeaseException("failed to acquire"))
.when(mockClient).acquireLease(anyString(), anyInt());
LambdaTestUtils.intercept(AzureBlobFileSystemException.class, () -> {
new AbfsLease(mockClient, testFilePath.toUri().getPath(), 5, 1);
});
}
}

View File

@ -24,11 +24,14 @@ import org.junit.Test;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidConfigurationValueException;
import org.apache.hadoop.fs.azurebfs.utils.Base64;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MIN_BUFFER_SIZE;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MAX_BUFFER_SIZE;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_LEASE_DURATION;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_READ_BUFFER_SIZE;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_WRITE_BUFFER_SIZE;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.INFINITE_LEASE_DURATION;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MAX_BUFFER_SIZE;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MAX_LEASE_DURATION;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MIN_BUFFER_SIZE;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MIN_LEASE_DURATION;
/**
* Test configuration validators.
@ -58,6 +61,26 @@ public class TestConfigurationValidators extends Assert {
integerConfigurationValidator.validate("3072");
}
@Test
public void testIntegerWithOutlierConfigValidator() throws Exception {
IntegerConfigurationBasicValidator integerConfigurationValidator = new IntegerConfigurationBasicValidator(
INFINITE_LEASE_DURATION, MIN_LEASE_DURATION, MAX_LEASE_DURATION, DEFAULT_LEASE_DURATION, FAKE_KEY,
false);
assertEquals(INFINITE_LEASE_DURATION, (int) integerConfigurationValidator.validate("-1"));
assertEquals(DEFAULT_LEASE_DURATION, (int) integerConfigurationValidator.validate(null));
assertEquals(MIN_LEASE_DURATION, (int) integerConfigurationValidator.validate("15"));
assertEquals(MAX_LEASE_DURATION, (int) integerConfigurationValidator.validate("60"));
}
@Test(expected = InvalidConfigurationValueException.class)
public void testIntegerWithOutlierConfigValidatorThrowsIfMissingValidValue() throws Exception {
IntegerConfigurationBasicValidator integerConfigurationValidator = new IntegerConfigurationBasicValidator(
INFINITE_LEASE_DURATION, MIN_LEASE_DURATION, MAX_LEASE_DURATION, DEFAULT_LEASE_DURATION, FAKE_KEY,
true);
integerConfigurationValidator.validate("14");
}
@Test
public void testLongConfigValidator() throws Exception {
LongConfigurationBasicValidator longConfigurationValidator = new LongConfigurationBasicValidator(

View File

@ -31,6 +31,7 @@ import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
import org.apache.hadoop.conf.Configuration;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.ArgumentMatchers.refEq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ -86,7 +87,7 @@ public final class TestAbfsOutputStream {
AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf);
when(client.getAbfsPerfTracker()).thenReturn(tracker);
when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any())).thenReturn(op);
when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any())).thenReturn(op);
when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any(), isNull())).thenReturn(op);
AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0,
populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false));
@ -104,9 +105,9 @@ public final class TestAbfsOutputStream {
out.hsync();
AppendRequestParameters firstReqParameters = new AppendRequestParameters(
0, 0, WRITE_SIZE, APPEND_MODE, false);
0, 0, WRITE_SIZE, APPEND_MODE, false, null);
AppendRequestParameters secondReqParameters = new AppendRequestParameters(
WRITE_SIZE, 0, 2 * WRITE_SIZE, APPEND_MODE, false);
WRITE_SIZE, 0, 2 * WRITE_SIZE, APPEND_MODE, false, null);
verify(client, times(1)).append(
eq(PATH), any(byte[].class), refEq(firstReqParameters), any());
@ -133,7 +134,7 @@ public final class TestAbfsOutputStream {
when(client.getAbfsPerfTracker()).thenReturn(tracker);
when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any())).thenReturn(op);
when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any())).thenReturn(op);
when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any(), isNull())).thenReturn(op);
AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0,
populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false));
@ -146,9 +147,9 @@ public final class TestAbfsOutputStream {
out.close();
AppendRequestParameters firstReqParameters = new AppendRequestParameters(
0, 0, BUFFER_SIZE, APPEND_MODE, false);
0, 0, BUFFER_SIZE, APPEND_MODE, false, null);
AppendRequestParameters secondReqParameters = new AppendRequestParameters(
BUFFER_SIZE, 0, 5*WRITE_SIZE-BUFFER_SIZE, APPEND_MODE, false);
BUFFER_SIZE, 0, 5*WRITE_SIZE-BUFFER_SIZE, APPEND_MODE, false, null);
verify(client, times(1)).append(
eq(PATH), any(byte[].class), refEq(firstReqParameters), any());
@ -165,7 +166,7 @@ public final class TestAbfsOutputStream {
ArgumentCaptor<String> acFlushSASToken = ArgumentCaptor.forClass(String.class);
verify(client, times(1)).flush(acFlushPath.capture(), acFlushPosition.capture(), acFlushRetainUnCommittedData.capture(), acFlushClose.capture(),
acFlushSASToken.capture());
acFlushSASToken.capture(), isNull());
assertThat(Arrays.asList(PATH)).describedAs("path").isEqualTo(acFlushPath.getAllValues());
assertThat(Arrays.asList(Long.valueOf(5*WRITE_SIZE))).describedAs("position").isEqualTo(acFlushPosition.getAllValues());
assertThat(Arrays.asList(false)).describedAs("RetainUnCommittedData flag").isEqualTo(acFlushRetainUnCommittedData.getAllValues());
@ -189,7 +190,7 @@ public final class TestAbfsOutputStream {
when(client.getAbfsPerfTracker()).thenReturn(tracker);
when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any())).thenReturn(op);
when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any())).thenReturn(op);
when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any(), isNull())).thenReturn(op);
when(op.getSasToken()).thenReturn("testToken");
when(op.getResult()).thenReturn(httpOp);
@ -204,9 +205,9 @@ public final class TestAbfsOutputStream {
out.close();
AppendRequestParameters firstReqParameters = new AppendRequestParameters(
0, 0, BUFFER_SIZE, APPEND_MODE, false);
0, 0, BUFFER_SIZE, APPEND_MODE, false, null);
AppendRequestParameters secondReqParameters = new AppendRequestParameters(
BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false);
BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false, null);
verify(client, times(1)).append(
eq(PATH), any(byte[].class), refEq(firstReqParameters), any());
@ -223,7 +224,7 @@ public final class TestAbfsOutputStream {
ArgumentCaptor<String> acFlushSASToken = ArgumentCaptor.forClass(String.class);
verify(client, times(1)).flush(acFlushPath.capture(), acFlushPosition.capture(), acFlushRetainUnCommittedData.capture(), acFlushClose.capture(),
acFlushSASToken.capture());
acFlushSASToken.capture(), isNull());
assertThat(Arrays.asList(PATH)).describedAs("path").isEqualTo(acFlushPath.getAllValues());
assertThat(Arrays.asList(Long.valueOf(2*BUFFER_SIZE))).describedAs("position").isEqualTo(acFlushPosition.getAllValues());
assertThat(Arrays.asList(false)).describedAs("RetainUnCommittedData flag").isEqualTo(acFlushRetainUnCommittedData.getAllValues());
@ -247,7 +248,7 @@ public final class TestAbfsOutputStream {
when(client.getAbfsPerfTracker()).thenReturn(tracker);
when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any())).thenReturn(op);
when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any())).thenReturn(op);
when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any(), isNull())).thenReturn(op);
when(op.getSasToken()).thenReturn("testToken");
when(op.getResult()).thenReturn(httpOp);
@ -262,9 +263,9 @@ public final class TestAbfsOutputStream {
Thread.sleep(1000);
AppendRequestParameters firstReqParameters = new AppendRequestParameters(
0, 0, BUFFER_SIZE, APPEND_MODE, false);
0, 0, BUFFER_SIZE, APPEND_MODE, false, null);
AppendRequestParameters secondReqParameters = new AppendRequestParameters(
BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false);
BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false, null);
verify(client, times(1)).append(
eq(PATH), any(byte[].class), refEq(firstReqParameters), any());
@ -291,7 +292,7 @@ public final class TestAbfsOutputStream {
when(client.getAbfsPerfTracker()).thenReturn(tracker);
when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any())).thenReturn(op);
when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any())).thenReturn(op);
when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any(), isNull())).thenReturn(op);
AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0,
populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, true));
@ -304,9 +305,9 @@ public final class TestAbfsOutputStream {
Thread.sleep(1000);
AppendRequestParameters firstReqParameters = new AppendRequestParameters(
0, 0, BUFFER_SIZE, APPEND_MODE, true);
0, 0, BUFFER_SIZE, APPEND_MODE, true, null);
AppendRequestParameters secondReqParameters = new AppendRequestParameters(
BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, true);
BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, true, null);
verify(client, times(1)).append(
eq(PATH), any(byte[].class), refEq(firstReqParameters), any());
@ -334,7 +335,7 @@ public final class TestAbfsOutputStream {
when(client.getAbfsPerfTracker()).thenReturn(tracker);
when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any())).thenReturn(op);
when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any())).thenReturn(op);
when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any(), isNull())).thenReturn(op);
AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0,
populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false));
@ -347,9 +348,9 @@ public final class TestAbfsOutputStream {
out.hflush();
AppendRequestParameters firstReqParameters = new AppendRequestParameters(
0, 0, BUFFER_SIZE, APPEND_MODE, false);
0, 0, BUFFER_SIZE, APPEND_MODE, false, null);
AppendRequestParameters secondReqParameters = new AppendRequestParameters(
BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false);
BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false, null);
verify(client, times(1)).append(
eq(PATH), any(byte[].class), refEq(firstReqParameters), any());
@ -366,7 +367,7 @@ public final class TestAbfsOutputStream {
ArgumentCaptor<String> acFlushSASToken = ArgumentCaptor.forClass(String.class);
verify(client, times(1)).flush(acFlushPath.capture(), acFlushPosition.capture(), acFlushRetainUnCommittedData.capture(), acFlushClose.capture(),
acFlushSASToken.capture());
acFlushSASToken.capture(), isNull());
assertThat(Arrays.asList(PATH)).describedAs("path").isEqualTo(acFlushPath.getAllValues());
assertThat(Arrays.asList(Long.valueOf(2*BUFFER_SIZE))).describedAs("position").isEqualTo(acFlushPosition.getAllValues());
assertThat(Arrays.asList(false)).describedAs("RetainUnCommittedData flag").isEqualTo(acFlushRetainUnCommittedData.getAllValues());
@ -388,7 +389,7 @@ public final class TestAbfsOutputStream {
AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf);
when(client.getAbfsPerfTracker()).thenReturn(tracker);
when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any())).thenReturn(op);
when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any())).thenReturn(op);
when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any(), isNull())).thenReturn(op);
AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0,
populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false));
@ -403,9 +404,9 @@ public final class TestAbfsOutputStream {
Thread.sleep(1000);
AppendRequestParameters firstReqParameters = new AppendRequestParameters(
0, 0, BUFFER_SIZE, APPEND_MODE, false);
0, 0, BUFFER_SIZE, APPEND_MODE, false, null);
AppendRequestParameters secondReqParameters = new AppendRequestParameters(
BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false);
BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false, null);
verify(client, times(1)).append(
eq(PATH), any(byte[].class), refEq(firstReqParameters), any());