Revert "HADOOP-16818. ABFS: Combine append+flush calls for blockblob & appendblob"

This reverts commit 3612317038.

Change-Id: Ie0d36f25de0b55a937894f4d9963c495bae0576a
This commit is contained in:
Steve Loughran 2020-03-26 15:09:13 +00:00
parent 679631b188
commit 745a6c1e69
No known key found for this signature in database
GPG Key ID: D22CF846DBB162A0
11 changed files with 34 additions and 570 deletions

View File

@ -143,10 +143,6 @@ public class AbfsConfiguration{
DefaultValue = DEFAULT_FS_AZURE_ATOMIC_RENAME_DIRECTORIES) DefaultValue = DEFAULT_FS_AZURE_ATOMIC_RENAME_DIRECTORIES)
private String azureAtomicDirs; private String azureAtomicDirs;
@StringConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_APPEND_BLOB_KEY,
DefaultValue = DEFAULT_FS_AZURE_APPEND_BLOB_DIRECTORIES)
private String azureAppendBlobDirs;
@BooleanConfigurationValidatorAnnotation(ConfigurationKey = AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION, @BooleanConfigurationValidatorAnnotation(ConfigurationKey = AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION,
DefaultValue = DEFAULT_AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION) DefaultValue = DEFAULT_AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION)
private boolean createRemoteFileSystemDuringInitialization; private boolean createRemoteFileSystemDuringInitialization;
@ -167,10 +163,6 @@ public class AbfsConfiguration{
DefaultValue = DEFAULT_DISABLE_OUTPUTSTREAM_FLUSH) DefaultValue = DEFAULT_DISABLE_OUTPUTSTREAM_FLUSH)
private boolean disableOutputStreamFlush; private boolean disableOutputStreamFlush;
@BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ENABLE_APPEND_WITH_FLUSH,
DefaultValue = DEFAULT_ENABLE_APPEND_WITH_FLUSH)
private boolean enableAppendWithFlush;
@BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ENABLE_AUTOTHROTTLING, @BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ENABLE_AUTOTHROTTLING,
DefaultValue = DEFAULT_ENABLE_AUTOTHROTTLING) DefaultValue = DEFAULT_ENABLE_AUTOTHROTTLING)
private boolean enableAutoThrottling; private boolean enableAutoThrottling;
@ -457,10 +449,6 @@ public class AbfsConfiguration{
return this.azureAtomicDirs; return this.azureAtomicDirs;
} }
public String getAppendBlobDirs() {
return this.azureAppendBlobDirs;
}
public boolean getCreateRemoteFileSystemDuringInitialization() { public boolean getCreateRemoteFileSystemDuringInitialization() {
// we do not support creating the filesystem when AuthType is SAS // we do not support creating the filesystem when AuthType is SAS
return this.createRemoteFileSystemDuringInitialization return this.createRemoteFileSystemDuringInitialization
@ -483,10 +471,6 @@ public class AbfsConfiguration{
return this.disableOutputStreamFlush; return this.disableOutputStreamFlush;
} }
public boolean isAppendWithFlushEnabled() {
return this.enableAppendWithFlush;
}
public boolean isAutoThrottlingEnabled() { public boolean isAutoThrottlingEnabled() {
return this.enableAutoThrottling; return this.enableAutoThrottling;
} }

View File

@ -137,11 +137,6 @@ public class AzureBlobFileSystemStore implements Closeable {
private final IdentityTransformer identityTransformer; private final IdentityTransformer identityTransformer;
private final AbfsPerfTracker abfsPerfTracker; private final AbfsPerfTracker abfsPerfTracker;
/**
* The set of directories where we should store files as append blobs.
*/
private Set<String> appendBlobDirSet;
public AzureBlobFileSystemStore(URI uri, boolean isSecureScheme, Configuration configuration) public AzureBlobFileSystemStore(URI uri, boolean isSecureScheme, Configuration configuration)
throws IOException { throws IOException {
this.uri = uri; this.uri = uri;
@ -182,22 +177,6 @@ public class AzureBlobFileSystemStore implements Closeable {
initializeClient(uri, fileSystemName, accountName, useHttps); initializeClient(uri, fileSystemName, accountName, useHttps);
this.identityTransformer = new IdentityTransformer(abfsConfiguration.getRawConfiguration()); this.identityTransformer = new IdentityTransformer(abfsConfiguration.getRawConfiguration());
LOG.trace("IdentityTransformer init complete"); LOG.trace("IdentityTransformer init complete");
// Extract the directories that should contain append blobs
String appendBlobDirs = abfsConfiguration.getAppendBlobDirs();
if (appendBlobDirs.trim().isEmpty()) {
this.appendBlobDirSet = new HashSet<String>();
} else {
this.appendBlobDirSet = new HashSet<>(Arrays.asList(
abfsConfiguration.getAppendBlobDirs().split(AbfsHttpConstants.COMMA)));
}
}
/**
* Checks if the given key in Azure Storage should be stored as a page
* blob instead of block blob.
*/
public boolean isAppendBlobKey(String key) {
return isKeyForDirectorySet(key, appendBlobDirSet);
} }
/** /**
@ -424,25 +403,18 @@ public class AzureBlobFileSystemStore implements Closeable {
umask.toString(), umask.toString(),
isNamespaceEnabled); isNamespaceEnabled);
boolean appendBlob = false; final AbfsRestOperation op = client.createPath(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), true, overwrite,
if (isAppendBlobKey(path.toString())) { isNamespaceEnabled ? getOctalNotation(permission) : null,
appendBlob = true; isNamespaceEnabled ? getOctalNotation(umask) : null);
} perfInfo.registerResult(op.getResult()).registerSuccess(true);
client.createPath(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), true, overwrite,
isNamespaceEnabled ? getOctalNotation(permission) : null,
isNamespaceEnabled ? getOctalNotation(umask) : null,
appendBlob);
return new AbfsOutputStream( return new AbfsOutputStream(
client, client,
AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path),
0, 0,
abfsConfiguration.getWriteBufferSize(), abfsConfiguration.getWriteBufferSize(),
abfsConfiguration.isFlushEnabled(), abfsConfiguration.isFlushEnabled(),
abfsConfiguration.isOutputStreamFlushDisabled(), abfsConfiguration.isOutputStreamFlushDisabled());
abfsConfiguration.isAppendWithFlushEnabled(),
appendBlob);
} }
} }
@ -458,8 +430,8 @@ public class AzureBlobFileSystemStore implements Closeable {
isNamespaceEnabled); isNamespaceEnabled);
final AbfsRestOperation op = client.createPath(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), false, true, final AbfsRestOperation op = client.createPath(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), false, true,
isNamespaceEnabled ? getOctalNotation(permission) : null, isNamespaceEnabled ? getOctalNotation(permission) : null,
isNamespaceEnabled ? getOctalNotation(umask) : null, false); isNamespaceEnabled ? getOctalNotation(umask) : null);
perfInfo.registerResult(op.getResult()).registerSuccess(true); perfInfo.registerResult(op.getResult()).registerSuccess(true);
} }
} }
@ -522,20 +494,13 @@ public class AzureBlobFileSystemStore implements Closeable {
perfInfo.registerSuccess(true); perfInfo.registerSuccess(true);
boolean appendBlob = false;
if (isAppendBlobKey(path.toString())) {
appendBlob = true;
}
return new AbfsOutputStream( return new AbfsOutputStream(
client, client,
AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path),
offset, offset,
abfsConfiguration.getWriteBufferSize(), abfsConfiguration.getWriteBufferSize(),
abfsConfiguration.isFlushEnabled(), abfsConfiguration.isFlushEnabled(),
abfsConfiguration.isOutputStreamFlushDisabled(), abfsConfiguration.isOutputStreamFlushDisabled());
abfsConfiguration.isAppendWithFlushEnabled(),
appendBlob);
} }
} }

View File

@ -40,7 +40,6 @@ public final class AbfsHttpConstants {
public static final String CHECK_ACCESS = "checkAccess"; public static final String CHECK_ACCESS = "checkAccess";
public static final String GET_STATUS = "getStatus"; public static final String GET_STATUS = "getStatus";
public static final String DEFAULT_TIMEOUT = "90"; public static final String DEFAULT_TIMEOUT = "90";
public static final String APPEND_BLOB_TYPE = "appendblob";
public static final String TOKEN_VERSION = "2"; public static final String TOKEN_VERSION = "2";
public static final String JAVA_VERSION = "java.version"; public static final String JAVA_VERSION = "java.version";

View File

@ -51,7 +51,6 @@ public final class ConfigurationKeys {
public static final String FS_AZURE_ENABLE_AUTOTHROTTLING = "fs.azure.enable.autothrottling"; public static final String FS_AZURE_ENABLE_AUTOTHROTTLING = "fs.azure.enable.autothrottling";
public static final String FS_AZURE_ALWAYS_USE_HTTPS = "fs.azure.always.use.https"; public static final String FS_AZURE_ALWAYS_USE_HTTPS = "fs.azure.always.use.https";
public static final String FS_AZURE_ATOMIC_RENAME_KEY = "fs.azure.atomic.rename.key"; public static final String FS_AZURE_ATOMIC_RENAME_KEY = "fs.azure.atomic.rename.key";
public static final String FS_AZURE_APPEND_BLOB_KEY = "fs.azure.appendblob.key";
public static final String FS_AZURE_READ_AHEAD_QUEUE_DEPTH = "fs.azure.readaheadqueue.depth"; public static final String FS_AZURE_READ_AHEAD_QUEUE_DEPTH = "fs.azure.readaheadqueue.depth";
/** Provides a config control to enable or disable ABFS Flush operations - /** Provides a config control to enable or disable ABFS Flush operations -
* HFlush and HSync. Default is true. **/ * HFlush and HSync. Default is true. **/
@ -62,10 +61,6 @@ public final class ConfigurationKeys {
* documentation does not have such expectations of data being persisted. * documentation does not have such expectations of data being persisted.
* Default value of this config is true. **/ * Default value of this config is true. **/
public static final String FS_AZURE_DISABLE_OUTPUTSTREAM_FLUSH = "fs.azure.disable.outputstream.flush"; public static final String FS_AZURE_DISABLE_OUTPUTSTREAM_FLUSH = "fs.azure.disable.outputstream.flush";
/** Provides a config control to enable OutputStream AppendWithFlush API
* operations in AbfsOutputStream.
* Default value of this config is true. **/
public static final String FS_AZURE_ENABLE_APPEND_WITH_FLUSH = "fs.azure.enable.appendwithflush";
public static final String FS_AZURE_USER_AGENT_PREFIX_KEY = "fs.azure.user.agent.prefix"; public static final String FS_AZURE_USER_AGENT_PREFIX_KEY = "fs.azure.user.agent.prefix";
public static final String FS_AZURE_SSL_CHANNEL_MODE_KEY = "fs.azure.ssl.channel.mode"; public static final String FS_AZURE_SSL_CHANNEL_MODE_KEY = "fs.azure.ssl.channel.mode";
/** Provides a config to enable/disable the checkAccess API. /** Provides a config to enable/disable the checkAccess API.

View File

@ -55,12 +55,10 @@ public final class FileSystemConfigurations {
public static final boolean DEFAULT_AZURE_SKIP_USER_GROUP_METADATA_DURING_INITIALIZATION = false; public static final boolean DEFAULT_AZURE_SKIP_USER_GROUP_METADATA_DURING_INITIALIZATION = false;
public static final String DEFAULT_FS_AZURE_ATOMIC_RENAME_DIRECTORIES = "/hbase"; public static final String DEFAULT_FS_AZURE_ATOMIC_RENAME_DIRECTORIES = "/hbase";
public static final String DEFAULT_FS_AZURE_APPEND_BLOB_DIRECTORIES = "";
public static final int DEFAULT_READ_AHEAD_QUEUE_DEPTH = -1; public static final int DEFAULT_READ_AHEAD_QUEUE_DEPTH = -1;
public static final boolean DEFAULT_ENABLE_FLUSH = true; public static final boolean DEFAULT_ENABLE_FLUSH = true;
public static final boolean DEFAULT_DISABLE_OUTPUTSTREAM_FLUSH = true; public static final boolean DEFAULT_DISABLE_OUTPUTSTREAM_FLUSH = true;
public static final boolean DEFAULT_ENABLE_APPEND_WITH_FLUSH = true;
public static final boolean DEFAULT_ENABLE_AUTOTHROTTLING = true; public static final boolean DEFAULT_ENABLE_AUTOTHROTTLING = true;
public static final DelegatingSSLSocketFactory.SSLChannelMode DEFAULT_FS_AZURE_SSL_CHANNEL_MODE public static final DelegatingSSLSocketFactory.SSLChannelMode DEFAULT_FS_AZURE_SSL_CHANNEL_MODE

View File

@ -38,8 +38,6 @@ public final class HttpQueryParams {
public static final String QUERY_PARAM_RETAIN_UNCOMMITTED_DATA = "retainUncommittedData"; public static final String QUERY_PARAM_RETAIN_UNCOMMITTED_DATA = "retainUncommittedData";
public static final String QUERY_PARAM_CLOSE = "close"; public static final String QUERY_PARAM_CLOSE = "close";
public static final String QUERY_PARAM_UPN = "upn"; public static final String QUERY_PARAM_UPN = "upn";
public static final String QUERY_PARAM_FLUSH = "flush";
public static final String QUERY_PARAM_BLOBTYPE = "blobtype";
private HttpQueryParams() {} private HttpQueryParams() {}
} }

View File

@ -119,6 +119,7 @@ public class AbfsClient implements Closeable {
this.sasTokenProvider = sasTokenProvider; this.sasTokenProvider = sasTokenProvider;
} }
@Override
public void close() throws IOException { public void close() throws IOException {
if (tokenProvider instanceof Closeable) { if (tokenProvider instanceof Closeable) {
IOUtils.cleanupWithLogger(LOG, (Closeable) tokenProvider); IOUtils.cleanupWithLogger(LOG, (Closeable) tokenProvider);
@ -260,8 +261,7 @@ public class AbfsClient implements Closeable {
} }
public AbfsRestOperation createPath(final String path, final boolean isFile, final boolean overwrite, public AbfsRestOperation createPath(final String path, final boolean isFile, final boolean overwrite,
final String permission, final String umask, final String permission, final String umask) throws AzureBlobFileSystemException {
final boolean appendBlob) throws AzureBlobFileSystemException {
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(); final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
if (!overwrite) { if (!overwrite) {
requestHeaders.add(new AbfsHttpHeader(IF_NONE_MATCH, AbfsHttpConstants.STAR)); requestHeaders.add(new AbfsHttpHeader(IF_NONE_MATCH, AbfsHttpConstants.STAR));
@ -277,9 +277,6 @@ public class AbfsClient implements Closeable {
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, isFile ? FILE : DIRECTORY); abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, isFile ? FILE : DIRECTORY);
if (appendBlob) {
abfsUriQueryBuilder.addQuery(QUERY_PARAM_BLOBTYPE, APPEND_BLOB_TYPE);
}
String operation = isFile String operation = isFile
? SASTokenProvider.CREATEFILE_OPERATION ? SASTokenProvider.CREATEFILE_OPERATION
@ -328,8 +325,7 @@ public class AbfsClient implements Closeable {
} }
public AbfsRestOperation append(final String path, final long position, final byte[] buffer, final int offset, public AbfsRestOperation append(final String path, final long position, final byte[] buffer, final int offset,
final int length, boolean flush, boolean isClose) final int length) throws AzureBlobFileSystemException {
throws AzureBlobFileSystemException {
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(); final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
// JDK7 does not support PATCH, so to workaround the issue we will use // JDK7 does not support PATCH, so to workaround the issue we will use
// PUT and specify the real method in the X-Http-Method-Override header. // PUT and specify the real method in the X-Http-Method-Override header.
@ -339,8 +335,6 @@ public class AbfsClient implements Closeable {
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, APPEND_ACTION); abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, APPEND_ACTION);
abfsUriQueryBuilder.addQuery(QUERY_PARAM_POSITION, Long.toString(position)); abfsUriQueryBuilder.addQuery(QUERY_PARAM_POSITION, Long.toString(position));
abfsUriQueryBuilder.addQuery(QUERY_PARAM_FLUSH, String.valueOf(flush));
abfsUriQueryBuilder.addQuery(QUERY_PARAM_CLOSE, String.valueOf(isClose));
appendSASTokenToQuery(path, SASTokenProvider.APPEND_OPERATION, abfsUriQueryBuilder); appendSASTokenToQuery(path, SASTokenProvider.APPEND_OPERATION, abfsUriQueryBuilder);
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString()); final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());

View File

@ -55,8 +55,6 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
private boolean closed; private boolean closed;
private boolean supportFlush; private boolean supportFlush;
private boolean disableOutputStreamFlush; private boolean disableOutputStreamFlush;
private boolean supportAppendWithFlush;
private boolean appendBlob;
private volatile IOException lastError; private volatile IOException lastError;
private long lastFlushOffset; private long lastFlushOffset;
@ -86,18 +84,13 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
final long position, final long position,
final int bufferSize, final int bufferSize,
final boolean supportFlush, final boolean supportFlush,
final boolean disableOutputStreamFlush, final boolean disableOutputStreamFlush) {
final boolean supportAppendWithFlush,
final boolean appendBlob) {
this.client = client; this.client = client;
this.path = path; this.path = path;
this.position = position; this.position = position;
this.closed = false; this.closed = false;
this.disableOutputStreamFlush = disableOutputStreamFlush;
this.supportFlush = supportFlush; this.supportFlush = supportFlush;
this.disableOutputStreamFlush = disableOutputStreamFlush; this.disableOutputStreamFlush = disableOutputStreamFlush;
this.supportAppendWithFlush = supportAppendWithFlush;
this.appendBlob = appendBlob;
this.lastError = null; this.lastError = null;
this.lastFlushOffset = 0; this.lastFlushOffset = 0;
this.bufferSize = bufferSize; this.bufferSize = bufferSize;
@ -106,6 +99,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
this.writeOperations = new ConcurrentLinkedDeque<>(); this.writeOperations = new ConcurrentLinkedDeque<>();
this.maxConcurrentRequestCount = 4 * Runtime.getRuntime().availableProcessors(); this.maxConcurrentRequestCount = 4 * Runtime.getRuntime().availableProcessors();
this.threadExecutor this.threadExecutor
= new ThreadPoolExecutor(maxConcurrentRequestCount, = new ThreadPoolExecutor(maxConcurrentRequestCount,
maxConcurrentRequestCount, maxConcurrentRequestCount,
@ -176,7 +170,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
if (writableBytes <= numberOfBytesToWrite) { if (writableBytes <= numberOfBytesToWrite) {
System.arraycopy(data, currentOffset, buffer, bufferIndex, writableBytes); System.arraycopy(data, currentOffset, buffer, bufferIndex, writableBytes);
bufferIndex += writableBytes; bufferIndex += writableBytes;
writeCurrentBufferToService(false, false); writeCurrentBufferToService();
currentOffset += writableBytes; currentOffset += writableBytes;
numberOfBytesToWrite = numberOfBytesToWrite - writableBytes; numberOfBytesToWrite = numberOfBytesToWrite - writableBytes;
} else { } else {
@ -274,16 +268,17 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
private synchronized void flushInternal(boolean isClose) throws IOException { private synchronized void flushInternal(boolean isClose) throws IOException {
maybeThrowLastError(); maybeThrowLastError();
writeAndFlushWrittenBytesToService(isClose); writeCurrentBufferToService();
flushWrittenBytesToService(isClose);
} }
private synchronized void flushInternalAsync() throws IOException { private synchronized void flushInternalAsync() throws IOException {
maybeThrowLastError(); maybeThrowLastError();
writeCurrentBufferToService(true, false); writeCurrentBufferToService();
flushWrittenBytesToServiceAsync(); flushWrittenBytesToServiceAsync();
} }
private synchronized void writeCurrentBufferToService(final boolean flush, final boolean isClose) throws IOException { private synchronized void writeCurrentBufferToService() throws IOException {
if (bufferIndex == 0) { if (bufferIndex == 0) {
return; return;
} }
@ -295,16 +290,6 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
final long offset = position; final long offset = position;
position += bytesLength; position += bytesLength;
if (this.appendBlob) {
client.append(path, offset, bytes, 0,
bytesLength, flush, isClose);
lastTotalAppendOffset += bytesLength;
if (flush) {
lastFlushOffset = lastTotalAppendOffset;
}
return;
}
if (threadExecutor.getQueue().size() >= maxConcurrentRequestCount * 2) { if (threadExecutor.getQueue().size() >= maxConcurrentRequestCount * 2) {
waitForTaskToComplete(); waitForTaskToComplete();
} }
@ -315,15 +300,8 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
AbfsPerfTracker tracker = client.getAbfsPerfTracker(); AbfsPerfTracker tracker = client.getAbfsPerfTracker();
try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker, try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker,
"writeCurrentBufferToService", "append")) { "writeCurrentBufferToService", "append")) {
if (flush) {
/* Append with Flush enabled should happen
* when all the data which was supposed to be
* appended has been sent and finished.
*/
while(lastTotalAppendOffset < lastFlushOffset);
}
AbfsRestOperation op = client.append(path, offset, bytes, 0, AbfsRestOperation op = client.append(path, offset, bytes, 0,
bytesLength, flush, isClose); bytesLength);
perfInfo.registerResult(op.getResult()); perfInfo.registerResult(op.getResult());
byteBufferPool.putBuffer(ByteBuffer.wrap(bytes)); byteBufferPool.putBuffer(ByteBuffer.wrap(bytes));
perfInfo.registerSuccess(true); perfInfo.registerSuccess(true);
@ -332,7 +310,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
} }
}); });
writeOperations.add(new WriteOperation(job, offset, bytesLength, flush)); writeOperations.add(new WriteOperation(job, offset, bytesLength));
// Try to shrink the queue // Try to shrink the queue
shrinkWriteOperationQueue(); shrinkWriteOperationQueue();
@ -348,6 +326,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
throw new FileNotFoundException(ex.getMessage()); throw new FileNotFoundException(ex.getMessage());
} }
} }
if (ex.getCause() instanceof AzureBlobFileSystemException) { if (ex.getCause() instanceof AzureBlobFileSystemException) {
ex = (AzureBlobFileSystemException) ex.getCause(); ex = (AzureBlobFileSystemException) ex.getCause();
} }
@ -355,36 +334,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
throw lastError; throw lastError;
} }
} }
shrinkWriteOperationQueue(); flushWrittenBytesToServiceInternal(position, false, isClose);
}
private synchronized void completeExistingTasks() throws IOException {
for (WriteOperation writeOperation : writeOperations) {
try {
writeOperation.task.get();
} catch (Exception ex) {
if (ex.getCause() instanceof AbfsRestOperationException) {
if (((AbfsRestOperationException) ex.getCause()).getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) {
throw new FileNotFoundException(ex.getMessage());
}
}
if (ex.getCause() instanceof AzureBlobFileSystemException) {
ex = (AzureBlobFileSystemException) ex.getCause();
}
lastError = new IOException(ex);
throw lastError;
}
}
shrinkWriteOperationQueue();
}
private synchronized void writeAndFlushWrittenBytesToService(boolean isClose) throws IOException {
completeExistingTasks();
writeCurrentBufferToService(supportAppendWithFlush, isClose);
completeExistingTasks();
if (this.lastTotalAppendOffset > this.lastFlushOffset) {
flushWrittenBytesToServiceInternal(position, false, isClose);
}
} }
private synchronized void flushWrittenBytesToServiceAsync() throws IOException { private synchronized void flushWrittenBytesToServiceAsync() throws IOException {
@ -423,9 +373,6 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
while (writeOperations.peek() != null && writeOperations.peek().task.isDone()) { while (writeOperations.peek() != null && writeOperations.peek().task.isDone()) {
writeOperations.peek().task.get(); writeOperations.peek().task.get();
lastTotalAppendOffset += writeOperations.peek().length; lastTotalAppendOffset += writeOperations.peek().length;
if (writeOperations.peek().isFlush) {
lastFlushOffset = lastTotalAppendOffset;
}
writeOperations.remove(); writeOperations.remove();
} }
} catch (Exception e) { } catch (Exception e) {
@ -458,9 +405,8 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
private final Future<Void> task; private final Future<Void> task;
private final long startOffset; private final long startOffset;
private final long length; private final long length;
private final boolean isFlush;
WriteOperation(final Future<Void> task, final long startOffset, final long length, final boolean flush) { WriteOperation(final Future<Void> task, final long startOffset, final long length) {
Preconditions.checkNotNull(task, "task"); Preconditions.checkNotNull(task, "task");
Preconditions.checkArgument(startOffset >= 0, "startOffset"); Preconditions.checkArgument(startOffset >= 0, "startOffset");
Preconditions.checkArgument(length >= 0, "length"); Preconditions.checkArgument(length >= 0, "length");
@ -468,7 +414,6 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
this.task = task; this.task = task;
this.startOffset = startOffset; this.startOffset = startOffset;
this.length = length; this.length = length;
this.isFlush = flush;
} }
} }

View File

@ -643,10 +643,6 @@ Consult the javadocs for `org.apache.hadoop.fs.azurebfs.constants.ConfigurationK
`org.apache.hadoop.fs.azurebfs.AbfsConfiguration` for the full list `org.apache.hadoop.fs.azurebfs.AbfsConfiguration` for the full list
of configuration options and their default values. of configuration options and their default values.
### <a name="appendblobkeyconfigoptions"></a> Append Blob Directories Options
#### Config `fs.azure.appendblob.key` provides
an option for using append blob for the files prefixed by the config value.
### <a name="flushconfigoptions"></a> Flush Options ### <a name="flushconfigoptions"></a> Flush Options
#### <a name="abfsflushconfigoptions"></a> 1. Azure Blob File System Flush Options #### <a name="abfsflushconfigoptions"></a> 1. Azure Blob File System Flush Options

View File

@ -206,13 +206,10 @@ public class ITestAzureBlobFileSystemE2E extends AbstractAbfsIntegrationTest {
FSDataOutputStream stream = fs.create(testFilePath); FSDataOutputStream stream = fs.create(testFilePath);
assertTrue(fs.exists(testFilePath)); assertTrue(fs.exists(testFilePath));
stream.write(TEST_BYTE);
fs.delete(testFilePath, true); fs.delete(testFilePath, true);
assertFalse(fs.exists(testFilePath)); assertFalse(fs.exists(testFilePath));
AbfsConfiguration configuration = this.getConfiguration();
// trigger flush call
intercept(FileNotFoundException.class, intercept(FileNotFoundException.class,
() -> stream.close()); () -> stream.close());
} }

View File

@ -1,407 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.services;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Random;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
import org.apache.hadoop.conf.Configuration;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.anyInt;
import static org.mockito.Mockito.anyBoolean;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.anyLong;
import static org.assertj.core.api.Assertions.assertThat;
public final class TestAbfsOutputStream {
private static final int bufferSize = 4096;
private static final int writeSize = 1000;
private static final String path = "~/testpath";
private final String globalKey = "fs.azure.configuration";
private final String accountName1 = "account1";
private final String accountKey1 = globalKey + "." + accountName1;
private final String accountValue1 = "one";
/**
* The test verifies OutputStream shortwrite case(2000bytes write followed by flush, hflush, hsync) is making correct HTTP calls to the server
*/
@Test
public void verifyShortWriteRequest() throws Exception {
AbfsClient client = mock(AbfsClient.class);
AbfsRestOperation op = mock(AbfsRestOperation.class);
AbfsConfiguration abfsConf;
final Configuration conf = new Configuration();
conf.set(accountKey1, accountValue1);
abfsConf = new AbfsConfiguration(conf, accountName1);
AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf);
when(client.getAbfsPerfTracker()).thenReturn(tracker);
when(client.append(anyString(), anyLong(), any(byte[].class), anyInt(), anyInt(), anyBoolean(), anyBoolean())).thenReturn(op);
AbfsOutputStream out = new AbfsOutputStream(client, path, 0, bufferSize, true, false, true, false);
final byte[] b = new byte[writeSize];
new Random().nextBytes(b);
out.write(b);
out.hsync();
ArgumentCaptor<String> acString = ArgumentCaptor.forClass(String.class);
ArgumentCaptor<Long> acLong = ArgumentCaptor.forClass(Long.class);
ArgumentCaptor<Integer> acBufferOffset = ArgumentCaptor.forClass(Integer.class);
ArgumentCaptor<Integer> acBufferLength = ArgumentCaptor.forClass(Integer.class);
ArgumentCaptor<Boolean> acFlush = ArgumentCaptor.forClass(Boolean.class);
ArgumentCaptor<Boolean> acClose = ArgumentCaptor.forClass(Boolean.class);
ArgumentCaptor<byte[]> acByteArray = ArgumentCaptor.forClass(byte[].class);
final byte[] b1 = new byte[2*writeSize];
new Random().nextBytes(b1);
out.write(b1);
out.flush();
out.hflush();
out.hsync();
verify(client, times(2)).append(acString.capture(), acLong.capture(), acByteArray.capture(), acBufferOffset.capture(), acBufferLength.capture(),
acFlush.capture(), acClose.capture());
assertThat(Arrays.asList(path, path)).describedAs("Path of the requests").isEqualTo(acString.getAllValues());
assertThat(Arrays.asList(Long.valueOf(0), Long.valueOf(writeSize))).describedAs("Write Position").isEqualTo(acLong.getAllValues());
//flush=true, close=false, flush=true, close=false
assertThat(Arrays.asList(true, true)).describedAs("Flush = true/false").isEqualTo(acFlush.getAllValues());
assertThat(Arrays.asList(false, false)).describedAs("Close = true/false").isEqualTo(acClose.getAllValues());
assertThat(Arrays.asList(0,0)).describedAs("Buffer Offset").isEqualTo(acBufferOffset.getAllValues());
assertThat(Arrays.asList(writeSize, 2*writeSize)).describedAs("Buffer length").isEqualTo(acBufferLength.getAllValues());
}
/**
* The test verifies OutputStream Write of writeSize(1000 bytes) followed by a close is making correct HTTP calls to the server
*/
@Test
public void verifyWriteRequest() throws Exception {
AbfsClient client = mock(AbfsClient.class);
AbfsRestOperation op = mock(AbfsRestOperation.class);
AbfsConfiguration abfsConf;
final Configuration conf = new Configuration();
conf.set(accountKey1, accountValue1);
abfsConf = new AbfsConfiguration(conf, accountName1);
AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf);
when(client.getAbfsPerfTracker()).thenReturn(tracker);
when(client.append(anyString(), anyLong(), any(byte[].class), anyInt(), anyInt(), anyBoolean(), anyBoolean())).thenReturn(op);
AbfsOutputStream out = new AbfsOutputStream(client, path, 0, bufferSize, true, false, true, false);
final byte[] b = new byte[writeSize];
new Random().nextBytes(b);
for (int i = 0; i < 5; i++) {
out.write(b);
}
out.close();
ArgumentCaptor<String> acString = ArgumentCaptor.forClass(String.class);
ArgumentCaptor<Long> acLong = ArgumentCaptor.forClass(Long.class);
ArgumentCaptor<Integer> acBufferOffset = ArgumentCaptor.forClass(Integer.class);
ArgumentCaptor<Integer> acBufferLength = ArgumentCaptor.forClass(Integer.class);
ArgumentCaptor<Boolean> acFlush = ArgumentCaptor.forClass(Boolean.class);
ArgumentCaptor<Boolean> acClose = ArgumentCaptor.forClass(Boolean.class);
ArgumentCaptor<byte[]> acByteArray = ArgumentCaptor.forClass(byte[].class);
verify(client, times(2)).append(acString.capture(), acLong.capture(), acByteArray.capture(), acBufferOffset.capture(), acBufferLength.capture(),
acFlush.capture(), acClose.capture());
assertThat(Arrays.asList(path, path)).describedAs("Path").isEqualTo(acString.getAllValues());
assertThat(Arrays.asList(Long.valueOf(0), Long.valueOf(bufferSize))).describedAs("Position").isEqualTo(acLong.getAllValues());
//flush=false,close=false, flush=true,close=true
assertThat(Arrays.asList(false, true)).describedAs("Flush = true/false").isEqualTo(acFlush.getAllValues());
assertThat(Arrays.asList(false, true)).describedAs("Close = true/false").isEqualTo(acClose.getAllValues());
assertThat(Arrays.asList(0, 0)).describedAs("Buffer Offset").isEqualTo(acBufferOffset.getAllValues());
assertThat(Arrays.asList(bufferSize, 5*writeSize-bufferSize)).describedAs("Buffer Length").isEqualTo(acBufferLength.getAllValues());
}
/**
* The test verifies OutputStream Write of bufferSize(4KB) followed by a close is making correct HTTP calls to the server
*/
@Test
public void verifyWriteRequestOfBufferSizeAndClose() throws Exception {
AbfsClient client = mock(AbfsClient.class);
AbfsRestOperation op = mock(AbfsRestOperation.class);
AbfsConfiguration abfsConf;
final Configuration conf = new Configuration();
conf.set(accountKey1, accountValue1);
abfsConf = new AbfsConfiguration(conf, accountName1);
AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf);
when(client.getAbfsPerfTracker()).thenReturn(tracker);
when(client.append(anyString(), anyLong(), any(byte[].class), anyInt(), anyInt(), anyBoolean(), anyBoolean())).thenReturn(op);
when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean())).thenReturn(op);
AbfsOutputStream out = new AbfsOutputStream(client, path, 0, bufferSize, true, false, true, false);
final byte[] b = new byte[bufferSize];
new Random().nextBytes(b);
for (int i = 0; i < 2; i++) {
out.write(b);
}
out.close();
ArgumentCaptor<String> acString = ArgumentCaptor.forClass(String.class);
ArgumentCaptor<Long> acLong = ArgumentCaptor.forClass(Long.class);
ArgumentCaptor<Integer> acBufferOffset = ArgumentCaptor.forClass(Integer.class);
ArgumentCaptor<Integer> acBufferLength = ArgumentCaptor.forClass(Integer.class);
ArgumentCaptor<Boolean> acFlush = ArgumentCaptor.forClass(Boolean.class);
ArgumentCaptor<Boolean> acClose = ArgumentCaptor.forClass(Boolean.class);
ArgumentCaptor<byte[]> acByteArray = ArgumentCaptor.forClass(byte[].class);
verify(client, times(2)).append(acString.capture(), acLong.capture(), acByteArray.capture(), acBufferOffset.capture(), acBufferLength.capture(),
acFlush.capture(), acClose.capture());
assertThat(Arrays.asList(path, path)).describedAs("path").isEqualTo(acString.getAllValues());
assertThat(new HashSet<Long>(Arrays.asList(Long.valueOf(0), Long.valueOf(bufferSize)))).describedAs("Position").isEqualTo(new HashSet<Long>(
acLong.getAllValues()));
//flush=false, close=false, flush=false, close=false
assertThat(Arrays.asList(false, false)).describedAs("Flush = true/false").isEqualTo(acFlush.getAllValues());
assertThat(Arrays.asList(false, false)).describedAs("Close = true/false").isEqualTo(acClose.getAllValues());
assertThat(Arrays.asList(0, 0)).describedAs("Buffer Offset").isEqualTo(acBufferOffset.getAllValues());
assertThat(Arrays.asList(bufferSize, bufferSize)).describedAs("Buffer Length").isEqualTo(acBufferLength.getAllValues());
ArgumentCaptor<String> acFlushString = ArgumentCaptor.forClass(String.class);
ArgumentCaptor<Long> acFlushLong = ArgumentCaptor.forClass(Long.class);
ArgumentCaptor<Boolean> acFlushRetainUnCommittedData = ArgumentCaptor.forClass(Boolean.class);
ArgumentCaptor<Boolean> acFlushClose = ArgumentCaptor.forClass(Boolean.class);
verify(client, times(1)).flush(acFlushString.capture(), acFlushLong.capture(), acFlushRetainUnCommittedData.capture(), acFlushClose.capture());
assertThat(Arrays.asList(path)).describedAs("path").isEqualTo(acFlushString.getAllValues());
assertThat(Arrays.asList(Long.valueOf(2*bufferSize))).describedAs("position").isEqualTo(acFlushLong.getAllValues());
assertThat(Arrays.asList(false)).describedAs("RetainUnCommittedData flag").isEqualTo(acFlushRetainUnCommittedData.getAllValues());
assertThat(Arrays.asList(true)).describedAs("Close flag").isEqualTo(acFlushClose.getAllValues());
}
/**
* The test verifies OutputStream Write of bufferSize(4KB) is making correct HTTP calls to the server
*/
@Test
public void verifyWriteRequestOfBufferSize() throws Exception {
AbfsClient client = mock(AbfsClient.class);
AbfsRestOperation op = mock(AbfsRestOperation.class);
AbfsConfiguration abfsConf;
final Configuration conf = new Configuration();
conf.set(accountKey1, accountValue1);
abfsConf = new AbfsConfiguration(conf, accountName1);
AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf);
when(client.getAbfsPerfTracker()).thenReturn(tracker);
when(client.append(anyString(), anyLong(), any(byte[].class), anyInt(), anyInt(), anyBoolean(), anyBoolean())).thenReturn(op);
AbfsOutputStream out = new AbfsOutputStream(client, path, 0, bufferSize, true, false, true, false);
final byte[] b = new byte[bufferSize];
new Random().nextBytes(b);
for (int i = 0; i < 2; i++) {
out.write(b);
}
Thread.sleep(1000);
ArgumentCaptor<String> acString = ArgumentCaptor.forClass(String.class);
ArgumentCaptor<Long> acLong = ArgumentCaptor.forClass(Long.class);
ArgumentCaptor<Integer> acBufferOffset = ArgumentCaptor.forClass(Integer.class);
ArgumentCaptor<Integer> acBufferLength = ArgumentCaptor.forClass(Integer.class);
ArgumentCaptor<Boolean> acFlush = ArgumentCaptor.forClass(Boolean.class);
ArgumentCaptor<Boolean> acClose = ArgumentCaptor.forClass(Boolean.class);
ArgumentCaptor<byte[]> acByteArray = ArgumentCaptor.forClass(byte[].class);
verify(client, times(2)).append(acString.capture(), acLong.capture(), acByteArray.capture(), acBufferOffset.capture(), acBufferLength.capture(),
acFlush.capture(), acClose.capture());
assertThat(Arrays.asList(path, path)).describedAs("File Path").isEqualTo(acString.getAllValues());
assertThat(new HashSet<Long>(Arrays.asList(Long.valueOf(0), Long.valueOf(bufferSize)))).describedAs("Position in file").isEqualTo(
new HashSet<Long>(acLong.getAllValues()));
//flush=false, close=false, flush=false, close=false
assertThat(Arrays.asList(false, false)).describedAs("flush flag").isEqualTo(acFlush.getAllValues());
assertThat(Arrays.asList(false, false)).describedAs("close flag").isEqualTo(acClose.getAllValues());
assertThat(Arrays.asList(0, 0)).describedAs("buffer offset").isEqualTo(acBufferOffset.getAllValues());
assertThat(Arrays.asList(bufferSize, bufferSize)).describedAs("buffer length").isEqualTo(acBufferLength.getAllValues());
}
/**
* The test verifies OutputStream Write of bufferSize(4KB) on a AppendBlob based stream is making correct HTTP calls to the server
*/
@Test
public void verifyWriteRequestOfBufferSizeWithAppendBlob() throws Exception {
AbfsClient client = mock(AbfsClient.class);
AbfsRestOperation op = mock(AbfsRestOperation.class);
AbfsConfiguration abfsConf;
final Configuration conf = new Configuration();
conf.set(accountKey1, accountValue1);
abfsConf = new AbfsConfiguration(conf, accountName1);
AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf);
when(client.getAbfsPerfTracker()).thenReturn(tracker);
when(client.append(anyString(), anyLong(), any(byte[].class), anyInt(), anyInt(), anyBoolean(), anyBoolean())).thenReturn(op);
AbfsOutputStream out = new AbfsOutputStream(client, path, 0, bufferSize, true, false, true, false);
final byte[] b = new byte[bufferSize];
new Random().nextBytes(b);
for (int i = 0; i < 2; i++) {
out.write(b);
}
Thread.sleep(1000);
ArgumentCaptor<String> acString = ArgumentCaptor.forClass(String.class);
ArgumentCaptor<Long> acLong = ArgumentCaptor.forClass(Long.class);
ArgumentCaptor<Integer> acBufferOffset = ArgumentCaptor.forClass(Integer.class);
ArgumentCaptor<Integer> acBufferLength = ArgumentCaptor.forClass(Integer.class);
ArgumentCaptor<Boolean> acFlush = ArgumentCaptor.forClass(Boolean.class);
ArgumentCaptor<Boolean> acClose = ArgumentCaptor.forClass(Boolean.class);
ArgumentCaptor<byte[]> acByteArray = ArgumentCaptor.forClass(byte[].class);
verify(client, times(2)).append(acString.capture(), acLong.capture(), acByteArray.capture(), acBufferOffset.capture(), acBufferLength.capture(),
acFlush.capture(), acClose.capture());
assertThat(Arrays.asList(path, path)).describedAs("File Path").isEqualTo(acString.getAllValues());
assertThat(Arrays.asList(Long.valueOf(0), Long.valueOf(bufferSize))).describedAs("File Position").isEqualTo(acLong.getAllValues());
//flush=false, close=false, flush=false, close=false
assertThat(Arrays.asList(false, false)).describedAs("Flush Flag").isEqualTo(acFlush.getAllValues());
assertThat(Arrays.asList(false, false)).describedAs("Close Flag").isEqualTo(acClose.getAllValues());
assertThat(Arrays.asList(0, 0)).describedAs("Buffer Offset").isEqualTo(acBufferOffset.getAllValues());
assertThat(Arrays.asList(bufferSize, bufferSize)).describedAs("Buffer Length").isEqualTo(acBufferLength.getAllValues());
}
/**
* The test verifies OutputStream Write of bufferSize(4KB) followed by a hflush call is making correct HTTP calls to the server
*/
@Test
public void verifyWriteRequestOfBufferSizeAndHFlush() throws Exception {
AbfsClient client = mock(AbfsClient.class);
AbfsRestOperation op = mock(AbfsRestOperation.class);
AbfsConfiguration abfsConf;
final Configuration conf = new Configuration();
conf.set(accountKey1, accountValue1);
abfsConf = new AbfsConfiguration(conf, accountName1);
AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf);
when(client.getAbfsPerfTracker()).thenReturn(tracker);
when(client.append(anyString(), anyLong(), any(byte[].class), anyInt(), anyInt(), anyBoolean(), anyBoolean())).thenReturn(op);
when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean())).thenReturn(op);
AbfsOutputStream out = new AbfsOutputStream(client, path, 0, bufferSize, true, false, true, false);
final byte[] b = new byte[bufferSize];
new Random().nextBytes(b);
for (int i = 0; i < 2; i++) {
out.write(b);
}
out.hflush();
ArgumentCaptor<String> acString = ArgumentCaptor.forClass(String.class);
ArgumentCaptor<Long> acLong = ArgumentCaptor.forClass(Long.class);
ArgumentCaptor<Integer> acBufferOffset = ArgumentCaptor.forClass(Integer.class);
ArgumentCaptor<Integer> acBufferLength = ArgumentCaptor.forClass(Integer.class);
ArgumentCaptor<Boolean> acFlush = ArgumentCaptor.forClass(Boolean.class);
ArgumentCaptor<Boolean> acClose = ArgumentCaptor.forClass(Boolean.class);
ArgumentCaptor<byte[]> acByteArray = ArgumentCaptor.forClass(byte[].class);
verify(client, times(2)).append(acString.capture(), acLong.capture(), acByteArray.capture(), acBufferOffset.capture(), acBufferLength.capture(),
acFlush.capture(), acClose.capture());
assertThat(Arrays.asList(path, path)).describedAs("File Path").isEqualTo(acString.getAllValues());
assertThat(new HashSet<Long>(Arrays.asList(Long.valueOf(0), Long.valueOf(bufferSize)))).describedAs("File Position").isEqualTo(
new HashSet<Long>(acLong.getAllValues()));
//flush=false, close=false, flush=false, close=false
assertThat(Arrays.asList(false, false)).describedAs("Flush Flag").isEqualTo(acFlush.getAllValues());
assertThat(Arrays.asList(false, false)).describedAs("Close Flag").isEqualTo(acClose.getAllValues());
assertThat(Arrays.asList(0, 0)).describedAs("Buffer Offset").isEqualTo(acBufferOffset.getAllValues());
assertThat(Arrays.asList(bufferSize, bufferSize)).describedAs("Buffer Length").isEqualTo(acBufferLength.getAllValues());
ArgumentCaptor<String> acFlushString = ArgumentCaptor.forClass(String.class);
ArgumentCaptor<Long> acFlushLong = ArgumentCaptor.forClass(Long.class);
ArgumentCaptor<Boolean> acFlushRetainUnCommittedData = ArgumentCaptor.forClass(Boolean.class);
ArgumentCaptor<Boolean> acFlushClose = ArgumentCaptor.forClass(Boolean.class);
verify(client, times(1)).flush(acFlushString.capture(), acFlushLong.capture(), acFlushRetainUnCommittedData.capture(), acFlushClose.capture());
assertThat(Arrays.asList(path)).describedAs("path").isEqualTo(acFlushString.getAllValues());
assertThat(Arrays.asList(Long.valueOf(2*bufferSize))).describedAs("position").isEqualTo(acFlushLong.getAllValues());
assertThat(Arrays.asList(false)).describedAs("RetainUnCommittedData flag").isEqualTo(acFlushRetainUnCommittedData.getAllValues());
assertThat(Arrays.asList(false)).describedAs("Close flag").isEqualTo(acFlushClose.getAllValues());
}
/**
* The test verifies OutputStream Write of bufferSize(4KB) followed by a flush call is making correct HTTP calls to the server
*/
@Test
public void verifyWriteRequestOfBufferSizeAndFlush() throws Exception {
AbfsClient client = mock(AbfsClient.class);
AbfsRestOperation op = mock(AbfsRestOperation.class);
AbfsConfiguration abfsConf;
final Configuration conf = new Configuration();
conf.set(accountKey1, accountValue1);
abfsConf = new AbfsConfiguration(conf, accountName1);
AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf);
when(client.getAbfsPerfTracker()).thenReturn(tracker);
when(client.append(anyString(), anyLong(), any(byte[].class), anyInt(), anyInt(), anyBoolean(), anyBoolean())).thenReturn(op);
AbfsOutputStream out = new AbfsOutputStream(client, path, 0, bufferSize, true, false, true, false);
final byte[] b = new byte[bufferSize];
new Random().nextBytes(b);
for (int i = 0; i < 2; i++) {
out.write(b);
}
out.flush();
Thread.sleep(1000);
ArgumentCaptor<String> acString = ArgumentCaptor.forClass(String.class);
ArgumentCaptor<Long> acLong = ArgumentCaptor.forClass(Long.class);
ArgumentCaptor<Integer> acBufferOffset = ArgumentCaptor.forClass(Integer.class);
ArgumentCaptor<Integer> acBufferLength = ArgumentCaptor.forClass(Integer.class);
ArgumentCaptor<Boolean> acFlush = ArgumentCaptor.forClass(Boolean.class);
ArgumentCaptor<Boolean> acClose = ArgumentCaptor.forClass(Boolean.class);
ArgumentCaptor<byte[]> acByteArray = ArgumentCaptor.forClass(byte[].class);
verify(client, times(2)).append(acString.capture(), acLong.capture(), acByteArray.capture(), acBufferOffset.capture(), acBufferLength.capture(),
acFlush.capture(), acClose.capture());
assertThat(Arrays.asList(path, path)).describedAs("path").isEqualTo(acString.getAllValues());
assertThat(new HashSet<Long>(Arrays.asList(Long.valueOf(0), Long.valueOf(bufferSize)))).describedAs("Position").isEqualTo(
new HashSet<Long>(acLong.getAllValues()));
//flush=false, close=false, flush=false, close=false
assertThat(Arrays.asList(false, false)).describedAs("Flush = true/false").isEqualTo(acFlush.getAllValues());
assertThat(Arrays.asList(false, false)).describedAs("Close = true/false").isEqualTo(acClose.getAllValues());
assertThat(Arrays.asList(0, 0)).describedAs("Buffer Offset").isEqualTo(acBufferOffset.getAllValues());
assertThat(Arrays.asList(bufferSize, bufferSize)).describedAs("Buffer Length").isEqualTo(acBufferLength.getAllValues());
}
}