From d20109c171460f3312a760c1309f95b2bf61e0d3 Mon Sep 17 00:00:00 2001 From: ishaniahuja <50942176+ishaniahuja@users.noreply.github.com> Date: Sun, 5 Jul 2020 01:55:14 +0530 Subject: [PATCH] HADOOP-17058. ABFS: Support for AppendBlob in Hadoop ABFS Driver - Contributed by Ishani Ahuja --- .../hadoop/fs/azurebfs/AbfsConfiguration.java | 8 + .../fs/azurebfs/AzureBlobFileSystemStore.java | 50 +- .../azurebfs/constants/AbfsHttpConstants.java | 1 + .../azurebfs/constants/ConfigurationKeys.java | 3 + .../constants/FileSystemConfigurations.java | 2 + .../azurebfs/constants/HttpQueryParams.java | 1 + .../fs/azurebfs/services/AbfsClient.java | 46 +- .../azurebfs/services/AbfsHttpOperation.java | 10 + .../azurebfs/services/AbfsOutputStream.java | 63 ++- .../services/AbfsOutputStreamContext.java | 12 + .../azurebfs/AbstractAbfsIntegrationTest.java | 4 + .../azurebfs/ITestAbfsNetworkStatistics.java | 52 ++- .../ITestAbfsOutputStreamStatistics.java | 4 + .../azurebfs/ITestAbfsReadWriteAndSeek.java | 3 + .../azurebfs/ITestAbfsStreamStatistics.java | 11 +- .../ITestAzureBlobFileSystemCreate.java | 20 +- .../azurebfs/ITestAzureBlobFileSystemE2E.java | 3 + .../ITestAzureBlobFileSystemFlush.java | 30 +- ...TestAbfsConfigurationFieldsValidation.java | 2 +- .../constants/TestConfigurationKeys.java | 1 + .../services/TestAbfsOutputStream.java | 430 ++++++++++++++++++ 21 files changed, 714 insertions(+), 42 deletions(-) create mode 100644 hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java index bbe2274addc..43021c0fa8b 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java @@ -153,6 
+153,10 @@ public class AbfsConfiguration{ DefaultValue = DEFAULT_FS_AZURE_ATOMIC_RENAME_DIRECTORIES) private String azureAtomicDirs; + @StringConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_APPEND_BLOB_KEY, + DefaultValue = DEFAULT_FS_AZURE_APPEND_BLOB_DIRECTORIES) + private String azureAppendBlobDirs; + @BooleanConfigurationValidatorAnnotation(ConfigurationKey = AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION, DefaultValue = DEFAULT_AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION) private boolean createRemoteFileSystemDuringInitialization; @@ -544,6 +548,10 @@ public class AbfsConfiguration{ return this.azureAtomicDirs; } + public String getAppendBlobDirs() { + return this.azureAppendBlobDirs; + } + public boolean getCreateRemoteFileSystemDuringInitialization() { // we do not support creating the filesystem when AuthType is SAS return this.createRemoteFileSystemDuringInitialization diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index c310e29870a..74908dec1e5 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -62,6 +62,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants; import org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes; +import org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations; import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; @@ -145,6 +146,11 @@ public class AzureBlobFileSystemStore 
implements Closeable { private final IdentityTransformerInterface identityTransformer; private final AbfsPerfTracker abfsPerfTracker; + /** + * The set of directories where we should store files as append blobs. + */ + private Set appendBlobDirSet; + public AzureBlobFileSystemStore(URI uri, boolean isSecureScheme, Configuration configuration, AbfsCounters abfsCounters) throws IOException { @@ -196,6 +202,23 @@ public class AzureBlobFileSystemStore implements Closeable { throw new IOException(e); } LOG.trace("IdentityTransformer init complete"); + + // Extract the directories that should contain append blobs + String appendBlobDirs = abfsConfiguration.getAppendBlobDirs(); + if (appendBlobDirs.trim().isEmpty()) { + this.appendBlobDirSet = new HashSet(); + } else { + this.appendBlobDirSet = new HashSet<>(Arrays.asList( + abfsConfiguration.getAppendBlobDirs().split(AbfsHttpConstants.COMMA))); + } + } + + /** + * Checks if the given key in Azure Storage should be stored as an append + * blob instead of a block blob. + */ + public boolean isAppendBlobKey(String key) { + return isKeyForDirectorySet(key, appendBlobDirSet); } /** @@ -431,10 +454,15 @@ public class AzureBlobFileSystemStore implements Closeable { isNamespaceEnabled); String relativePath = getRelativePath(path); + boolean isAppendBlob = false; + if (isAppendBlobKey(path.toString())) { + isAppendBlob = true; + } final AbfsRestOperation op = client.createPath(relativePath, true, overwrite, isNamespaceEnabled ? getOctalNotation(permission) : null, - isNamespaceEnabled ? 
getOctalNotation(umask) : null, + isAppendBlob); perfInfo.registerResult(op.getResult()).registerSuccess(true); return new AbfsOutputStream( @@ -442,16 +470,21 @@ public class AzureBlobFileSystemStore implements Closeable { statistics, relativePath, 0, - populateAbfsOutputStreamContext()); + populateAbfsOutputStreamContext(isAppendBlob)); } } - private AbfsOutputStreamContext populateAbfsOutputStreamContext() { + private AbfsOutputStreamContext populateAbfsOutputStreamContext(boolean isAppendBlob) { + int bufferSize = abfsConfiguration.getWriteBufferSize(); + if (isAppendBlob && bufferSize > FileSystemConfigurations.APPENDBLOB_MAX_WRITE_BUFFER_SIZE) { + bufferSize = FileSystemConfigurations.APPENDBLOB_MAX_WRITE_BUFFER_SIZE; + } return new AbfsOutputStreamContext(abfsConfiguration.getSasTokenRenewPeriodForStreamsInSeconds()) - .withWriteBufferSize(abfsConfiguration.getWriteBufferSize()) + .withWriteBufferSize(bufferSize) .enableFlush(abfsConfiguration.isFlushEnabled()) .disableOutputStreamFlush(abfsConfiguration.isOutputStreamFlushDisabled()) .withStreamStatistics(new AbfsOutputStreamStatisticsImpl()) + .withAppendBlob(isAppendBlob) .build(); } @@ -468,7 +501,7 @@ public class AzureBlobFileSystemStore implements Closeable { final AbfsRestOperation op = client.createPath(getRelativePath(path), false, true, isNamespaceEnabled ? getOctalNotation(permission) : null, - isNamespaceEnabled ? getOctalNotation(umask) : null); + isNamespaceEnabled ? 
getOctalNotation(umask) : null, false); perfInfo.registerResult(op.getResult()).registerSuccess(true); } } @@ -544,12 +577,17 @@ public class AzureBlobFileSystemStore implements Closeable { perfInfo.registerSuccess(true); + boolean isAppendBlob = false; + if (isAppendBlobKey(path.toString())) { + isAppendBlob = true; + } + return new AbfsOutputStream( client, statistics, relativePath, offset, - populateAbfsOutputStreamContext()); + populateAbfsOutputStreamContext(isAppendBlob)); } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java index 8d45513da58..38b79c9412f 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java @@ -40,6 +40,7 @@ public final class AbfsHttpConstants { public static final String CHECK_ACCESS = "checkAccess"; public static final String GET_STATUS = "getStatus"; public static final String DEFAULT_TIMEOUT = "90"; + public static final String APPEND_BLOB_TYPE = "appendblob"; public static final String TOKEN_VERSION = "2"; public static final String JAVA_VENDOR = "java.vendor"; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java index 67ce0f59167..b5feee64ab4 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java @@ -59,6 +59,9 @@ public final class ConfigurationKeys { public static final String FS_AZURE_ENABLE_AUTOTHROTTLING = "fs.azure.enable.autothrottling"; public static final 
String FS_AZURE_ALWAYS_USE_HTTPS = "fs.azure.always.use.https"; public static final String FS_AZURE_ATOMIC_RENAME_KEY = "fs.azure.atomic.rename.key"; + /** Provides a config to specify comma-separated path prefixes under which files are created as append blobs. + * Default is empty. **/ + public static final String FS_AZURE_APPEND_BLOB_KEY = "fs.azure.appendblob.directories"; public static final String FS_AZURE_READ_AHEAD_QUEUE_DEPTH = "fs.azure.readaheadqueue.depth"; /** Provides a config control to enable or disable ABFS Flush operations - * HFlush and HSync. Default is true. **/ diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java index c12631d96db..a367daf6ee5 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java @@ -47,6 +47,7 @@ public final class FileSystemConfigurations { // Default upload and download buffer size public static final int DEFAULT_WRITE_BUFFER_SIZE = 8 * ONE_MB; // 8 MB + public static final int APPENDBLOB_MAX_WRITE_BUFFER_SIZE = 4 * ONE_MB; // 4 MB public static final int DEFAULT_READ_BUFFER_SIZE = 4 * ONE_MB; // 4 MB public static final int MIN_BUFFER_SIZE = 16 * ONE_KB; // 16 KB public static final int MAX_BUFFER_SIZE = 100 * ONE_MB; // 100 MB @@ -61,6 +62,7 @@ public final class FileSystemConfigurations { public static final boolean DEFAULT_AZURE_SKIP_USER_GROUP_METADATA_DURING_INITIALIZATION = false; public static final String DEFAULT_FS_AZURE_ATOMIC_RENAME_DIRECTORIES = "/hbase"; + public static final String DEFAULT_FS_AZURE_APPEND_BLOB_DIRECTORIES = ""; public static final int DEFAULT_READ_AHEAD_QUEUE_DEPTH = -1; public static final boolean DEFAULT_ENABLE_FLUSH = true; diff 
--git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpQueryParams.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpQueryParams.java index 9f735f729cb..5a550ac783f 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpQueryParams.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpQueryParams.java @@ -38,6 +38,7 @@ public final class HttpQueryParams { public static final String QUERY_PARAM_RETAIN_UNCOMMITTED_DATA = "retainUncommittedData"; public static final String QUERY_PARAM_CLOSE = "close"; public static final String QUERY_PARAM_UPN = "upn"; + public static final String QUERY_PARAM_BLOBTYPE = "blobtype"; private HttpQueryParams() {} } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java index f614bbd41d2..f747bd068cc 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java @@ -272,7 +272,8 @@ public class AbfsClient implements Closeable { } public AbfsRestOperation createPath(final String path, final boolean isFile, final boolean overwrite, - final String permission, final String umask) throws AzureBlobFileSystemException { + final String permission, final String umask, + final boolean isAppendBlob) throws AzureBlobFileSystemException { final List requestHeaders = createDefaultHeaders(); if (!overwrite) { requestHeaders.add(new AbfsHttpHeader(IF_NONE_MATCH, AbfsHttpConstants.STAR)); @@ -288,6 +289,9 @@ public class AbfsClient implements Closeable { final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, isFile ? 
FILE : DIRECTORY); + if (isAppendBlob) { + abfsUriQueryBuilder.addQuery(QUERY_PARAM_BLOBTYPE, APPEND_BLOB_TYPE); + } String operation = isFile ? SASTokenProvider.CREATE_FILE_OPERATION @@ -380,7 +384,7 @@ public class AbfsClient implements Closeable { } public AbfsRestOperation append(final String path, final long position, final byte[] buffer, final int offset, - final int length, final String cachedSasToken) throws AzureBlobFileSystemException { + final int length, final String cachedSasToken, final boolean isAppendBlob) throws AzureBlobFileSystemException { final List requestHeaders = createDefaultHeaders(); // JDK7 does not support PATCH, so to workaround the issue we will use // PUT and specify the real method in the X-Http-Method-Override header. @@ -401,10 +405,46 @@ public class AbfsClient implements Closeable { HTTP_METHOD_PUT, url, requestHeaders, buffer, offset, length, sasTokenForReuse); - op.execute(); + try { + op.execute(); + } catch (AzureBlobFileSystemException e) { + if (isAppendBlob && appendSuccessCheckOp(op, path, (position + length))) { + final AbfsRestOperation successOp = new AbfsRestOperation( + AbfsRestOperationType.Append, + this, + HTTP_METHOD_PUT, + url, + requestHeaders, buffer, offset, length, sasTokenForReuse); + successOp.hardSetResult(HttpURLConnection.HTTP_OK); + return successOp; + } + throw e; + } + return op; } + // For AppendBlob its possible that the append succeeded in the backend but the request failed. + // However a retry would fail with an InvalidQueryParameterValue + // (as the current offset would be unacceptable). 
+ // Hence, we pass/succeed the appendblob append call + // in case we are doing a retry after checking the length of the file + public boolean appendSuccessCheckOp(AbfsRestOperation op, final String path, + final long length) throws AzureBlobFileSystemException { + if ((op.isARetriedRequest()) + && (op.getResult().getStatusCode() == HttpURLConnection.HTTP_BAD_REQUEST)) { + final AbfsRestOperation destStatusOp = getPathStatus(path, false); + if (destStatusOp.getResult().getStatusCode() == HttpURLConnection.HTTP_OK) { + String fileLength = destStatusOp.getResult().getResponseHeader( + HttpHeaderConfigurations.CONTENT_LENGTH); + if (length <= Long.parseLong(fileLength)) { + return true; + } + } + } + return false; + } + public AbfsRestOperation flush(final String path, final long position, boolean retainUncommittedData, boolean isClose, final String cachedSasToken) throws AzureBlobFileSystemException { diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java index 5dc4a89a53c..a63c98261f1 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java @@ -69,6 +69,7 @@ public class AbfsHttpOperation implements AbfsPerfLoggable { private String storageErrorMessage = ""; private String clientRequestId = ""; private String requestId = ""; + private String expectedAppendPos = ""; private ListResultSchema listResultSchema = null; // metrics @@ -126,6 +127,10 @@ public class AbfsHttpOperation implements AbfsPerfLoggable { return clientRequestId; } + public String getExpectedAppendPos() { + return expectedAppendPos; + } + public String getRequestId() { return requestId; } @@ -154,6 +159,8 @@ public class AbfsHttpOperation implements AbfsPerfLoggable { 
sb.append(statusCode); sb.append(","); sb.append(storageErrorCode); + sb.append(","); + sb.append(expectedAppendPos); sb.append(",cid="); sb.append(clientRequestId); sb.append(",rid="); @@ -449,6 +456,9 @@ public class AbfsHttpOperation implements AbfsPerfLoggable { case "message": storageErrorMessage = fieldValue; break; + case "ExpectedAppendPos": + expectedAppendPos = fieldValue; + break; default: break; } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java index 89afca42202..6c1e177da61 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java @@ -60,6 +60,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa private boolean closed; private boolean supportFlush; private boolean disableOutputStreamFlush; + private boolean isAppendBlob; private volatile IOException lastError; private long lastFlushOffset; @@ -106,6 +107,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa this.supportFlush = abfsOutputStreamContext.isEnableFlush(); this.disableOutputStreamFlush = abfsOutputStreamContext .isDisableOutputStreamFlush(); + this.isAppendBlob = abfsOutputStreamContext.isAppendBlob(); this.lastError = null; this.lastFlushOffset = 0; this.bufferSize = abfsOutputStreamContext.getWriteBufferSize(); @@ -114,8 +116,11 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa this.writeOperations = new ConcurrentLinkedDeque<>(); this.outputStreamStatistics = abfsOutputStreamContext.getStreamStatistics(); - this.maxConcurrentRequestCount = 4 * Runtime.getRuntime().availableProcessors(); - + if (this.isAppendBlob) { + this.maxConcurrentRequestCount = 1; + } else { + 
this.maxConcurrentRequestCount = 4 * Runtime.getRuntime().availableProcessors(); + } this.threadExecutor = new ThreadPoolExecutor(maxConcurrentRequestCount, maxConcurrentRequestCount, @@ -309,7 +314,50 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa flushWrittenBytesToServiceAsync(); } + private void writeAppendBlobCurrentBufferToService() throws IOException { + if (bufferIndex == 0) { + return; + } + outputStreamStatistics.writeCurrentBuffer(); + + final byte[] bytes = buffer; + final int bytesLength = bufferIndex; + outputStreamStatistics.bytesToUpload(bytesLength); + buffer = byteBufferPool.getBuffer(false, bufferSize).array(); + bufferIndex = 0; + final long offset = position; + position += bytesLength; + AbfsPerfTracker tracker = client.getAbfsPerfTracker(); + try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker, + "writeCurrentBufferToService", "append")) { + AbfsRestOperation op = client.append(path, offset, bytes, 0, + bytesLength, cachedSasToken.get(), this.isAppendBlob); + cachedSasToken.update(op.getSasToken()); + outputStreamStatistics.uploadSuccessful(bytesLength); + perfInfo.registerResult(op.getResult()); + byteBufferPool.putBuffer(ByteBuffer.wrap(bytes)); + perfInfo.registerSuccess(true); + return; + } catch (Exception ex) { + if (ex instanceof AbfsRestOperationException) { + if (((AbfsRestOperationException) ex).getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) { + throw new FileNotFoundException(ex.getMessage()); + } + } + if (ex instanceof AzureBlobFileSystemException) { + ex = (AzureBlobFileSystemException) ex; + } + lastError = new IOException(ex); + throw lastError; + } + } + private synchronized void writeCurrentBufferToService() throws IOException { + if (this.isAppendBlob) { + writeAppendBlobCurrentBufferToService(); + return; + } + if (bufferIndex == 0) { return; } @@ -336,7 +384,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa try (AbfsPerfInfo perfInfo = new 
AbfsPerfInfo(tracker, "writeCurrentBufferToService", "append")) { AbfsRestOperation op = client.append(path, offset, bytes, 0, - bytesLength, cachedSasToken.get()); + bytesLength, cachedSasToken.get(), false); cachedSasToken.update(op.getSasToken()); perfInfo.registerResult(op.getResult()); byteBufferPool.putBuffer(ByteBuffer.wrap(bytes)); @@ -389,6 +437,11 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa private synchronized void flushWrittenBytesToServiceInternal(final long offset, final boolean retainUncommitedData, final boolean isClose) throws IOException { + // flush is called for appendblob only on close + if (this.isAppendBlob && !isClose) { + return; + } + AbfsPerfTracker tracker = client.getAbfsPerfTracker(); try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker, "flushWrittenBytesToServiceInternal", "flush")) { @@ -434,6 +487,10 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa for (completed = false; completionService.poll() != null; completed = true) { // keep polling until there is no data } + // for AppendBLob, jobs are not submitted to completion service + if (isAppendBlob) { + completed = true; + } if (!completed) { try { diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java index dcd6c459817..03e4abaf4f6 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java @@ -31,6 +31,8 @@ public class AbfsOutputStreamContext extends AbfsStreamContext { private AbfsOutputStreamStatistics streamStatistics; + private boolean isAppendBlob; + public AbfsOutputStreamContext(final long sasTokenRenewPeriodForStreamsInSeconds) { 
super(sasTokenRenewPeriodForStreamsInSeconds); } @@ -58,6 +60,12 @@ public class AbfsOutputStreamContext extends AbfsStreamContext { return this; } + public AbfsOutputStreamContext withAppendBlob( + final boolean isAppendBlob) { + this.isAppendBlob = isAppendBlob; + return this; + } + public AbfsOutputStreamContext build() { // Validation of parameters to be done here. return this; @@ -78,4 +86,8 @@ public class AbfsOutputStreamContext extends AbfsStreamContext { public AbfsOutputStreamStatistics getStreamStatistics() { return streamStatistics; } + + public boolean isAppendBlob() { + return isAppendBlob; + } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java index a80bee65bf4..34b3615c1b5 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java @@ -122,6 +122,10 @@ public abstract class AbstractAbfsIntegrationTest extends this.testUrl = defaultUri.toString(); abfsConfig.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, defaultUri.toString()); abfsConfig.setBoolean(AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION, true); + if ("true".equals(abfsConfig.get(FS_AZURE_TEST_APPENDBLOB_ENABLED))) { // null-safe value comparison; == would compare references + String appendblobDirs = this.testUrl + "," + abfsConfig.get(FS_AZURE_CONTRACT_TEST_URI); + rawConfig.set(FS_AZURE_APPEND_BLOB_KEY, appendblobDirs); + } // For testing purposes, an IP address and port may be provided to override // the host specified in the FileSystem URI. 
Also note that the format of // the Azure Storage Service URI changes from diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsNetworkStatistics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsNetworkStatistics.java index 904fdf3f7c1..b2e13011521 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsNetworkStatistics.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsNetworkStatistics.java @@ -81,10 +81,18 @@ public class ITestAbfsNetworkStatistics extends AbstractAbfsIntegrationTest { * * bytes_sent : bytes wrote in AbfsOutputStream. */ - connectionsMade = assertAbfsStatistics(AbfsStatistic.CONNECTIONS_MADE, - 6, metricMap); - requestsSent = assertAbfsStatistics(AbfsStatistic.SEND_REQUESTS, 4, - metricMap); + if (fs.getAbfsStore().isAppendBlobKey(fs.makeQualified(sendRequestPath).toString())) { + // no network calls are made for hflush in case of appendblob + connectionsMade = assertAbfsStatistics(AbfsStatistic.CONNECTIONS_MADE, + 5, metricMap); + requestsSent = assertAbfsStatistics(AbfsStatistic.SEND_REQUESTS, 3, + metricMap); + } else { + connectionsMade = assertAbfsStatistics(AbfsStatistic.CONNECTIONS_MADE, + 6, metricMap); + requestsSent = assertAbfsStatistics(AbfsStatistic.SEND_REQUESTS, 4, + metricMap); + } bytesSent = assertAbfsStatistics(AbfsStatistic.BYTES_SENT, testNetworkStatsString.getBytes().length, metricMap); @@ -125,10 +133,18 @@ public class ITestAbfsNetworkStatistics extends AbstractAbfsIntegrationTest { * wrote each time). 
* */ - assertAbfsStatistics(AbfsStatistic.CONNECTIONS_MADE, - connectionsMade + 1 + LARGE_OPERATIONS * 2, metricMap); - assertAbfsStatistics(AbfsStatistic.SEND_REQUESTS, - requestsSent + 1 + LARGE_OPERATIONS * 2, metricMap); + if (fs.getAbfsStore().isAppendBlobKey(fs.makeQualified(sendRequestPath).toString())) { + // no network calls are made for hflush in case of appendblob + assertAbfsStatistics(AbfsStatistic.CONNECTIONS_MADE, + connectionsMade + 1 + LARGE_OPERATIONS, metricMap); + assertAbfsStatistics(AbfsStatistic.SEND_REQUESTS, + requestsSent + 1 + LARGE_OPERATIONS, metricMap); + } else { + assertAbfsStatistics(AbfsStatistic.CONNECTIONS_MADE, + connectionsMade + 1 + LARGE_OPERATIONS * 2, metricMap); + assertAbfsStatistics(AbfsStatistic.SEND_REQUESTS, + requestsSent + 1 + LARGE_OPERATIONS * 2, metricMap); + } assertAbfsStatistics(AbfsStatistic.BYTES_SENT, bytesSent + LARGE_OPERATIONS * (testNetworkStatsString.getBytes().length), metricMap); @@ -183,8 +199,14 @@ public class ITestAbfsNetworkStatistics extends AbstractAbfsIntegrationTest { * * bytes_received - This should be equal to bytes sent earlier. */ - getResponses = assertAbfsStatistics(AbfsStatistic.GET_RESPONSES, 8, - metricMap); + if (fs.getAbfsStore().isAppendBlobKey(fs.makeQualified(getResponsePath).toString())) { + //for appendBlob hflush is a no-op + getResponses = assertAbfsStatistics(AbfsStatistic.GET_RESPONSES, 7, + metricMap); + } else { + getResponses = assertAbfsStatistics(AbfsStatistic.GET_RESPONSES, 8, + metricMap); + } // Testing that bytes received is equal to bytes sent. 
long bytesSend = metricMap.get(AbfsStatistic.BYTES_SENT.getStatName()); bytesReceived = assertAbfsStatistics(AbfsStatistic.BYTES_RECEIVED, @@ -242,8 +264,14 @@ public class ITestAbfsNetworkStatistics extends AbstractAbfsIntegrationTest { assertAbfsStatistics(AbfsStatistic.BYTES_RECEIVED, bytesReceived + LARGE_OPERATIONS * (testResponseString.getBytes().length), metricMap); - assertAbfsStatistics(AbfsStatistic.GET_RESPONSES, - getResponses + 3 + 2 * LARGE_OPERATIONS, metricMap); + if (fs.getAbfsStore().isAppendBlobKey(fs.makeQualified(getResponsePath).toString())) { + // no network calls are made for hflush in case of appendblob + assertAbfsStatistics(AbfsStatistic.GET_RESPONSES, + getResponses + 3 + LARGE_OPERATIONS, metricMap); + } else { + assertAbfsStatistics(AbfsStatistic.GET_RESPONSES, + getResponses + 3 + 2 * LARGE_OPERATIONS, metricMap); + } } finally { IOUtils.cleanupWithLogger(LOG, out, in); diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsOutputStreamStatistics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsOutputStreamStatistics.java index 09cbfde1beb..c8640dded3d 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsOutputStreamStatistics.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsOutputStreamStatistics.java @@ -113,6 +113,10 @@ public class ITestAbfsOutputStreamStatistics final AzureBlobFileSystem fs = getFileSystem(); Path queueShrinkFilePath = path(getMethodName()); String testQueueShrink = "testQueue"; + if (fs.getAbfsStore().isAppendBlobKey(fs.makeQualified(queueShrinkFilePath).toString())) { + // writeOperationsQueue is not used for appendBlob, hence queueShrink is 0 + return; + } try (AbfsOutputStream outForOneOp = createAbfsOutputStreamWithFlushEnabled( fs, queueShrinkFilePath)) { diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsReadWriteAndSeek.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsReadWriteAndSeek.java index a270a00e913..52abb097ef3 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsReadWriteAndSeek.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsReadWriteAndSeek.java @@ -29,6 +29,7 @@ import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.Path; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.APPENDBLOB_MAX_WRITE_BUFFER_SIZE; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_READ_BUFFER_SIZE; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MAX_BUFFER_SIZE; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MIN_BUFFER_SIZE; @@ -46,6 +47,7 @@ public class ITestAbfsReadWriteAndSeek extends AbstractAbfsScaleTest { public static Iterable sizes() { return Arrays.asList(new Object[][]{{MIN_BUFFER_SIZE}, {DEFAULT_READ_BUFFER_SIZE}, + {APPENDBLOB_MAX_WRITE_BUFFER_SIZE}, {MAX_BUFFER_SIZE}}); } @@ -70,6 +72,7 @@ public class ITestAbfsReadWriteAndSeek extends AbstractAbfsScaleTest { final byte[] b = new byte[2 * bufferSize]; new Random().nextBytes(b); + try (FSDataOutputStream stream = fs.create(TEST_PATH)) { stream.write(b); } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsStreamStatistics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsStreamStatistics.java index 51531f678f6..395a456124b 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsStreamStatistics.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsStreamStatistics.java @@ -136,8 
+136,15 @@ public class ITestAbfsStreamStatistics extends AbstractAbfsIntegrationTest { testReadWriteOps.getBytes().length); } - //Test for 1000000 read operations - assertReadWriteOps("read", largeValue, statistics.getReadOps()); + if (fs.getAbfsStore().isAppendBlobKey(fs.makeQualified(largeOperationsFile).toString())) { + // for appendblob data is already flushed, so there is more data to read. + assertTrue(String.format("The actual value of %d was not equal to the " + + "expected value", statistics.getReadOps()), + statistics.getReadOps() == (largeValue + 3) || statistics.getReadOps() == (largeValue + 4)); + } else { + //Test for 1000000 read operations + assertReadWriteOps("read", largeValue, statistics.getReadOps()); + } } finally { IOUtils.cleanupWithLogger(LOG, inForLargeOperations, diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java index 94368a4f369..4b8f071b998 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java @@ -145,15 +145,19 @@ public class ITestAzureBlobFileSystemCreate extends out.hsync(); fail("Expected a failure"); } catch (FileNotFoundException fnfe) { - // the exception raised in close() must be in the caught exception's - // suppressed list - Throwable[] suppressed = fnfe.getSuppressed(); - assertEquals("suppressed count", 1, suppressed.length); - Throwable inner = suppressed[0]; - if (!(inner instanceof IOException)) { - throw inner; + //appendblob outputStream does not generate suppressed exception on close as it is + //single threaded code + if (!fs.getAbfsStore().isAppendBlobKey(fs.makeQualified(testPath).toString())) { + // the exception raised in close() must be in the caught exception's + 
// suppressed list + Throwable[] suppressed = fnfe.getSuppressed(); + assertEquals("suppressed count", 1, suppressed.length); + Throwable inner = suppressed[0]; + if (!(inner instanceof IOException)) { + throw inner; + } + GenericTestUtils.assertExceptionContains(fnfe.getMessage(), inner); } - GenericTestUtils.assertExceptionContains(fnfe.getMessage(), inner); } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemE2E.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemE2E.java index ebc9c07e53e..05c3855f5c8 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemE2E.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemE2E.java @@ -203,6 +203,9 @@ public class ITestAzureBlobFileSystemE2E extends AbstractAbfsIntegrationTest { public void testFlushWithFileNotFoundException() throws Exception { final AzureBlobFileSystem fs = getFileSystem(); final Path testFilePath = new Path(methodName.getMethodName()); + if (fs.getAbfsStore().isAppendBlobKey(fs.makeQualified(testFilePath).toString())) { + return; + } FSDataOutputStream stream = fs.create(testFilePath); assertTrue(fs.exists(testFilePath)); diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFlush.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFlush.java index 60f7f7d23f0..92aa5520ee4 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFlush.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFlush.java @@ -49,7 +49,8 @@ import org.apache.hadoop.fs.Path; public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest { private static final int BASE_SIZE = 1024; private static final int 
ONE_THOUSAND = 1000; - private static final int TEST_BUFFER_SIZE = 5 * ONE_THOUSAND * BASE_SIZE; + //3000 KB to support appendblob too + private static final int TEST_BUFFER_SIZE = 3 * ONE_THOUSAND * BASE_SIZE; private static final int ONE_MB = 1024 * 1024; private static final int FLUSH_TIMES = 200; private static final int THREAD_SLEEP_TIME = 1000; @@ -226,11 +227,15 @@ public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest { final Path testFilePath = path(methodName.getMethodName()); byte[] buffer = getRandomBytesArray(); - // The test case must write "fs.azure.write.request.size" bytes // to the stream in order for the data to be uploaded to storage. - assertEquals(fs.getAbfsStore().getAbfsConfiguration().getWriteBufferSize(), - buffer.length); + assertTrue(fs.getAbfsStore().getAbfsConfiguration().getWriteBufferSize() + <= buffer.length); + + boolean isAppendBlob = true; + if (!fs.getAbfsStore().isAppendBlobKey(fs.makeQualified(testFilePath).toString())) { + isAppendBlob = false; + } try (FSDataOutputStream stream = fs.create(testFilePath)) { stream.write(buffer); @@ -245,7 +250,8 @@ public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest { // Verify that the data can be read if disableOutputStreamFlush is // false; and otherwise cannot be read. 
- validate(fs.open(testFilePath), buffer, !disableOutputStreamFlush); + /* For AppendBlob flush is not needed to update data on server */ + validate(fs.open(testFilePath), buffer, !disableOutputStreamFlush || isAppendBlob); } } @@ -267,10 +273,15 @@ public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest { final AzureBlobFileSystem fs = this.getFileSystem(); byte[] buffer = getRandomBytesArray(); final Path testFilePath = path(methodName.getMethodName()); + boolean isAppendBlob = false; + if (fs.getAbfsStore().isAppendBlobKey(fs.makeQualified(testFilePath).toString())) { + isAppendBlob = true; + } try (FSDataOutputStream stream = getStreamAfterWrite(fs, testFilePath, buffer, false)) { stream.hflush(); - validate(fs, testFilePath, buffer, false); + /* For AppendBlob flush is not needed to update data on server */ + validate(fs, testFilePath, buffer, isAppendBlob); } } @@ -322,9 +333,14 @@ public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest { final AzureBlobFileSystem fs = this.getFileSystem(); byte[] buffer = getRandomBytesArray(); final Path testFilePath = path(methodName.getMethodName()); + boolean isAppendBlob = false; + if (fs.getAbfsStore().isAppendBlobKey(fs.makeQualified(testFilePath).toString())) { + isAppendBlob = true; + } try (FSDataOutputStream stream = getStreamAfterWrite(fs, testFilePath, buffer, false)) { stream.hsync(); - validate(fs, testFilePath, buffer, false); + /* For AppendBlob flush is not needed to update data on server */ + validate(fs, testFilePath, buffer, isAppendBlob); } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsConfigurationFieldsValidation.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsConfigurationFieldsValidation.java index 45deb9ebeec..d8711876fef 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsConfigurationFieldsValidation.java +++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsConfigurationFieldsValidation.java @@ -189,4 +189,4 @@ public class TestAbfsConfigurationFieldsValidation { abfsConfig.setMaxBackoffIntervalMilliseconds(backoffTime); return abfsConfig; } -} \ No newline at end of file +} diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/constants/TestConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/constants/TestConfigurationKeys.java index 16a3f5703bb..72ea7661b5a 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/constants/TestConfigurationKeys.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/constants/TestConfigurationKeys.java @@ -27,6 +27,7 @@ public final class TestConfigurationKeys { public static final String FS_AZURE_ACCOUNT_KEY = "fs.azure.account.key"; public static final String FS_AZURE_CONTRACT_TEST_URI = "fs.contract.test.fs.abfs"; public static final String FS_AZURE_TEST_NAMESPACE_ENABLED_ACCOUNT = "fs.azure.test.namespace.enabled"; + public static final String FS_AZURE_TEST_APPENDBLOB_ENABLED = "fs.azure.test.appendblob.enabled"; public static final String FS_AZURE_BLOB_DATA_CONTRIBUTOR_CLIENT_ID = "fs.azure.account.oauth2.contributor.client.id"; public static final String FS_AZURE_BLOB_DATA_CONTRIBUTOR_CLIENT_SECRET = "fs.azure.account.oauth2.contributor.client.secret"; diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java new file mode 100644 index 00000000000..4105aa18f21 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java @@ -0,0 +1,430 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.services; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Random; + +import org.junit.Test; + +import org.mockito.ArgumentCaptor; + +import org.apache.hadoop.fs.azurebfs.AbfsConfiguration; +import org.apache.hadoop.conf.Configuration; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.anyInt; +import static org.mockito.Mockito.anyBoolean; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.anyString; +import static org.mockito.Mockito.anyLong; + +import static org.assertj.core.api.Assertions.assertThat; + +public final class TestAbfsOutputStream { + + private static final int BUFFER_SIZE = 4096; + private static final int WRITE_SIZE = 1000; + private static final String PATH = "~/testpath"; + private final String globalKey = "fs.azure.configuration"; + private final String accountName1 = "account1"; + private final String accountKey1 = globalKey + "." 
+ accountName1; + private final String accountValue1 = "one"; + + private AbfsOutputStreamContext populateAbfsOutputStreamContext(int writeBufferSize, + boolean isFlushEnabled, + boolean disableOutputStreamFlush, + boolean isAppendBlob) { + return new AbfsOutputStreamContext(2) + .withWriteBufferSize(writeBufferSize) + .enableFlush(isFlushEnabled) + .disableOutputStreamFlush(disableOutputStreamFlush) + .withStreamStatistics(new AbfsOutputStreamStatisticsImpl()) + .withAppendBlob(isAppendBlob) + .build(); + } + + /** + * The test verifies OutputStream shortwrite case(2000bytes write followed by flush, hflush, hsync) is making correct HTTP calls to the server + */ + @Test + public void verifyShortWriteRequest() throws Exception { + + AbfsClient client = mock(AbfsClient.class); + AbfsRestOperation op = mock(AbfsRestOperation.class); + AbfsConfiguration abfsConf; + final Configuration conf = new Configuration(); + conf.set(accountKey1, accountValue1); + abfsConf = new AbfsConfiguration(conf, accountName1); + AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf); + when(client.getAbfsPerfTracker()).thenReturn(tracker); + when(client.append(anyString(), anyLong(), any(byte[].class), anyInt(), anyInt(), any(), anyBoolean())).thenReturn(op); + when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any())).thenReturn(op); + + AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0, populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false)); + final byte[] b = new byte[WRITE_SIZE]; + new Random().nextBytes(b); + out.write(b); + out.hsync(); + ArgumentCaptor acString = ArgumentCaptor.forClass(String.class); + ArgumentCaptor acLong = ArgumentCaptor.forClass(Long.class); + ArgumentCaptor acBufferOffset = ArgumentCaptor.forClass(Integer.class); + ArgumentCaptor acBufferLength = ArgumentCaptor.forClass(Integer.class); + ArgumentCaptor acByteArray = ArgumentCaptor.forClass(byte[].class); + ArgumentCaptor acAppendBlobAppend = 
ArgumentCaptor.forClass(Boolean.class); + ArgumentCaptor acSASToken = ArgumentCaptor.forClass(String.class); + + + final byte[] b1 = new byte[2*WRITE_SIZE]; + new Random().nextBytes(b1); + out.write(b1); + out.flush(); + out.hflush(); + + out.hsync(); + + verify(client, times(2)).append(acString.capture(), acLong.capture(), acByteArray.capture(), acBufferOffset.capture(), acBufferLength.capture(), + acSASToken.capture(), acAppendBlobAppend.capture()); + assertThat(Arrays.asList(PATH, PATH)).describedAs("Path of the requests").isEqualTo(acString.getAllValues()); + assertThat(Arrays.asList(Long.valueOf(0), Long.valueOf(WRITE_SIZE))).describedAs("Write Position").isEqualTo(acLong.getAllValues()); + assertThat(Arrays.asList(0, 0)).describedAs("Buffer Offset").isEqualTo(acBufferOffset.getAllValues()); + assertThat(Arrays.asList(WRITE_SIZE, 2*WRITE_SIZE)).describedAs("Buffer length").isEqualTo(acBufferLength.getAllValues()); + + } + + /** + * The test verifies OutputStream Write of WRITE_SIZE(1000 bytes) followed by a close is making correct HTTP calls to the server + */ + @Test + public void verifyWriteRequest() throws Exception { + + AbfsClient client = mock(AbfsClient.class); + AbfsRestOperation op = mock(AbfsRestOperation.class); + AbfsConfiguration abfsConf; + final Configuration conf = new Configuration(); + conf.set(accountKey1, accountValue1); + abfsConf = new AbfsConfiguration(conf, accountName1); + AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf); + + when(client.getAbfsPerfTracker()).thenReturn(tracker); + when(client.append(anyString(), anyLong(), any(byte[].class), anyInt(), anyInt(), any(), anyBoolean())).thenReturn(op); + when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any())).thenReturn(op); + + AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0, populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false)); + final byte[] b = new byte[WRITE_SIZE]; + new Random().nextBytes(b); + + for 
(int i = 0; i < 5; i++) { + out.write(b); + } + out.close(); + + ArgumentCaptor acString = ArgumentCaptor.forClass(String.class); + ArgumentCaptor acLong = ArgumentCaptor.forClass(Long.class); + ArgumentCaptor acBufferOffset = ArgumentCaptor.forClass(Integer.class); + ArgumentCaptor acBufferLength = ArgumentCaptor.forClass(Integer.class); + ArgumentCaptor acByteArray = ArgumentCaptor.forClass(byte[].class); + ArgumentCaptor acAppendBlobAppend = ArgumentCaptor.forClass(Boolean.class); + ArgumentCaptor acSASToken = ArgumentCaptor.forClass(String.class); + + verify(client, times(2)).append(acString.capture(), acLong.capture(), acByteArray.capture(), acBufferOffset.capture(), acBufferLength.capture(), + acSASToken.capture(), acAppendBlobAppend.capture()); + assertThat(Arrays.asList(PATH, PATH)).describedAs("Path").isEqualTo(acString.getAllValues()); + assertThat(new HashSet(Arrays.asList(Long.valueOf(0), Long.valueOf(BUFFER_SIZE)))).describedAs("Position").isEqualTo(new HashSet( + acLong.getAllValues())); + assertThat(Arrays.asList(0, 0)).describedAs("Buffer Offset").isEqualTo(acBufferOffset.getAllValues()); + assertThat(new HashSet(Arrays.asList(BUFFER_SIZE, 5*WRITE_SIZE-BUFFER_SIZE))).describedAs("Buffer Length").isEqualTo(new HashSet( + acBufferLength.getAllValues())); + + ArgumentCaptor acFlushString = ArgumentCaptor.forClass(String.class); + ArgumentCaptor acFlushLong = ArgumentCaptor.forClass(Long.class); + ArgumentCaptor acFlushRetainUnCommittedData = ArgumentCaptor.forClass(Boolean.class); + ArgumentCaptor acFlushClose = ArgumentCaptor.forClass(Boolean.class); + ArgumentCaptor acFlushSASToken = ArgumentCaptor.forClass(String.class); + + verify(client, times(1)).flush(acFlushString.capture(), acFlushLong.capture(), acFlushRetainUnCommittedData.capture(), acFlushClose.capture(), + acFlushSASToken.capture()); + assertThat(Arrays.asList(PATH)).describedAs("path").isEqualTo(acFlushString.getAllValues()); + 
assertThat(Arrays.asList(Long.valueOf(5*WRITE_SIZE))).describedAs("position").isEqualTo(acFlushLong.getAllValues()); + assertThat(Arrays.asList(false)).describedAs("RetainUnCommittedData flag").isEqualTo(acFlushRetainUnCommittedData.getAllValues()); + assertThat(Arrays.asList(true)).describedAs("Close flag").isEqualTo(acFlushClose.getAllValues()); + } + + /** + * The test verifies OutputStream Write of BUFFER_SIZE(4KB) followed by a close is making correct HTTP calls to the server + */ + @Test + public void verifyWriteRequestOfBufferSizeAndClose() throws Exception { + + AbfsClient client = mock(AbfsClient.class); + AbfsRestOperation op = mock(AbfsRestOperation.class); + AbfsHttpOperation httpOp = mock(AbfsHttpOperation.class); + AbfsConfiguration abfsConf; + final Configuration conf = new Configuration(); + conf.set(accountKey1, accountValue1); + abfsConf = new AbfsConfiguration(conf, accountName1); + AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf); + + when(client.getAbfsPerfTracker()).thenReturn(tracker); + when(client.append(anyString(), anyLong(), any(byte[].class), anyInt(), anyInt(), any(), anyBoolean())).thenReturn(op); + when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any())).thenReturn(op); + when(op.getSasToken()).thenReturn("testToken"); + when(op.getResult()).thenReturn(httpOp); + + AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0, populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false)); + final byte[] b = new byte[BUFFER_SIZE]; + new Random().nextBytes(b); + + for (int i = 0; i < 2; i++) { + out.write(b); + } + out.close(); + + ArgumentCaptor acString = ArgumentCaptor.forClass(String.class); + ArgumentCaptor acLong = ArgumentCaptor.forClass(Long.class); + ArgumentCaptor acBufferOffset = ArgumentCaptor.forClass(Integer.class); + ArgumentCaptor acBufferLength = ArgumentCaptor.forClass(Integer.class); + ArgumentCaptor acByteArray = ArgumentCaptor.forClass(byte[].class); + 
ArgumentCaptor acAppendBlobAppend = ArgumentCaptor.forClass(Boolean.class); + ArgumentCaptor acSASToken = ArgumentCaptor.forClass(String.class); + + verify(client, times(2)).append(acString.capture(), acLong.capture(), acByteArray.capture(), acBufferOffset.capture(), acBufferLength.capture(), + acSASToken.capture(), acAppendBlobAppend.capture()); + assertThat(Arrays.asList(PATH, PATH)).describedAs("path").isEqualTo(acString.getAllValues()); + assertThat(new HashSet(Arrays.asList(Long.valueOf(0), Long.valueOf(BUFFER_SIZE)))).describedAs("Position").isEqualTo(new HashSet( + acLong.getAllValues())); + assertThat(Arrays.asList(0, 0)).describedAs("Buffer Offset").isEqualTo(acBufferOffset.getAllValues()); + assertThat(Arrays.asList(BUFFER_SIZE, BUFFER_SIZE)).describedAs("Buffer Length").isEqualTo(acBufferLength.getAllValues()); + + ArgumentCaptor acFlushString = ArgumentCaptor.forClass(String.class); + ArgumentCaptor acFlushLong = ArgumentCaptor.forClass(Long.class); + ArgumentCaptor acFlushRetainUnCommittedData = ArgumentCaptor.forClass(Boolean.class); + ArgumentCaptor acFlushClose = ArgumentCaptor.forClass(Boolean.class); + ArgumentCaptor acFlushSASToken = ArgumentCaptor.forClass(String.class); + + verify(client, times(1)).flush(acFlushString.capture(), acFlushLong.capture(), acFlushRetainUnCommittedData.capture(), acFlushClose.capture(), + acFlushSASToken.capture()); + assertThat(Arrays.asList(PATH)).describedAs("path").isEqualTo(acFlushString.getAllValues()); + assertThat(Arrays.asList(Long.valueOf(2*BUFFER_SIZE))).describedAs("position").isEqualTo(acFlushLong.getAllValues()); + assertThat(Arrays.asList(false)).describedAs("RetainUnCommittedData flag").isEqualTo(acFlushRetainUnCommittedData.getAllValues()); + assertThat(Arrays.asList(true)).describedAs("Close flag").isEqualTo(acFlushClose.getAllValues()); + + } + + /** + * The test verifies OutputStream Write of BUFFER_SIZE(4KB) is making correct HTTP calls to the server + */ + @Test + public void 
verifyWriteRequestOfBufferSize() throws Exception { + + AbfsClient client = mock(AbfsClient.class); + AbfsRestOperation op = mock(AbfsRestOperation.class); + AbfsHttpOperation httpOp = mock(AbfsHttpOperation.class); + AbfsConfiguration abfsConf; + final Configuration conf = new Configuration(); + conf.set(accountKey1, accountValue1); + abfsConf = new AbfsConfiguration(conf, accountName1); + AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf); + + when(client.getAbfsPerfTracker()).thenReturn(tracker); + when(client.append(anyString(), anyLong(), any(byte[].class), anyInt(), anyInt(), any(), anyBoolean())).thenReturn(op); + when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any())).thenReturn(op); + when(op.getSasToken()).thenReturn("testToken"); + when(op.getResult()).thenReturn(httpOp); + + AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0, populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false)); + final byte[] b = new byte[BUFFER_SIZE]; + new Random().nextBytes(b); + + for (int i = 0; i < 2; i++) { + out.write(b); + } + Thread.sleep(1000); + + ArgumentCaptor acString = ArgumentCaptor.forClass(String.class); + ArgumentCaptor acLong = ArgumentCaptor.forClass(Long.class); + ArgumentCaptor acBufferOffset = ArgumentCaptor.forClass(Integer.class); + ArgumentCaptor acBufferLength = ArgumentCaptor.forClass(Integer.class); + ArgumentCaptor acByteArray = ArgumentCaptor.forClass(byte[].class); + ArgumentCaptor acAppendBlobAppend = ArgumentCaptor.forClass(Boolean.class); + ArgumentCaptor acSASToken = ArgumentCaptor.forClass(String.class); + + verify(client, times(2)).append(acString.capture(), acLong.capture(), acByteArray.capture(), acBufferOffset.capture(), acBufferLength.capture(), + acSASToken.capture(), acAppendBlobAppend.capture()); + assertThat(Arrays.asList(PATH, PATH)).describedAs("File Path").isEqualTo(acString.getAllValues()); + assertThat(new HashSet(Arrays.asList(Long.valueOf(0), 
Long.valueOf(BUFFER_SIZE)))).describedAs("Position in file").isEqualTo( + new HashSet(acLong.getAllValues())); + assertThat(Arrays.asList(0, 0)).describedAs("buffer offset").isEqualTo(acBufferOffset.getAllValues()); + assertThat(Arrays.asList(BUFFER_SIZE, BUFFER_SIZE)).describedAs("buffer length").isEqualTo(acBufferLength.getAllValues()); + + } + + /** + * The test verifies OutputStream Write of BUFFER_SIZE(4KB) on a AppendBlob based stream is making correct HTTP calls to the server + */ + @Test + public void verifyWriteRequestOfBufferSizeWithAppendBlob() throws Exception { + + AbfsClient client = mock(AbfsClient.class); + AbfsRestOperation op = mock(AbfsRestOperation.class); + AbfsConfiguration abfsConf; + final Configuration conf = new Configuration(); + conf.set(accountKey1, accountValue1); + abfsConf = new AbfsConfiguration(conf, accountName1); + AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf); + + when(client.getAbfsPerfTracker()).thenReturn(tracker); + when(client.append(anyString(), anyLong(), any(byte[].class), anyInt(), anyInt(), any(), anyBoolean())).thenReturn(op); + when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any())).thenReturn(op); + + AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0, populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, true)); + final byte[] b = new byte[BUFFER_SIZE]; + new Random().nextBytes(b); + + for (int i = 0; i < 2; i++) { + out.write(b); + } + Thread.sleep(1000); + + ArgumentCaptor acString = ArgumentCaptor.forClass(String.class); + ArgumentCaptor acLong = ArgumentCaptor.forClass(Long.class); + ArgumentCaptor acBufferOffset = ArgumentCaptor.forClass(Integer.class); + ArgumentCaptor acBufferLength = ArgumentCaptor.forClass(Integer.class); + ArgumentCaptor acByteArray = ArgumentCaptor.forClass(byte[].class); + ArgumentCaptor acAppendBlobAppend = ArgumentCaptor.forClass(Boolean.class); + ArgumentCaptor acSASToken = ArgumentCaptor.forClass(String.class); 
+ + verify(client, times(2)).append(acString.capture(), acLong.capture(), acByteArray.capture(), acBufferOffset.capture(), acBufferLength.capture(), + acSASToken.capture(), acAppendBlobAppend.capture()); + assertThat(Arrays.asList(PATH, PATH)).describedAs("File Path").isEqualTo(acString.getAllValues()); + assertThat(Arrays.asList(Long.valueOf(0), Long.valueOf(BUFFER_SIZE))).describedAs("File Position").isEqualTo(acLong.getAllValues()); + assertThat(Arrays.asList(0, 0)).describedAs("Buffer Offset").isEqualTo(acBufferOffset.getAllValues()); + assertThat(Arrays.asList(BUFFER_SIZE, BUFFER_SIZE)).describedAs("Buffer Length").isEqualTo(acBufferLength.getAllValues()); + assertThat(Arrays.asList(true, true)).describedAs("is AppendBlob Append").isEqualTo(acAppendBlobAppend.getAllValues()); + + } + + /** + * The test verifies OutputStream Write of BUFFER_SIZE(4KB) followed by a hflush call is making correct HTTP calls to the server + */ + @Test + public void verifyWriteRequestOfBufferSizeAndHFlush() throws Exception { + + AbfsClient client = mock(AbfsClient.class); + AbfsRestOperation op = mock(AbfsRestOperation.class); + AbfsConfiguration abfsConf; + final Configuration conf = new Configuration(); + conf.set(accountKey1, accountValue1); + abfsConf = new AbfsConfiguration(conf, accountName1); + AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf); + + when(client.getAbfsPerfTracker()).thenReturn(tracker); + when(client.append(anyString(), anyLong(), any(byte[].class), anyInt(), anyInt(), any(), anyBoolean())).thenReturn(op); + when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any())).thenReturn(op); + + AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0, populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false)); + final byte[] b = new byte[BUFFER_SIZE]; + new Random().nextBytes(b); + + for (int i = 0; i < 2; i++) { + out.write(b); + } + out.hflush(); + + ArgumentCaptor acString = 
ArgumentCaptor.forClass(String.class); + ArgumentCaptor acLong = ArgumentCaptor.forClass(Long.class); + ArgumentCaptor acBufferOffset = ArgumentCaptor.forClass(Integer.class); + ArgumentCaptor acBufferLength = ArgumentCaptor.forClass(Integer.class); + ArgumentCaptor acByteArray = ArgumentCaptor.forClass(byte[].class); + ArgumentCaptor acAppendBlobAppend = ArgumentCaptor.forClass(Boolean.class); + ArgumentCaptor acSASToken = ArgumentCaptor.forClass(String.class); + + verify(client, times(2)).append(acString.capture(), acLong.capture(), acByteArray.capture(), acBufferOffset.capture(), acBufferLength.capture(), + acSASToken.capture(), acAppendBlobAppend.capture()); + assertThat(Arrays.asList(PATH, PATH)).describedAs("File Path").isEqualTo(acString.getAllValues()); + assertThat(new HashSet(Arrays.asList(Long.valueOf(0), Long.valueOf(BUFFER_SIZE)))).describedAs("File Position").isEqualTo( + new HashSet(acLong.getAllValues())); + assertThat(Arrays.asList(0, 0)).describedAs("Buffer Offset").isEqualTo(acBufferOffset.getAllValues()); + assertThat(Arrays.asList(BUFFER_SIZE, BUFFER_SIZE)).describedAs("Buffer Length").isEqualTo(acBufferLength.getAllValues()); + + ArgumentCaptor acFlushString = ArgumentCaptor.forClass(String.class); + ArgumentCaptor acFlushLong = ArgumentCaptor.forClass(Long.class); + ArgumentCaptor acFlushRetainUnCommittedData = ArgumentCaptor.forClass(Boolean.class); + ArgumentCaptor acFlushClose = ArgumentCaptor.forClass(Boolean.class); + ArgumentCaptor acFlushSASToken = ArgumentCaptor.forClass(String.class); + + verify(client, times(1)).flush(acFlushString.capture(), acFlushLong.capture(), acFlushRetainUnCommittedData.capture(), acFlushClose.capture(), + acFlushSASToken.capture()); + assertThat(Arrays.asList(PATH)).describedAs("path").isEqualTo(acFlushString.getAllValues()); + assertThat(Arrays.asList(Long.valueOf(2*BUFFER_SIZE))).describedAs("position").isEqualTo(acFlushLong.getAllValues()); + 
assertThat(Arrays.asList(false)).describedAs("RetainUnCommittedData flag").isEqualTo(acFlushRetainUnCommittedData.getAllValues()); + assertThat(Arrays.asList(false)).describedAs("Close flag").isEqualTo(acFlushClose.getAllValues()); + + } + + /** + * The test verifies OutputStream Write of BUFFER_SIZE(4KB) followed by a flush call is making correct HTTP calls to the server + */ + @Test + public void verifyWriteRequestOfBufferSizeAndFlush() throws Exception { + + AbfsClient client = mock(AbfsClient.class); + AbfsRestOperation op = mock(AbfsRestOperation.class); + AbfsConfiguration abfsConf; + final Configuration conf = new Configuration(); + conf.set(accountKey1, accountValue1); + abfsConf = new AbfsConfiguration(conf, accountName1); + AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf); + when(client.getAbfsPerfTracker()).thenReturn(tracker); + when(client.append(anyString(), anyLong(), any(byte[].class), anyInt(), anyInt(), any(), anyBoolean())).thenReturn(op); + when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any())).thenReturn(op); + + AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0, populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false)); + final byte[] b = new byte[BUFFER_SIZE]; + new Random().nextBytes(b); + + for (int i = 0; i < 2; i++) { + out.write(b); + } + Thread.sleep(1000); + out.flush(); + Thread.sleep(1000); + + ArgumentCaptor acString = ArgumentCaptor.forClass(String.class); + ArgumentCaptor acLong = ArgumentCaptor.forClass(Long.class); + ArgumentCaptor acBufferOffset = ArgumentCaptor.forClass(Integer.class); + ArgumentCaptor acBufferLength = ArgumentCaptor.forClass(Integer.class); + ArgumentCaptor acByteArray = ArgumentCaptor.forClass(byte[].class); + ArgumentCaptor acAppendBlobAppend = ArgumentCaptor.forClass(Boolean.class); + ArgumentCaptor acSASToken = ArgumentCaptor.forClass(String.class); + + verify(client, times(2)).append(acString.capture(), acLong.capture(), 
acByteArray.capture(), acBufferOffset.capture(), acBufferLength.capture(), + acSASToken.capture(), acAppendBlobAppend.capture()); + assertThat(Arrays.asList(PATH, PATH)).describedAs("path").isEqualTo(acString.getAllValues()); + assertThat(new HashSet(Arrays.asList(Long.valueOf(0), Long.valueOf(BUFFER_SIZE)))).describedAs("Position").isEqualTo( + new HashSet(acLong.getAllValues())); + assertThat(Arrays.asList(0, 0)).describedAs("Buffer Offset").isEqualTo(acBufferOffset.getAllValues()); + assertThat(Arrays.asList(BUFFER_SIZE, BUFFER_SIZE)).describedAs("Buffer Length").isEqualTo(acBufferLength.getAllValues()); + + } +}