From 4865589bb4414c87d9ac02b4323ebbd485348cf7 Mon Sep 17 00:00:00 2001 From: Sneha Vijayarajan Date: Thu, 7 Jan 2021 00:13:37 +0530 Subject: [PATCH] HADOOP-17404. ABFS: Small write - Merge append and flush - Contributed by Sneha Vijayarajan (cherry picked from commit b612c310c26394aa406c99d8598c9cb7621df052) --- hadoop-tools/hadoop-azure/pom.xml | 2 + .../src/config/checkstyle-suppressions.xml | 2 + .../hadoop/fs/azurebfs/AbfsConfiguration.java | 8 + .../fs/azurebfs/AzureBlobFileSystemStore.java | 1 + .../azurebfs/constants/AbfsHttpConstants.java | 1 + .../azurebfs/constants/ConfigurationKeys.java | 9 + .../constants/FileSystemConfigurations.java | 1 + .../azurebfs/constants/HttpQueryParams.java | 1 + .../services/AppendRequestParameters.java | 69 +++ .../fs/azurebfs/services/AbfsClient.java | 47 +- .../azurebfs/services/AbfsOutputStream.java | 67 ++- .../services/AbfsOutputStreamContext.java | 11 + .../azurebfs/services/AbfsRestOperation.java | 2 +- .../azurebfs/ITestAbfsNetworkStatistics.java | 337 ++++++----- .../azurebfs/ITestSmallWriteOptimization.java | 523 ++++++++++++++++++ .../services/ITestAbfsOutputStream.java | 17 +- .../services/TestAbfsOutputStream.java | 259 ++++----- 17 files changed, 1019 insertions(+), 338 deletions(-) create mode 100644 hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AppendRequestParameters.java create mode 100644 hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestSmallWriteOptimization.java diff --git a/hadoop-tools/hadoop-azure/pom.xml b/hadoop-tools/hadoop-azure/pom.xml index 7bfce0102fd..6b0599daa90 100644 --- a/hadoop-tools/hadoop-azure/pom.xml +++ b/hadoop-tools/hadoop-azure/pom.xml @@ -555,6 +555,7 @@ **/azurebfs/ITestAbfsReadWriteAndSeek.java **/azurebfs/ITestAzureBlobFileSystemListStatus.java **/azurebfs/extensions/ITestAbfsDelegationTokens.java + **/azurebfs/ITestSmallWriteOptimization.java @@ -594,6 +595,7 @@ **/azurebfs/ITestAbfsReadWriteAndSeek.java **/azurebfs/ITestAzureBlobFileSystemListStatus.java **/azurebfs/extensions/ITestAbfsDelegationTokens.java + **/azurebfs/ITestSmallWriteOptimization.java diff --git a/hadoop-tools/hadoop-azure/src/config/checkstyle-suppressions.xml b/hadoop-tools/hadoop-azure/src/config/checkstyle-suppressions.xml index c50236162d8..070c8c1fe82 100644 --- a/hadoop-tools/hadoop-azure/src/config/checkstyle-suppressions.xml +++ b/hadoop-tools/hadoop-azure/src/config/checkstyle-suppressions.xml @@ -46,4 +46,6 @@ files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]AzureBlobFileSystemStore.java"/> + diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java index b1c95d2e82b..5a703233953 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java @@ -100,6 +100,10 @@ public class AbfsConfiguration{ DefaultValue = DEFAULT_WRITE_BUFFER_SIZE) private int writeBufferSize; + @BooleanConfigurationValidatorAnnotation(ConfigurationKey = AZURE_ENABLE_SMALL_WRITE_OPTIMIZATION, + DefaultValue = DEFAULT_AZURE_ENABLE_SMALL_WRITE_OPTIMIZATION) + private boolean enableSmallWriteOptimization; + @BooleanConfigurationValidatorAnnotation( ConfigurationKey = AZURE_READ_SMALL_FILES_COMPLETELY, DefaultValue = DEFAULT_READ_SMALL_FILES_COMPLETELY) @@ -537,6 +541,10 @@ public class AbfsConfiguration{ return this.writeBufferSize; } + public boolean isSmallWriteOptimizationEnabled() { + return this.enableSmallWriteOptimization; + } + public boolean readSmallFilesCompletely() { return this.readSmallFilesCompletely; } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index 869a6f9907f..c8dd518b4f3 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -578,6 +578,7 @@ public class AzureBlobFileSystemStore implements Closeable { return new AbfsOutputStreamContext(abfsConfiguration.getSasTokenRenewPeriodForStreamsInSeconds()) .withWriteBufferSize(bufferSize) .enableFlush(abfsConfiguration.isFlushEnabled()) + .enableSmallWriteOptimization(abfsConfiguration.isSmallWriteOptimizationEnabled()) .disableOutputStreamFlush(abfsConfiguration.isOutputStreamFlushDisabled()) .withStreamStatistics(new AbfsOutputStreamStatisticsImpl()) .withAppendBlob(isAppendBlob) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java index 38b79c9412f..184657e7d66 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java @@ -76,6 +76,7 @@ public final class AbfsHttpConstants { public static final String AT = "@"; public static final String HTTP_HEADER_PREFIX = "x-ms-"; public static final String HASH = "#"; + public static final String TRUE = "true"; public static final String PLUS_ENCODE = "%20"; public static final String FORWARD_SLASH_ENCODE = "%2F"; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java index 3e1ff80e7ef..cdef9c9b7ac 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java @@ -55,6 +55,15 @@ public final class ConfigurationKeys { public static final String AZURE_WRITE_MAX_CONCURRENT_REQUESTS = "fs.azure.write.max.concurrent.requests"; public static final String AZURE_WRITE_MAX_REQUESTS_TO_QUEUE = "fs.azure.write.max.requests.to.queue"; public static final String AZURE_WRITE_BUFFER_SIZE = "fs.azure.write.request.size"; + /** If the data size written by Hadoop app is small, i.e. data size : + * (a) before any of HFlush/HSync call is made or + * (b) between 2 HFlush/Hsync API calls + * is less than write buffer size, 2 separate calls, one for append and + * another for flush are made. + * By enabling the small write optimization, a single call will be made to + * perform both append and flush operations and hence reduce request count. + */ + public static final String AZURE_ENABLE_SMALL_WRITE_OPTIMIZATION = "fs.azure.write.enableappendwithflush"; public static final String AZURE_READ_BUFFER_SIZE = "fs.azure.read.request.size"; public static final String AZURE_READ_SMALL_FILES_COMPLETELY = "fs.azure.read.smallfilescompletely"; public static final String AZURE_READ_OPTIMIZE_FOOTER_READ = "fs.azure.read.optimizefooterread"; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java index 80082063f6e..a23dfd5292b 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java @@ -56,6 +56,7 @@ public final class FileSystemConfigurations { // Default upload and download buffer size public static final int DEFAULT_WRITE_BUFFER_SIZE = 8 * ONE_MB; // 8 MB public static final int APPENDBLOB_MAX_WRITE_BUFFER_SIZE = 4 * ONE_MB; // 4 MB + public static final boolean DEFAULT_AZURE_ENABLE_SMALL_WRITE_OPTIMIZATION = false; public static final int DEFAULT_READ_BUFFER_SIZE = 4 * ONE_MB; // 4 MB public static final boolean DEFAULT_READ_SMALL_FILES_COMPLETELY = false; public static final boolean DEFAULT_OPTIMIZE_FOOTER_READ = false; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpQueryParams.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpQueryParams.java index 5a550ac783f..8a4ca90f358 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpQueryParams.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpQueryParams.java @@ -36,6 +36,7 @@ public final class HttpQueryParams { public static final String QUERY_PARAM_POSITION = "position"; public static final String QUERY_PARAM_TIMEOUT = "timeout"; public static final String QUERY_PARAM_RETAIN_UNCOMMITTED_DATA = "retainUncommittedData"; + public static final String QUERY_PARAM_FLUSH = "flush"; public static final String QUERY_PARAM_CLOSE = "close"; public static final String QUERY_PARAM_UPN = "upn"; public static final String QUERY_PARAM_BLOBTYPE = "blobtype"; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AppendRequestParameters.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AppendRequestParameters.java new file mode 100644 index 00000000000..fb4d29f8794 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AppendRequestParameters.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.contracts.services; + +/** + * Saves the different request parameters for append + */ +public class AppendRequestParameters { + public enum Mode { + APPEND_MODE, + FLUSH_MODE, + FLUSH_CLOSE_MODE + } + + private final long position; + private final int offset; + private final int length; + private final Mode mode; + private final boolean isAppendBlob; + + public AppendRequestParameters(final long position, + final int offset, + final int length, + final Mode mode, + final boolean isAppendBlob) { + this.position = position; + this.offset = offset; + this.length = length; + this.mode = mode; + this.isAppendBlob = isAppendBlob; + } + + public long getPosition() { + return this.position; + } + + public int getoffset() { + return this.offset; + } + + public int getLength() { + return this.length; + } + + public Mode getMode() { + return this.mode; + } + + public boolean isAppendBlob() { + return this.isAppendBlob; + } + +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java index db2f44f3bb4..bfc11a676ae 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java @@ -45,6 +45,7 @@ import org.apache.hadoop.fs.azurebfs.contracts.exceptions.SASTokenProviderExcept import org.apache.hadoop.fs.azurebfs.extensions.ExtensionHelper; import org.apache.hadoop.fs.azurebfs.extensions.SASTokenProvider; import org.apache.hadoop.fs.azurebfs.AbfsConfiguration; +import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters; import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider; import org.apache.hadoop.fs.azurebfs.utils.DateTimeUtils; import org.apache.hadoop.io.IOUtils; @@ -396,17 +397,27 @@ public class AbfsClient implements Closeable { return op; } - public AbfsRestOperation append(final String path, final long position, final byte[] buffer, final int offset, - final int length, final String cachedSasToken, final boolean isAppendBlob) throws AzureBlobFileSystemException { + public AbfsRestOperation append(final String path, final byte[] buffer, + AppendRequestParameters reqParams, final String cachedSasToken) + throws AzureBlobFileSystemException { final List requestHeaders = createDefaultHeaders(); // JDK7 does not support PATCH, so to workaround the issue we will use // PUT and specify the real method in the X-Http-Method-Override header. requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE, - HTTP_METHOD_PATCH)); + HTTP_METHOD_PATCH)); final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, APPEND_ACTION); - abfsUriQueryBuilder.addQuery(QUERY_PARAM_POSITION, Long.toString(position)); + abfsUriQueryBuilder.addQuery(QUERY_PARAM_POSITION, Long.toString(reqParams.getPosition())); + + if ((reqParams.getMode() == AppendRequestParameters.Mode.FLUSH_MODE) || ( + reqParams.getMode() == AppendRequestParameters.Mode.FLUSH_CLOSE_MODE)) { + abfsUriQueryBuilder.addQuery(QUERY_PARAM_FLUSH, TRUE); + if (reqParams.getMode() == AppendRequestParameters.Mode.FLUSH_CLOSE_MODE) { + abfsUriQueryBuilder.addQuery(QUERY_PARAM_CLOSE, TRUE); + } + } + // AbfsInputStream/AbfsOutputStream reuse SAS tokens for better performance String sasTokenForReuse = appendSASTokenToQuery(path, SASTokenProvider.WRITE_OPERATION, abfsUriQueryBuilder, cachedSasToken); @@ -414,20 +425,30 @@ public class AbfsClient implements Closeable { final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString()); final AbfsRestOperation op = new AbfsRestOperation( AbfsRestOperationType.Append, - this, - HTTP_METHOD_PUT, - url, - requestHeaders, buffer, offset, length, sasTokenForReuse); + this, + HTTP_METHOD_PUT, + url, + requestHeaders, + buffer, + reqParams.getoffset(), + reqParams.getLength(), + sasTokenForReuse); try { op.execute(); } catch (AzureBlobFileSystemException e) { - if (isAppendBlob && appendSuccessCheckOp(op, path, (position + length))) { + if (reqParams.isAppendBlob() + && appendSuccessCheckOp(op, path, + (reqParams.getPosition() + reqParams.getLength()))) { final AbfsRestOperation successOp = new AbfsRestOperation( AbfsRestOperationType.Append, - this, - HTTP_METHOD_PUT, - url, - requestHeaders, buffer, offset, length, sasTokenForReuse); + this, + HTTP_METHOD_PUT, + url, + requestHeaders, + buffer, + reqParams.getoffset(), + reqParams.getLength(), + sasTokenForReuse); successOp.hardSetResult(HttpURLConnection.HTTP_OK); return successOp; } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java index 01b2fa5dede..402fdda7b25 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java @@ -35,11 +35,13 @@ import java.util.concurrent.TimeUnit; import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; +import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters; import org.apache.hadoop.fs.azurebfs.utils.CachedSASToken; import org.apache.hadoop.io.ElasticByteBufferPool; import org.apache.hadoop.fs.FileSystem.Statistics; @@ -48,6 +50,9 @@ import org.apache.hadoop.fs.StreamCapabilities; import org.apache.hadoop.fs.Syncable; import static org.apache.hadoop.io.IOUtils.wrapException; +import static org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters.Mode.APPEND_MODE; +import static org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters.Mode.FLUSH_CLOSE_MODE; +import static org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters.Mode.FLUSH_MODE; /** * The BlobFsOutputStream for Rest AbfsClient. @@ -60,6 +65,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa private boolean closed; private boolean supportFlush; private boolean disableOutputStreamFlush; + private boolean enableSmallWriteOptimization; private boolean isAppendBlob; private volatile IOException lastError; @@ -69,6 +75,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa private final int bufferSize; private byte[] buffer; private int bufferIndex; + private int numOfAppendsToServerSinceLastFlush; private final int maxConcurrentRequestCount; private final int maxRequestsThatCanBeQueued; @@ -108,12 +115,15 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa this.supportFlush = abfsOutputStreamContext.isEnableFlush(); this.disableOutputStreamFlush = abfsOutputStreamContext .isDisableOutputStreamFlush(); + this.enableSmallWriteOptimization + = abfsOutputStreamContext.isEnableSmallWriteOptimization(); this.isAppendBlob = abfsOutputStreamContext.isAppendBlob(); this.lastError = null; this.lastFlushOffset = 0; this.bufferSize = abfsOutputStreamContext.getWriteBufferSize(); this.buffer = byteBufferPool.getBuffer(false, bufferSize).array(); this.bufferIndex = 0; + this.numOfAppendsToServerSinceLastFlush = 0; this.writeOperations = new ConcurrentLinkedDeque<>(); this.outputStreamStatistics = abfsOutputStreamContext.getStreamStatistics(); @@ -309,8 +319,29 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa private synchronized void flushInternal(boolean isClose) throws IOException { maybeThrowLastError(); + + // if its a flush post write < buffersize, send flush parameter in append + if (!isAppendBlob + && enableSmallWriteOptimization + && (numOfAppendsToServerSinceLastFlush == 0) // there are no ongoing store writes + && (writeOperations.size() == 0) // double checking no appends in progress + && (bufferIndex > 0)) { // there is some data that is pending to be written + smallWriteOptimizedflushInternal(isClose); + return; + } + writeCurrentBufferToService(); flushWrittenBytesToService(isClose); + numOfAppendsToServerSinceLastFlush = 0; + } + + private synchronized void smallWriteOptimizedflushInternal(boolean isClose) throws IOException { + // writeCurrentBufferToService will increment numOfAppendsToServerSinceLastFlush + writeCurrentBufferToService(true, isClose); + waitForAppendsToComplete(); + shrinkWriteOperationQueue(); + maybeThrowLastError(); + numOfAppendsToServerSinceLastFlush = 0; } private synchronized void flushInternalAsync() throws IOException { @@ -335,8 +366,9 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa AbfsPerfTracker tracker = client.getAbfsPerfTracker(); try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker, "writeCurrentBufferToService", "append")) { - AbfsRestOperation op = client.append(path, offset, bytes, 0, - bytesLength, cachedSasToken.get(), this.isAppendBlob); + AppendRequestParameters reqParams = new AppendRequestParameters(offset, 0, + bytesLength, APPEND_MODE, true); + AbfsRestOperation op = client.append(path, bytes, reqParams, cachedSasToken.get()); cachedSasToken.update(op.getSasToken()); outputStreamStatistics.uploadSuccessful(bytesLength); perfInfo.registerResult(op.getResult()); @@ -358,6 +390,10 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa } private synchronized void writeCurrentBufferToService() throws IOException { + writeCurrentBufferToService(false, false); + } + + private synchronized void writeCurrentBufferToService(boolean isFlush, boolean isClose) throws IOException { if (this.isAppendBlob) { writeAppendBlobCurrentBufferToService(); return; @@ -367,6 +403,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa return; } outputStreamStatistics.writeCurrentBuffer(); + numOfAppendsToServerSinceLastFlush++; final byte[] bytes = buffer; final int bytesLength = bufferIndex; @@ -388,8 +425,19 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa AbfsPerfTracker tracker = client.getAbfsPerfTracker(); try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker, "writeCurrentBufferToService", "append")) { - AbfsRestOperation op = client.append(path, offset, bytes, 0, - bytesLength, cachedSasToken.get(), false); + AppendRequestParameters.Mode + mode = APPEND_MODE; + if (isFlush & isClose) { + mode = FLUSH_CLOSE_MODE; + } else if (isFlush) { + mode = FLUSH_MODE; + } + + AppendRequestParameters reqParams = new AppendRequestParameters( + offset, 0, bytesLength, mode, false); + AbfsRestOperation op = client.append(path, bytes, reqParams, + cachedSasToken.get()); + cachedSasToken.update(op.getSasToken()); perfInfo.registerResult(op.getResult()); byteBufferPool.putBuffer(ByteBuffer.wrap(bytes)); @@ -410,7 +458,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa shrinkWriteOperationQueue(); } - private synchronized void flushWrittenBytesToService(boolean isClose) throws IOException { + private synchronized void waitForAppendsToComplete() throws IOException { for (WriteOperation writeOperation : writeOperations) { try { writeOperation.task.get(); @@ -428,6 +476,10 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa throw lastError; } } + } + + private synchronized void flushWrittenBytesToService(boolean isClose) throws IOException { + waitForAppendsToComplete(); flushWrittenBytesToServiceInternal(position, false, isClose); } @@ -558,6 +610,11 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa return maxRequestsThatCanBeQueued; } + @VisibleForTesting + Boolean isAppendBlobStream() { + return isAppendBlob; + } + /** * Appending AbfsOutputStream statistics to base toString(). * diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java index 2dce5dc2c77..925cd4f7b56 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java @@ -27,6 +27,8 @@ public class AbfsOutputStreamContext extends AbfsStreamContext { private boolean enableFlush; + private boolean enableSmallWriteOptimization; + private boolean disableOutputStreamFlush; private AbfsOutputStreamStatistics streamStatistics; @@ -52,6 +54,11 @@ public class AbfsOutputStreamContext extends AbfsStreamContext { return this; } + public AbfsOutputStreamContext enableSmallWriteOptimization(final boolean enableSmallWriteOptimization) { + this.enableSmallWriteOptimization = enableSmallWriteOptimization; + return this; + } + public AbfsOutputStreamContext disableOutputStreamFlush( final boolean disableOutputStreamFlush) { this.disableOutputStreamFlush = disableOutputStreamFlush; @@ -114,4 +121,8 @@ public class AbfsOutputStreamContext extends AbfsStreamContext { public int getMaxWriteRequestsToQueue() { return this.maxWriteRequestsToQueue; } + + public boolean isEnableSmallWriteOptimization() { + return this.enableSmallWriteOptimization; + } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java index 83c76f5a6ab..24ec2926647 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java @@ -290,7 +290,7 @@ public class AbfsRestOperation { AbfsClientThrottlingIntercept.updateMetrics(operationType, httpOperation); } - LOG.debug("HttpRequest: {}", httpOperation.toString()); + LOG.debug("HttpRequest: {}: {}", operationType, httpOperation.toString()); if (client.getRetryPolicy().shouldRetry(retryCount, httpOperation.getStatusCode())) { return false; diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsNetworkStatistics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsNetworkStatistics.java index c2dbe937b81..66b8da89572 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsNetworkStatistics.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsNetworkStatistics.java @@ -33,14 +33,16 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream; import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation; +import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.BYTES_RECEIVED; import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CONNECTIONS_MADE; +import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.GET_RESPONSES; import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.SEND_REQUESTS; public class ITestAbfsNetworkStatistics extends AbstractAbfsIntegrationTest { private static final Logger LOG = LoggerFactory.getLogger(ITestAbfsNetworkStatistics.class); - private static final int LARGE_OPERATIONS = 10; + private static final int WRITE_OPERATION_LOOP_COUNT = 10; public ITestAbfsNetworkStatistics() throws Exception { } @@ -58,117 +60,126 @@ public class ITestAbfsNetworkStatistics extends AbstractAbfsIntegrationTest { Map metricMap; Path sendRequestPath = path(getMethodName()); String testNetworkStatsString = "http_send"; - long connectionsMade, requestsSent, bytesSent; metricMap = fs.getInstrumentationMap(); - long connectionsMadeBeforeTest = metricMap - .get(CONNECTIONS_MADE.getStatName()); - long requestsMadeBeforeTest = metricMap.get(SEND_REQUESTS.getStatName()); + long expectedConnectionsMade = metricMap.get(CONNECTIONS_MADE.getStatName()); + long expectedRequestsSent = metricMap.get(SEND_REQUESTS.getStatName()); + long expectedBytesSent = 0; - /* - * Creating AbfsOutputStream will result in 1 connection made and 1 send - * request. - */ + // -------------------------------------------------------------------- + // Operation: Creating AbfsOutputStream try (AbfsOutputStream out = createAbfsOutputStreamWithFlushEnabled(fs, sendRequestPath)) { + // Network stats calculation: For Creating AbfsOutputStream: + // 1 create request = 1 connection made and 1 send request + expectedConnectionsMade++; + expectedRequestsSent++; + // -------------------------------------------------------------------- + + // Operation: Write small data + // Network stats calculation: No additions. + // Data written is less than the buffer size and hence will not + // trigger any append request to store out.write(testNetworkStatsString.getBytes()); + // -------------------------------------------------------------------- - /* - * Flushes all outstanding data (i.e. the current unfinished packet) - * from the client into the service on all DataNode replicas. - */ + // Operation: HFlush + // Flushes all outstanding data (i.e. the current unfinished packet) + // from the client into the service on all DataNode replicas. out.hflush(); - - metricMap = fs.getInstrumentationMap(); - /* - * Testing the network stats with 1 write operation. + * Network stats calculation: + * 3 possibilities here: + * A. As there is pending data to be written to store, this will result in: + * 1 append + 1 flush = 2 connections and 2 send requests * - * connections_made : (connections made above) + 2(flush). + * B. If config "fs.azure.enable.small.write.optimization" is enabled, append + * and flush call will be merged for small data in buffer in this test. + * In which case it will be: + * 1 append+flush request = 1 connection and 1 send request * - * send_requests : (requests sent above) + 2(flush). - * - * bytes_sent : bytes wrote in AbfsOutputStream. + * C. If the path is configured for append Blob files to be used, hflush + * is a no-op. So in this case: + * 1 append = 1 connection and 1 send request */ - long extraCalls = 0; - if (!fs.getAbfsStore() - .isAppendBlobKey(fs.makeQualified(sendRequestPath).toString())) { - // no network calls are made for hflush in case of appendblob - extraCalls++; + if (fs.getAbfsStore().isAppendBlobKey(fs.makeQualified(sendRequestPath).toString()) + || (this.getConfiguration().isSmallWriteOptimizationEnabled())) { + expectedConnectionsMade++; + expectedRequestsSent++; + } else { + expectedConnectionsMade += 2; + expectedRequestsSent += 2; } - long expectedConnectionsMade = connectionsMadeBeforeTest + extraCalls + 2; - long expectedRequestsSent = requestsMadeBeforeTest + extraCalls + 2; - connectionsMade = assertAbfsStatistics(CONNECTIONS_MADE, + expectedBytesSent += testNetworkStatsString.getBytes().length; + // -------------------------------------------------------------------- + + // Assertions + metricMap = fs.getInstrumentationMap(); + assertAbfsStatistics(CONNECTIONS_MADE, expectedConnectionsMade, metricMap); - requestsSent = assertAbfsStatistics(SEND_REQUESTS, expectedRequestsSent, + assertAbfsStatistics(SEND_REQUESTS, expectedRequestsSent, metricMap); - bytesSent = assertAbfsStatistics(AbfsStatistic.BYTES_SENT, - testNetworkStatsString.getBytes().length, metricMap); + assertAbfsStatistics(AbfsStatistic.BYTES_SENT, + expectedBytesSent, metricMap); } - // To close the AbfsOutputStream 1 connection is made and 1 request is sent. - connectionsMade++; - requestsSent++; - + // -------------------------------------------------------------------- + // Operation: AbfsOutputStream close. + // Network Stats calculation: 1 flush (with close) is send. + // 1 flush request = 1 connection and 1 send request + expectedConnectionsMade++; + expectedRequestsSent++; + // -------------------------------------------------------------------- + // Operation: Re-create the file / create overwrite scenario try (AbfsOutputStream out = createAbfsOutputStreamWithFlushEnabled(fs, sendRequestPath)) { - - // Is a file overwrite case - long createRequestCalls = 1; - long createTriggeredGFSForETag = 0; - if (this.getConfiguration().isConditionalCreateOverwriteEnabled()) { - createRequestCalls += 1; - createTriggeredGFSForETag = 1; - } - - for (int i = 0; i < LARGE_OPERATIONS; i++) { - out.write(testNetworkStatsString.getBytes()); - - /* - * 1 flush call would create 2 connections and 2 send requests. - * when hflush() is called it will essentially trigger append() and - * flush() inside AbfsRestOperation. Both of which calls - * executeHttpOperation() method which creates a connection and sends - * requests. - */ - out.hflush(); - } - - metricMap = fs.getInstrumentationMap(); - /* - * Testing the network stats with Large amount of bytes sent. - * - * connections made : connections_made(Last assertion) + 1 - * (AbfsOutputStream) + LARGE_OPERATIONS * 2(flush). - * - * send requests : requests_sent(Last assertion) + 1(AbfsOutputStream) + - * LARGE_OPERATIONS * 2(flush). - * - * bytes sent : bytes_sent(Last assertion) + LARGE_OPERATIONS * (bytes - * wrote each time). + * Network Stats calculation: create overwrite + * There are 2 possibilities here. + * A. create overwrite results in 1 server call + * create with overwrite=true = 1 connection and 1 send request * + * B. If config "fs.azure.enable.conditional.create.overwrite" is enabled, + * create overwrite=false (will fail in this case as file is indeed present) + * + getFileStatus to fetch the file ETag + * + create overwrite=true + * = 3 connections and 2 send requests */ - - connectionsMade += createRequestCalls + createTriggeredGFSForETag; - requestsSent += createRequestCalls; - if (fs.getAbfsStore().isAppendBlobKey(fs.makeQualified(sendRequestPath).toString())) { - // no network calls are made for hflush in case of appendblob - assertAbfsStatistics(CONNECTIONS_MADE, - connectionsMade + LARGE_OPERATIONS, metricMap); - assertAbfsStatistics(SEND_REQUESTS, - requestsSent + LARGE_OPERATIONS, metricMap); + if (this.getConfiguration().isConditionalCreateOverwriteEnabled()) { + expectedConnectionsMade += 3; + expectedRequestsSent += 2; } else { - assertAbfsStatistics(CONNECTIONS_MADE, - connectionsMade + LARGE_OPERATIONS * 2, metricMap); - assertAbfsStatistics(SEND_REQUESTS, - requestsSent + LARGE_OPERATIONS * 2, metricMap); + expectedConnectionsMade += 1; + expectedRequestsSent += 1; } - assertAbfsStatistics(AbfsStatistic.BYTES_SENT, - bytesSent + LARGE_OPERATIONS * (testNetworkStatsString.getBytes().length), - metricMap); + // -------------------------------------------------------------------- + // Operation: Multiple small appends + hflush + for (int i = 0; i < WRITE_OPERATION_LOOP_COUNT; i++) { + out.write(testNetworkStatsString.getBytes()); + // Network stats calculation: no-op. Small write + out.hflush(); + // Network stats calculation: Hflush + // refer to previous comments for hFlush network stats calcualtion + // possibilities + if (fs.getAbfsStore().isAppendBlobKey(fs.makeQualified(sendRequestPath).toString()) + || (this.getConfiguration().isSmallWriteOptimizationEnabled())) { + expectedConnectionsMade++; + expectedRequestsSent++; + } else { + expectedConnectionsMade += 2; + expectedRequestsSent += 2; + } + expectedBytesSent += testNetworkStatsString.getBytes().length; + } + // -------------------------------------------------------------------- + + // Assertions + metricMap = fs.getInstrumentationMap(); + assertAbfsStatistics(CONNECTIONS_MADE, expectedConnectionsMade, metricMap); + assertAbfsStatistics(SEND_REQUESTS, expectedRequestsSent, metricMap); + assertAbfsStatistics(AbfsStatistic.BYTES_SENT, expectedBytesSent, metricMap); } } @@ -185,130 +196,100 @@ public class ITestAbfsNetworkStatistics extends AbstractAbfsIntegrationTest { Path getResponsePath = path(getMethodName()); Map metricMap; String testResponseString = "some response"; - long getResponses, bytesReceived; FSDataOutputStream out = null; FSDataInputStream in = null; - try { + long expectedConnectionsMade; + long expectedGetResponses; + long expectedBytesReceived; - /* - * Creating a File and writing some bytes in it. - * - * get_response : 3(getFileSystem) + 1(OutputStream creation) + 2 - * (Writing data in Data store). - * - */ + try { + // Creating a File and writing some bytes in it. out = fs.create(getResponsePath); out.write(testResponseString.getBytes()); out.hflush(); + // Set metric baseline metricMap = fs.getInstrumentationMap(); - long getResponsesBeforeTest = metricMap - .get(CONNECTIONS_MADE.getStatName()); + long bytesWrittenToFile = testResponseString.getBytes().length; + expectedConnectionsMade = metricMap.get(CONNECTIONS_MADE.getStatName()); + expectedGetResponses = metricMap.get(CONNECTIONS_MADE.getStatName()); + expectedBytesReceived = metricMap.get(BYTES_RECEIVED.getStatName()); - // open would require 1 get response. + // -------------------------------------------------------------------- + // Operation: Create AbfsInputStream in = fs.open(getResponsePath); - // read would require 1 get response and also get the bytes received. + // Network stats calculation: For Creating AbfsInputStream: + // 1 GetFileStatus request to fetch file size = 1 connection and 1 get response + expectedConnectionsMade++; + expectedGetResponses++; + // -------------------------------------------------------------------- + + // Operation: Read int result = in.read(); + // Network stats calculation: For read: + // 1 read request = 1 connection and 1 get response + expectedConnectionsMade++; + expectedGetResponses++; + expectedBytesReceived += bytesWrittenToFile; + // -------------------------------------------------------------------- - // Confirming read isn't -1. - LOG.info("Result of read operation : {}", result); - + // Assertions metricMap = fs.getInstrumentationMap(); - - /* - * Testing values of statistics after writing and reading a buffer. - * - * get_responses - (above operations) + 1(open()) + 1 (read()).; - * - * bytes_received - This should be equal to bytes sent earlier. - */ - long extraCalls = 0; - if (!fs.getAbfsStore() - .isAppendBlobKey(fs.makeQualified(getResponsePath).toString())) { - // no network calls are made for hflush in case of appendblob - extraCalls++; - } - long expectedGetResponses = getResponsesBeforeTest + extraCalls + 1; - getResponses = assertAbfsStatistics(AbfsStatistic.GET_RESPONSES, - expectedGetResponses, metricMap); - - // Testing that bytes received is equal to bytes sent. - long bytesSend = metricMap.get(AbfsStatistic.BYTES_SENT.getStatName()); - bytesReceived = assertAbfsStatistics(AbfsStatistic.BYTES_RECEIVED, - bytesSend, - metricMap); - + assertAbfsStatistics(CONNECTIONS_MADE, expectedConnectionsMade, metricMap); + assertAbfsStatistics(GET_RESPONSES, expectedGetResponses, metricMap); + assertAbfsStatistics(AbfsStatistic.BYTES_RECEIVED, expectedBytesReceived, metricMap); } finally { IOUtils.cleanupWithLogger(LOG, out, in); } - // To close the streams 1 response is received. - getResponses++; + // -------------------------------------------------------------------- + // Operation: AbfsOutputStream close. + // Network Stats calculation: no op. + // -------------------------------------------------------------------- try { - /* - * Creating a file and writing buffer into it. - * This is a file recreate, so it will trigger - * 2 extra calls if create overwrite is off by default. - * Also recording the buffer for future read() call. - * This creating outputStream and writing requires 2 * - * (LARGE_OPERATIONS) get requests. - */ + // Recreate file with different file size + // [Create and append related network stats checks are done in + // test method testAbfsHttpSendStatistics] StringBuilder largeBuffer = new StringBuilder(); out = fs.create(getResponsePath); - long createRequestCalls = 1; - if (this.getConfiguration().isConditionalCreateOverwriteEnabled()) { - createRequestCalls += 2; - } - - for (int i = 0; i < LARGE_OPERATIONS; i++) { + for (int i = 0; i < WRITE_OPERATION_LOOP_COUNT; i++) { out.write(testResponseString.getBytes()); out.hflush(); largeBuffer.append(testResponseString); } - // Open requires 1 get_response. - in = fs.open(getResponsePath); - - /* - * Reading the file which was written above. This read() call would - * read bytes equal to the bytes that was written above. - * Get response would be 1 only. - */ - in.read(0, largeBuffer.toString().getBytes(), 0, - largeBuffer.toString().getBytes().length); - + // sync back to metric baseline metricMap = fs.getInstrumentationMap(); + expectedConnectionsMade = metricMap.get(CONNECTIONS_MADE.getStatName()); + expectedGetResponses = metricMap.get(GET_RESPONSES.getStatName()); + // -------------------------------------------------------------------- + // Operation: Create AbfsInputStream + in = fs.open(getResponsePath); + // Network stats calculation: For Creating AbfsInputStream: + // 1 GetFileStatus for file size = 1 connection and 1 get response + expectedConnectionsMade++; + expectedGetResponses++; + // -------------------------------------------------------------------- - /* - * Testing the statistics values after writing and reading a large buffer. - * - * get_response : get_responses(Last assertion) + 1 - * (OutputStream) + 2 * LARGE_OPERATIONS(Writing and flushing - * LARGE_OPERATIONS times) + 1(open()) + 1(read()) + - * 1 (createOverwriteTriggeredGetForeTag). - * - * bytes_received : bytes_received(Last assertion) + LARGE_OPERATIONS * - * bytes wrote each time (bytes_received is equal to bytes wrote in the - * File). - * - */ - assertAbfsStatistics(AbfsStatistic.BYTES_RECEIVED, - bytesReceived + LARGE_OPERATIONS * (testResponseString.getBytes().length), - metricMap); - if (fs.getAbfsStore().isAppendBlobKey(fs.makeQualified(getResponsePath).toString())) { - // no network calls are made for hflush in case of appendblob - assertAbfsStatistics(AbfsStatistic.GET_RESPONSES, - getResponses + 3 + LARGE_OPERATIONS, metricMap); - } else { - assertAbfsStatistics(AbfsStatistic.GET_RESPONSES, - getResponses + 2 + createRequestCalls + 2 * LARGE_OPERATIONS, - metricMap); - } + // Operation: Read + in.read(0, largeBuffer.toString().getBytes(), 0, largeBuffer.toString().getBytes().length); + // Network stats calculation: Total data written is still lesser than + // a buffer size. Hence will trigger only one read to store. So result is: + // 1 read request = 1 connection and 1 get response + expectedConnectionsMade++; + expectedGetResponses++; + expectedBytesReceived += (WRITE_OPERATION_LOOP_COUNT * testResponseString.getBytes().length); + // -------------------------------------------------------------------- + // Assertions + metricMap = fs.getInstrumentationMap(); + assertAbfsStatistics(CONNECTIONS_MADE, expectedConnectionsMade, metricMap); + assertAbfsStatistics(GET_RESPONSES, expectedGetResponses, metricMap); + assertAbfsStatistics(AbfsStatistic.BYTES_RECEIVED, expectedBytesReceived, metricMap); } finally { IOUtils.cleanupWithLogger(LOG, out, in); } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestSmallWriteOptimization.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestSmallWriteOptimization.java new file mode 100644 index 00000000000..fce2b682f58 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestSmallWriteOptimization.java @@ -0,0 +1,523 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs; + +import java.util.Arrays; +import java.util.Random; +import java.util.UUID; +import java.util.Map; +import java.io.IOException; + +import org.assertj.core.api.Assertions; +import org.junit.Assume; +import org.junit.runners.Parameterized; +import org.junit.runner.RunWith; +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys; + +import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.BYTES_SENT; +import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CONNECTIONS_MADE; +import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.SEND_REQUESTS; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_AZURE_ENABLE_SMALL_WRITE_OPTIMIZATION; +import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_TEST_APPENDBLOB_ENABLED; + +/** + * Test combination for small writes with flush and close operations. + * This test class formulates an append test flow to assert on various scenarios. + * Test stages: + * 1. Pre-create test file of required size. This is determined by + * startingFileSize parameter. If it is 0, then pre-creation is skipped. + * + * 2. Formulate an append loop or iteration. An iteration, will do N writes + * (determined by numOfClientWrites parameter) with each writing X bytes + * (determined by recurringClientWriteSize parameter). + * + * 3. Determine total number of append iterations needed by a test. + * If intention is to close the outputStream right after append, setting + * directCloseTest parameter will determine 1 append test iteration with an + * ending close. + * Else, it will execute TEST_FLUSH_ITERATION number of test iterations, with + * each doing appends, hflush/hsync and then close. + * + * 4. Execute test iterations with asserts on number of store requests made and + * validating file content. + */ +@RunWith(Parameterized.class) +public class ITestSmallWriteOptimization extends AbstractAbfsScaleTest { + private static final int ONE_MB = 1024 * 1024; + private static final int TWO_MB = 2 * ONE_MB; + private static final int TEST_BUFFER_SIZE = TWO_MB; + private static final int HALF_TEST_BUFFER_SIZE = TWO_MB / 2; + private static final int QUARTER_TEST_BUFFER_SIZE = TWO_MB / 4; + private static final int TEST_FLUSH_ITERATION = 2; + + @Parameterized.Parameter + public String testScenario; + + @Parameterized.Parameter(1) + public boolean enableSmallWriteOptimization; + + /** + * If true, will initiate close after appends. (That is, no explicit hflush or + * hsync calls will be made from client app.) + */ + @Parameterized.Parameter(2) + public boolean directCloseTest; + + /** + * If non-zero, test file should be created as pre-requisite with this size. + */ + @Parameterized.Parameter(3) + public Integer startingFileSize; + + /** + * Determines the write sizes to be issued by client app. + */ + @Parameterized.Parameter(4) + public Integer recurringClientWriteSize; + + /** + * Determines the number of Client writes to make. + */ + @Parameterized.Parameter(5) + public Integer numOfClientWrites; + + /** + * True, if the small write optimization is supposed to be effective in + * the scenario. + */ + @Parameterized.Parameter(6) + public boolean flushExpectedToBeMergedWithAppend; + + @Parameterized.Parameters(name = "{0}") + public static Iterable params() { + return Arrays.asList( + // Parameter Order : + // testScenario, + // enableSmallWriteOptimization, directCloseTest, startingFileSize, + // recurringClientWriteSize, numOfClientWrites, flushExpectedToBeMergedWithAppend + new Object[][]{ + // Buffer Size Write tests + { "OptmON_FlushCloseTest_EmptyFile_BufferSizeWrite", + true, false, 0, TEST_BUFFER_SIZE, 1, false + }, + { "OptmON_FlushCloseTest_NonEmptyFile_BufferSizeWrite", + true, false, 2 * TEST_BUFFER_SIZE, TEST_BUFFER_SIZE, 1, false + }, + { "OptmON_CloseTest_EmptyFile_BufferSizeWrite", + true, true, 0, TEST_BUFFER_SIZE, 1, false + }, + { "OptmON_CloseTest_NonEmptyFile_BufferSizeWrite", + true, true, 2 * TEST_BUFFER_SIZE, TEST_BUFFER_SIZE, 1, false + }, + { "OptmOFF_FlushCloseTest_EmptyFile_BufferSizeWrite", + false, false, 0, TEST_BUFFER_SIZE, 1, false + }, + { "OptmOFF_FlushCloseTest_NonEmptyFile_BufferSizeWrite", + false, false, 2 * TEST_BUFFER_SIZE, TEST_BUFFER_SIZE, 1, false + }, + { "OptmOFF_CloseTest_EmptyFile_BufferSizeWrite", + false, true, 0, TEST_BUFFER_SIZE, 1, false + }, + { "OptmOFF_CloseTest_NonEmptyFile_BufferSizeWrite", + false, true, 2 * TEST_BUFFER_SIZE, TEST_BUFFER_SIZE, 1, false + }, + // Less than buffer size write tests + { "OptmON_FlushCloseTest_EmptyFile_LessThanBufferSizeWrite", + true, false, 0, Math.abs(HALF_TEST_BUFFER_SIZE), 1, true + }, + { "OptmON_FlushCloseTest_NonEmptyFile_LessThanBufferSizeWrite", + true, false, 2 * TEST_BUFFER_SIZE, + Math.abs(HALF_TEST_BUFFER_SIZE), 1, true + }, + { "OptmON_CloseTest_EmptyFile_LessThanBufferSizeWrite", + true, true, 0, Math.abs(HALF_TEST_BUFFER_SIZE), 1, true + }, + { "OptmON_CloseTest_NonEmptyFile_LessThanBufferSizeWrite", + true, true, 2 * TEST_BUFFER_SIZE, + Math.abs(HALF_TEST_BUFFER_SIZE), 1, true + }, + { "OptmOFF_FlushCloseTest_EmptyFile_LessThanBufferSizeWrite", + false, false, 0, Math.abs(HALF_TEST_BUFFER_SIZE), 1, false + }, + { "OptmOFF_FlushCloseTest_NonEmptyFile_LessThanBufferSizeWrite", + false, false, 2 * TEST_BUFFER_SIZE, + Math.abs(HALF_TEST_BUFFER_SIZE), 1, false + }, + { "OptmOFF_CloseTest_EmptyFile_LessThanBufferSizeWrite", + false, true, 0, Math.abs(HALF_TEST_BUFFER_SIZE), 1, false + }, + { "OptmOFF_CloseTest_NonEmptyFile_LessThanBufferSizeWrite", + false, true, 2 * TEST_BUFFER_SIZE, + Math.abs(HALF_TEST_BUFFER_SIZE), 1, false + }, + // Multiple small writes still less than buffer size + { "OptmON_FlushCloseTest_EmptyFile_MultiSmallWritesStillLessThanBufferSize", + true, false, 0, Math.abs(QUARTER_TEST_BUFFER_SIZE), 3, true + }, + { "OptmON_FlushCloseTest_NonEmptyFile_MultiSmallWritesStillLessThanBufferSize", + true, false, 2 * TEST_BUFFER_SIZE, + Math.abs(QUARTER_TEST_BUFFER_SIZE), 3, true + }, + { "OptmON_CloseTest_EmptyFile_MultiSmallWritesStillLessThanBufferSize", + true, true, 0, Math.abs(QUARTER_TEST_BUFFER_SIZE), 3, true + }, + { "OptmON_CloseTest_NonEmptyFile_MultiSmallWritesStillLessThanBufferSize", + true, true, 2 * TEST_BUFFER_SIZE, + Math.abs(QUARTER_TEST_BUFFER_SIZE), 3, true + }, + { "OptmOFF_FlushCloseTest_EmptyFile_MultiSmallWritesStillLessThanBufferSize", + false, false, 0, Math.abs(QUARTER_TEST_BUFFER_SIZE), 3, false + }, + { "OptmOFF_FlushCloseTest_NonEmptyFile_MultiSmallWritesStillLessThanBufferSize", + false, false, 2 * TEST_BUFFER_SIZE, + Math.abs(QUARTER_TEST_BUFFER_SIZE), 3, false + }, + { "OptmOFF_CloseTest_EmptyFile_MultiSmallWritesStillLessThanBufferSize", + false, true, 0, Math.abs(QUARTER_TEST_BUFFER_SIZE), 3, false + }, + { "OptmOFF_CloseTest_NonEmptyFile_MultiSmallWritesStillLessThanBufferSize", + false, true, 2 * TEST_BUFFER_SIZE, + Math.abs(QUARTER_TEST_BUFFER_SIZE), 3, false + }, + // Multiple full buffer writes + { "OptmON_FlushCloseTest_EmptyFile_MultiBufferSizeWrite", + true, false, 0, TEST_BUFFER_SIZE, 3, false + }, + { "OptmON_FlushCloseTest_NonEmptyFile_MultiBufferSizeWrite", + true, false, 2 * TEST_BUFFER_SIZE, TEST_BUFFER_SIZE, 3, false + }, + { "OptmON_CloseTest_EmptyFile_MultiBufferSizeWrite", + true, true, 0, TEST_BUFFER_SIZE, 3, false + }, + { "OptmON_CloseTest_NonEmptyFile_MultiBufferSizeWrite", + true, true, 2 * TEST_BUFFER_SIZE, TEST_BUFFER_SIZE, 3, false + }, + { "OptmOFF_FlushCloseTest_EmptyFile_MultiBufferSizeWrite", + false, false, 0, TEST_BUFFER_SIZE, 3, false + }, + { "OptmOFF_FlushCloseTest_NonEmptyFile_MultiBufferSizeWrite", + false, false, 2 * TEST_BUFFER_SIZE, TEST_BUFFER_SIZE, 3, false + }, + { "OptmOFF_CloseTest_EmptyFile_MultiBufferSizeWrite", + false, true, 0, TEST_BUFFER_SIZE, 3, false + }, + { "OptmOFF_CloseTest_NonEmptyFile_MultiBufferSizeWrite", + false, true, 2 * TEST_BUFFER_SIZE, TEST_BUFFER_SIZE, 3, false + }, + // Multiple full buffers triggered and data less than buffer size pending + { "OptmON_FlushCloseTest_EmptyFile_BufferAndExtraWrite", + true, false, 0, + TEST_BUFFER_SIZE + Math.abs(QUARTER_TEST_BUFFER_SIZE), + 3, false + }, + { "OptmON_FlushCloseTest_NonEmptyFile_BufferAndExtraWrite", + true, false, 2 * TEST_BUFFER_SIZE, + TEST_BUFFER_SIZE + Math.abs(QUARTER_TEST_BUFFER_SIZE), + 3, false + }, + { "OptmON_CloseTest_EmptyFile__BufferAndExtraWrite", + true, true, 0, + TEST_BUFFER_SIZE + Math.abs(QUARTER_TEST_BUFFER_SIZE), + 3, false + }, + { "OptmON_CloseTest_NonEmptyFile_BufferAndExtraWrite", + true, true, 2 * TEST_BUFFER_SIZE, + TEST_BUFFER_SIZE + Math.abs(QUARTER_TEST_BUFFER_SIZE), + 3, false + }, + { "OptmOFF_FlushCloseTest_EmptyFile_BufferAndExtraWrite", + false, false, 0, + TEST_BUFFER_SIZE + Math.abs(QUARTER_TEST_BUFFER_SIZE), + 3, false + }, + { "OptmOFF_FlushCloseTest_NonEmptyFile_BufferAndExtraWrite", + false, false, 2 * TEST_BUFFER_SIZE, + TEST_BUFFER_SIZE + Math.abs(QUARTER_TEST_BUFFER_SIZE), + 3, false + }, + { "OptmOFF_CloseTest_EmptyFile_BufferAndExtraWrite", + false, true, 0, + TEST_BUFFER_SIZE + Math.abs(QUARTER_TEST_BUFFER_SIZE), + 3, false + }, + { "OptmOFF_CloseTest_NonEmptyFile_BufferAndExtraWrite", + false, true, 2 * TEST_BUFFER_SIZE, + TEST_BUFFER_SIZE + Math.abs(QUARTER_TEST_BUFFER_SIZE), + 3, false + }, + // 0 byte tests + { "OptmON_FlushCloseTest_EmptyFile_0ByteWrite", + true, false, 0, 0, 1, false + }, + { "OptmON_FlushCloseTest_NonEmptyFile_0ByteWrite", + true, false, 2 * TEST_BUFFER_SIZE, 0, 1, false + }, + { "OptmON_CloseTest_EmptyFile_0ByteWrite", + true, true, 0, 0, 1, false + }, + { "OptmON_CloseTest_NonEmptyFile_0ByteWrite", + true, true, 2 * TEST_BUFFER_SIZE, 0, 1, false + }, + { "OptmOFF_FlushCloseTest_EmptyFile_0ByteWrite", + false, false, 0, 0, 1, false + }, + { "OptmOFF_FlushCloseTest_NonEmptyFile_0ByteWrite", + false, false, 2 * TEST_BUFFER_SIZE, 0, 1, false + }, + { "OptmOFF_CloseTest_EmptyFile_0ByteWrite", + false, true, 0, 0, 1, false + }, + { "OptmOFF_CloseTest_NonEmptyFile_0ByteWrite", + false, true, 2 * TEST_BUFFER_SIZE, 0, 1, false + }, + }); + } + public ITestSmallWriteOptimization() throws Exception { + super(); + } + + @Test + public void testSmallWriteOptimization() + throws IOException { + boolean serviceDefaultOptmSettings = DEFAULT_AZURE_ENABLE_SMALL_WRITE_OPTIMIZATION; + // Tests with Optimization should only run if service has the feature on by + // default. Default settings will be turned on when server support is + // available on all store prod regions. + if (enableSmallWriteOptimization) { + Assume.assumeTrue(serviceDefaultOptmSettings); + } + + final AzureBlobFileSystem currentfs = this.getFileSystem(); + Configuration config = currentfs.getConf(); + boolean isAppendBlobTestSettingEnabled = (config.get(FS_AZURE_TEST_APPENDBLOB_ENABLED) == "true"); + + // This optimization doesnt take effect when append blob is on. + Assume.assumeFalse(isAppendBlobTestSettingEnabled); + + config.set(ConfigurationKeys.AZURE_WRITE_BUFFER_SIZE, Integer.toString(TEST_BUFFER_SIZE)); + config.set(ConfigurationKeys.AZURE_ENABLE_SMALL_WRITE_OPTIMIZATION, Boolean.toString(enableSmallWriteOptimization)); + final AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.get( + currentfs.getUri(), config); + + formulateSmallWriteTestAppendPattern(fs, startingFileSize, + recurringClientWriteSize, numOfClientWrites, + directCloseTest, flushExpectedToBeMergedWithAppend); + } + + /** + * if isDirectCloseTest == true, append + close is triggered + * if isDirectCloseTest == false, append + flush runs are repeated over + * iterations followed by close + * @param fs + * @param startingFileSize + * @param recurringWriteSize + * @param numOfWrites + * @param isDirectCloseTest + * @throws IOException + */ + private void formulateSmallWriteTestAppendPattern(final AzureBlobFileSystem fs, + int startingFileSize, + int recurringWriteSize, + int numOfWrites, + boolean isDirectCloseTest, + boolean flushExpectedToBeMergedWithAppend) throws IOException { + + int totalDataToBeAppended = 0; + int testIteration = 0; + int dataWrittenPerIteration = (numOfWrites * recurringWriteSize); + + if (isDirectCloseTest) { + totalDataToBeAppended = dataWrittenPerIteration; + testIteration = 1; + } else { + testIteration = TEST_FLUSH_ITERATION; + totalDataToBeAppended = testIteration * dataWrittenPerIteration; + } + + int totalFileSize = totalDataToBeAppended + startingFileSize; + // write buffer of file size created. This will be used as write + // source and for file content validation + final byte[] writeBuffer = new byte[totalFileSize]; + new Random().nextBytes(writeBuffer); + int writeBufferCursor = 0; + + Path testPath = new Path(getMethodName() + UUID.randomUUID().toString()); + FSDataOutputStream opStream; + + if (startingFileSize > 0) { + writeBufferCursor += createFileWithStartingTestSize(fs, writeBuffer, writeBufferCursor, testPath, + startingFileSize); + opStream = fs.append(testPath); + } else { + opStream = fs.create(testPath); + } + + final int writeBufferSize = fs.getAbfsStore() + .getAbfsConfiguration() + .getWriteBufferSize(); + long expectedTotalRequestsMade = fs.getInstrumentationMap() + .get(CONNECTIONS_MADE.getStatName()); + long expectedRequestsMadeWithData = fs.getInstrumentationMap() + .get(SEND_REQUESTS.getStatName()); + long expectedBytesSent = fs.getInstrumentationMap() + .get(BYTES_SENT.getStatName()); + + while (testIteration > 0) { + // trigger recurringWriteSize appends over numOfWrites + writeBufferCursor += executeWritePattern(opStream, writeBuffer, + writeBufferCursor, numOfWrites, recurringWriteSize); + + int numOfBuffersWrittenToStore = (int) Math.floor( + dataWrittenPerIteration / writeBufferSize); + int dataSizeWrittenToStore = numOfBuffersWrittenToStore * writeBufferSize; + int pendingDataToStore = dataWrittenPerIteration - dataSizeWrittenToStore; + + expectedTotalRequestsMade += numOfBuffersWrittenToStore; + expectedRequestsMadeWithData += numOfBuffersWrittenToStore; + expectedBytesSent += dataSizeWrittenToStore; + + if (isDirectCloseTest) { + opStream.close(); + } else { + opStream.hflush(); + } + + boolean wasDataPendingToBeWrittenToServer = (pendingDataToStore > 0); + // Small write optimization will only work if + // a. config for small write optimization is on + // b. no buffer writes have been triggered since last flush + // c. there is some pending data in buffer to write to store + final boolean smallWriteOptimizationEnabled = fs.getAbfsStore() + .getAbfsConfiguration() + .isSmallWriteOptimizationEnabled(); + boolean flushWillBeMergedWithAppend = smallWriteOptimizationEnabled + && (numOfBuffersWrittenToStore == 0) + && (wasDataPendingToBeWrittenToServer); + + Assertions.assertThat(flushWillBeMergedWithAppend) + .describedAs(flushExpectedToBeMergedWithAppend + ? "Flush was to be merged with Append" + : "Flush should not have been merged with Append") + .isEqualTo(flushExpectedToBeMergedWithAppend); + + int totalAppendFlushCalls = (flushWillBeMergedWithAppend + ? 1 // 1 append (with flush and close param) + : (wasDataPendingToBeWrittenToServer) + ? 2 // 1 append + 1 flush (with close) + : 1); // 1 flush (with close) + + expectedTotalRequestsMade += totalAppendFlushCalls; + expectedRequestsMadeWithData += totalAppendFlushCalls; + expectedBytesSent += wasDataPendingToBeWrittenToServer + ? pendingDataToStore + : 0; + + assertOpStats(fs.getInstrumentationMap(), expectedTotalRequestsMade, + expectedRequestsMadeWithData, expectedBytesSent); + + if (isDirectCloseTest) { + // stream already closed + validateStoreAppends(fs, testPath, totalFileSize, writeBuffer); + return; + } + + testIteration--; + } + + opStream.close(); + expectedTotalRequestsMade += 1; + expectedRequestsMadeWithData += 1; + // no change in expectedBytesSent + assertOpStats(fs.getInstrumentationMap(), expectedTotalRequestsMade, expectedRequestsMadeWithData, expectedBytesSent); + + validateStoreAppends(fs, testPath, totalFileSize, writeBuffer); + } + + private int createFileWithStartingTestSize(AzureBlobFileSystem fs, byte[] writeBuffer, + int writeBufferCursor, Path testPath, int startingFileSize) + throws IOException { + FSDataOutputStream opStream = fs.create(testPath); + writeBufferCursor += executeWritePattern(opStream, + writeBuffer, + writeBufferCursor, + 1, + startingFileSize); + + opStream.close(); + Assertions.assertThat(fs.getFileStatus(testPath).getLen()) + .describedAs("File should be of size %d at the start of test.", + startingFileSize) + .isEqualTo(startingFileSize); + + return writeBufferCursor; + } + + private void validateStoreAppends(AzureBlobFileSystem fs, + Path testPath, + int totalFileSize, + byte[] bufferWritten) + throws IOException { + // Final validation + Assertions.assertThat(fs.getFileStatus(testPath).getLen()) + .describedAs("File should be of size %d at the end of test.", + totalFileSize) + .isEqualTo(totalFileSize); + + byte[] fileReadFromStore = new byte[totalFileSize]; + fs.open(testPath).read(fileReadFromStore, 0, totalFileSize); + + assertArrayEquals("Test file content incorrect", bufferWritten, + fileReadFromStore); + } + + private void assertOpStats(Map metricMap, + long expectedTotalRequestsMade, + long expectedRequestsMadeWithData, + long expectedBytesSent) { + assertAbfsStatistics(CONNECTIONS_MADE, expectedTotalRequestsMade, + metricMap); + assertAbfsStatistics(SEND_REQUESTS, expectedRequestsMadeWithData, + metricMap); + assertAbfsStatistics(BYTES_SENT, expectedBytesSent, metricMap); + } + + private int executeWritePattern(FSDataOutputStream opStream, + byte[] buffer, + int startOffset, + int writeLoopCount, + int writeSize) + throws IOException { + int dataSizeWritten = startOffset; + + while (writeLoopCount > 0) { + opStream.write(buffer, startOffset, writeSize); + startOffset += writeSize; + writeLoopCount--; + } + + dataSizeWritten = startOffset - dataSizeWritten; + return dataSizeWritten; + } +} diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java index 7f9111683d5..fff005114fb 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java @@ -44,10 +44,16 @@ public class ITestAbfsOutputStream extends AbstractAbfsIntegrationTest { final AzureBlobFileSystem fs = getFileSystem(conf); try (FSDataOutputStream out = fs.create(TEST_FILE_PATH)) { AbfsOutputStream stream = (AbfsOutputStream) out.getWrappedStream(); + + int maxConcurrentRequests + = getConfiguration().getWriteMaxConcurrentRequestCount(); + if (stream.isAppendBlobStream()) { + maxConcurrentRequests = 1; + } + Assertions.assertThat(stream.getMaxConcurrentRequestCount()).describedAs( - "maxConcurrentRequests should be " + getConfiguration() - .getWriteMaxConcurrentRequestCount()) - .isEqualTo(getConfiguration().getWriteMaxConcurrentRequestCount()); + "maxConcurrentRequests should be " + maxConcurrentRequests) + .isEqualTo(maxConcurrentRequests); Assertions.assertThat(stream.getMaxRequestsThatCanBeQueued()).describedAs( "maxRequestsToQueue should be " + getConfiguration() .getMaxWriteRequestsToQueue()) @@ -67,6 +73,11 @@ public class ITestAbfsOutputStream extends AbstractAbfsIntegrationTest { final AzureBlobFileSystem fs = getFileSystem(conf); FSDataOutputStream out = fs.create(TEST_FILE_PATH); AbfsOutputStream stream = (AbfsOutputStream) out.getWrappedStream(); + + if (stream.isAppendBlobStream()) { + maxConcurrentRequests = 1; + } + Assertions.assertThat(stream.getMaxConcurrentRequestCount()) .describedAs("maxConcurrentRequests should be " + maxConcurrentRequests) .isEqualTo(maxConcurrentRequests); diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java index aab0248c407..1e6b8efe6d9 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java @@ -20,7 +20,6 @@ package org.apache.hadoop.fs.azurebfs.services; import java.io.IOException; import java.util.Arrays; -import java.util.HashSet; import java.util.Random; import org.junit.Test; @@ -28,19 +27,22 @@ import org.junit.Test; import org.mockito.ArgumentCaptor; import org.apache.hadoop.fs.azurebfs.AbfsConfiguration; +import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters; import org.apache.hadoop.conf.Configuration; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.refEq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.anyInt; import static org.mockito.Mockito.anyBoolean; import static org.mockito.Mockito.any; import static org.mockito.Mockito.anyString; import static org.mockito.Mockito.anyLong; import static org.assertj.core.api.Assertions.assertThat; +import static org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters.Mode.APPEND_MODE; public final class TestAbfsOutputStream { @@ -83,22 +85,15 @@ public final class TestAbfsOutputStream { abfsConf = new AbfsConfiguration(conf, accountName1); AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf); when(client.getAbfsPerfTracker()).thenReturn(tracker); - when(client.append(anyString(), anyLong(), any(byte[].class), anyInt(), anyInt(), any(), anyBoolean())).thenReturn(op); + when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any())).thenReturn(op); when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any())).thenReturn(op); - AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0, populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false)); + AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0, + populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false)); final byte[] b = new byte[WRITE_SIZE]; new Random().nextBytes(b); out.write(b); out.hsync(); - ArgumentCaptor acString = ArgumentCaptor.forClass(String.class); - ArgumentCaptor acLong = ArgumentCaptor.forClass(Long.class); - ArgumentCaptor acBufferOffset = ArgumentCaptor.forClass(Integer.class); - ArgumentCaptor acBufferLength = ArgumentCaptor.forClass(Integer.class); - ArgumentCaptor acByteArray = ArgumentCaptor.forClass(byte[].class); - ArgumentCaptor acAppendBlobAppend = ArgumentCaptor.forClass(Boolean.class); - ArgumentCaptor acSASToken = ArgumentCaptor.forClass(String.class); - final byte[] b1 = new byte[2*WRITE_SIZE]; new Random().nextBytes(b1); @@ -108,13 +103,18 @@ public final class TestAbfsOutputStream { out.hsync(); - verify(client, times(2)).append(acString.capture(), acLong.capture(), acByteArray.capture(), acBufferOffset.capture(), acBufferLength.capture(), - acSASToken.capture(), acAppendBlobAppend.capture()); - assertThat(Arrays.asList(PATH, PATH)).describedAs("Path of the requests").isEqualTo(acString.getAllValues()); - assertThat(Arrays.asList(Long.valueOf(0), Long.valueOf(WRITE_SIZE))).describedAs("Write Position").isEqualTo(acLong.getAllValues()); - assertThat(Arrays.asList(0, 0)).describedAs("Buffer Offset").isEqualTo(acBufferOffset.getAllValues()); - assertThat(Arrays.asList(WRITE_SIZE, 2*WRITE_SIZE)).describedAs("Buffer length").isEqualTo(acBufferLength.getAllValues()); + AppendRequestParameters firstReqParameters = new AppendRequestParameters( + 0, 0, WRITE_SIZE, APPEND_MODE, false); + AppendRequestParameters secondReqParameters = new AppendRequestParameters( + WRITE_SIZE, 0, 2 * WRITE_SIZE, APPEND_MODE, false); + verify(client, times(1)).append( + eq(PATH), any(byte[].class), refEq(firstReqParameters), any()); + verify(client, times(1)).append( + eq(PATH), any(byte[].class), refEq(secondReqParameters), any()); + // confirm there were only 2 invocations in all + verify(client, times(2)).append( + eq(PATH), any(byte[].class), any(), any()); } /** @@ -132,10 +132,11 @@ public final class TestAbfsOutputStream { AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf); when(client.getAbfsPerfTracker()).thenReturn(tracker); - when(client.append(anyString(), anyLong(), any(byte[].class), anyInt(), anyInt(), any(), anyBoolean())).thenReturn(op); + when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any())).thenReturn(op); when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any())).thenReturn(op); - AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0, populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false)); + AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0, + populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false)); final byte[] b = new byte[WRITE_SIZE]; new Random().nextBytes(b); @@ -144,33 +145,29 @@ public final class TestAbfsOutputStream { } out.close(); - ArgumentCaptor acString = ArgumentCaptor.forClass(String.class); - ArgumentCaptor acLong = ArgumentCaptor.forClass(Long.class); - ArgumentCaptor acBufferOffset = ArgumentCaptor.forClass(Integer.class); - ArgumentCaptor acBufferLength = ArgumentCaptor.forClass(Integer.class); - ArgumentCaptor acByteArray = ArgumentCaptor.forClass(byte[].class); - ArgumentCaptor acAppendBlobAppend = ArgumentCaptor.forClass(Boolean.class); - ArgumentCaptor acSASToken = ArgumentCaptor.forClass(String.class); + AppendRequestParameters firstReqParameters = new AppendRequestParameters( + 0, 0, BUFFER_SIZE, APPEND_MODE, false); + AppendRequestParameters secondReqParameters = new AppendRequestParameters( + BUFFER_SIZE, 0, 5*WRITE_SIZE-BUFFER_SIZE, APPEND_MODE, false); - verify(client, times(2)).append(acString.capture(), acLong.capture(), acByteArray.capture(), acBufferOffset.capture(), acBufferLength.capture(), - acSASToken.capture(), acAppendBlobAppend.capture()); - assertThat(Arrays.asList(PATH, PATH)).describedAs("Path").isEqualTo(acString.getAllValues()); - assertThat(new HashSet(Arrays.asList(Long.valueOf(0), Long.valueOf(BUFFER_SIZE)))).describedAs("Position").isEqualTo(new HashSet( - acLong.getAllValues())); - assertThat(Arrays.asList(0, 0)).describedAs("Buffer Offset").isEqualTo(acBufferOffset.getAllValues()); - assertThat(new HashSet(Arrays.asList(BUFFER_SIZE, 5*WRITE_SIZE-BUFFER_SIZE))).describedAs("Buffer Length").isEqualTo(new HashSet( - acBufferLength.getAllValues())); + verify(client, times(1)).append( + eq(PATH), any(byte[].class), refEq(firstReqParameters), any()); + verify(client, times(1)).append( + eq(PATH), any(byte[].class), refEq(secondReqParameters), any()); + // confirm there were only 2 invocations in all + verify(client, times(2)).append( + eq(PATH), any(byte[].class), any(), any()); - ArgumentCaptor acFlushString = ArgumentCaptor.forClass(String.class); - ArgumentCaptor acFlushLong = ArgumentCaptor.forClass(Long.class); + ArgumentCaptor acFlushPath = ArgumentCaptor.forClass(String.class); + ArgumentCaptor acFlushPosition = ArgumentCaptor.forClass(Long.class); ArgumentCaptor acFlushRetainUnCommittedData = ArgumentCaptor.forClass(Boolean.class); ArgumentCaptor acFlushClose = ArgumentCaptor.forClass(Boolean.class); ArgumentCaptor acFlushSASToken = ArgumentCaptor.forClass(String.class); - verify(client, times(1)).flush(acFlushString.capture(), acFlushLong.capture(), acFlushRetainUnCommittedData.capture(), acFlushClose.capture(), - acFlushSASToken.capture()); - assertThat(Arrays.asList(PATH)).describedAs("path").isEqualTo(acFlushString.getAllValues()); - assertThat(Arrays.asList(Long.valueOf(5*WRITE_SIZE))).describedAs("position").isEqualTo(acFlushLong.getAllValues()); + verify(client, times(1)).flush(acFlushPath.capture(), acFlushPosition.capture(), acFlushRetainUnCommittedData.capture(), acFlushClose.capture(), + acFlushSASToken.capture()); + assertThat(Arrays.asList(PATH)).describedAs("path").isEqualTo(acFlushPath.getAllValues()); + assertThat(Arrays.asList(Long.valueOf(5*WRITE_SIZE))).describedAs("position").isEqualTo(acFlushPosition.getAllValues()); assertThat(Arrays.asList(false)).describedAs("RetainUnCommittedData flag").isEqualTo(acFlushRetainUnCommittedData.getAllValues()); assertThat(Arrays.asList(true)).describedAs("Close flag").isEqualTo(acFlushClose.getAllValues()); } @@ -191,12 +188,13 @@ public final class TestAbfsOutputStream { AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf); when(client.getAbfsPerfTracker()).thenReturn(tracker); - when(client.append(anyString(), anyLong(), any(byte[].class), anyInt(), anyInt(), any(), anyBoolean())).thenReturn(op); + when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any())).thenReturn(op); when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any())).thenReturn(op); when(op.getSasToken()).thenReturn("testToken"); when(op.getResult()).thenReturn(httpOp); - AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0, populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false)); + AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0, + populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false)); final byte[] b = new byte[BUFFER_SIZE]; new Random().nextBytes(b); @@ -205,35 +203,31 @@ public final class TestAbfsOutputStream { } out.close(); - ArgumentCaptor acString = ArgumentCaptor.forClass(String.class); - ArgumentCaptor acLong = ArgumentCaptor.forClass(Long.class); - ArgumentCaptor acBufferOffset = ArgumentCaptor.forClass(Integer.class); - ArgumentCaptor acBufferLength = ArgumentCaptor.forClass(Integer.class); - ArgumentCaptor acByteArray = ArgumentCaptor.forClass(byte[].class); - ArgumentCaptor acAppendBlobAppend = ArgumentCaptor.forClass(Boolean.class); - ArgumentCaptor acSASToken = ArgumentCaptor.forClass(String.class); + AppendRequestParameters firstReqParameters = new AppendRequestParameters( + 0, 0, BUFFER_SIZE, APPEND_MODE, false); + AppendRequestParameters secondReqParameters = new AppendRequestParameters( + BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false); - verify(client, times(2)).append(acString.capture(), acLong.capture(), acByteArray.capture(), acBufferOffset.capture(), acBufferLength.capture(), - acSASToken.capture(), acAppendBlobAppend.capture()); - assertThat(Arrays.asList(PATH, PATH)).describedAs("path").isEqualTo(acString.getAllValues()); - assertThat(new HashSet(Arrays.asList(Long.valueOf(0), Long.valueOf(BUFFER_SIZE)))).describedAs("Position").isEqualTo(new HashSet( - acLong.getAllValues())); - assertThat(Arrays.asList(0, 0)).describedAs("Buffer Offset").isEqualTo(acBufferOffset.getAllValues()); - assertThat(Arrays.asList(BUFFER_SIZE, BUFFER_SIZE)).describedAs("Buffer Length").isEqualTo(acBufferLength.getAllValues()); + verify(client, times(1)).append( + eq(PATH), any(byte[].class), refEq(firstReqParameters), any()); + verify(client, times(1)).append( + eq(PATH), any(byte[].class), refEq(secondReqParameters), any()); + // confirm there were only 2 invocations in all + verify(client, times(2)).append( + eq(PATH), any(byte[].class), any(), any()); - ArgumentCaptor acFlushString = ArgumentCaptor.forClass(String.class); - ArgumentCaptor acFlushLong = ArgumentCaptor.forClass(Long.class); + ArgumentCaptor acFlushPath = ArgumentCaptor.forClass(String.class); + ArgumentCaptor acFlushPosition = ArgumentCaptor.forClass(Long.class); ArgumentCaptor acFlushRetainUnCommittedData = ArgumentCaptor.forClass(Boolean.class); ArgumentCaptor acFlushClose = ArgumentCaptor.forClass(Boolean.class); ArgumentCaptor acFlushSASToken = ArgumentCaptor.forClass(String.class); - verify(client, times(1)).flush(acFlushString.capture(), acFlushLong.capture(), acFlushRetainUnCommittedData.capture(), acFlushClose.capture(), - acFlushSASToken.capture()); - assertThat(Arrays.asList(PATH)).describedAs("path").isEqualTo(acFlushString.getAllValues()); - assertThat(Arrays.asList(Long.valueOf(2*BUFFER_SIZE))).describedAs("position").isEqualTo(acFlushLong.getAllValues()); + verify(client, times(1)).flush(acFlushPath.capture(), acFlushPosition.capture(), acFlushRetainUnCommittedData.capture(), acFlushClose.capture(), + acFlushSASToken.capture()); + assertThat(Arrays.asList(PATH)).describedAs("path").isEqualTo(acFlushPath.getAllValues()); + assertThat(Arrays.asList(Long.valueOf(2*BUFFER_SIZE))).describedAs("position").isEqualTo(acFlushPosition.getAllValues()); assertThat(Arrays.asList(false)).describedAs("RetainUnCommittedData flag").isEqualTo(acFlushRetainUnCommittedData.getAllValues()); assertThat(Arrays.asList(true)).describedAs("Close flag").isEqualTo(acFlushClose.getAllValues()); - } /** @@ -252,12 +246,13 @@ public final class TestAbfsOutputStream { AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf); when(client.getAbfsPerfTracker()).thenReturn(tracker); - when(client.append(anyString(), anyLong(), any(byte[].class), anyInt(), anyInt(), any(), anyBoolean())).thenReturn(op); + when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any())).thenReturn(op); when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any())).thenReturn(op); when(op.getSasToken()).thenReturn("testToken"); when(op.getResult()).thenReturn(httpOp); - AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0, populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false)); + AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0, + populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false)); final byte[] b = new byte[BUFFER_SIZE]; new Random().nextBytes(b); @@ -266,22 +261,18 @@ public final class TestAbfsOutputStream { } Thread.sleep(1000); - ArgumentCaptor acString = ArgumentCaptor.forClass(String.class); - ArgumentCaptor acLong = ArgumentCaptor.forClass(Long.class); - ArgumentCaptor acBufferOffset = ArgumentCaptor.forClass(Integer.class); - ArgumentCaptor acBufferLength = ArgumentCaptor.forClass(Integer.class); - ArgumentCaptor acByteArray = ArgumentCaptor.forClass(byte[].class); - ArgumentCaptor acAppendBlobAppend = ArgumentCaptor.forClass(Boolean.class); - ArgumentCaptor acSASToken = ArgumentCaptor.forClass(String.class); - - verify(client, times(2)).append(acString.capture(), acLong.capture(), acByteArray.capture(), acBufferOffset.capture(), acBufferLength.capture(), - acSASToken.capture(), acAppendBlobAppend.capture()); - assertThat(Arrays.asList(PATH, PATH)).describedAs("File Path").isEqualTo(acString.getAllValues()); - assertThat(new HashSet(Arrays.asList(Long.valueOf(0), Long.valueOf(BUFFER_SIZE)))).describedAs("Position in file").isEqualTo( - new HashSet(acLong.getAllValues())); - assertThat(Arrays.asList(0, 0)).describedAs("buffer offset").isEqualTo(acBufferOffset.getAllValues()); - assertThat(Arrays.asList(BUFFER_SIZE, BUFFER_SIZE)).describedAs("buffer length").isEqualTo(acBufferLength.getAllValues()); + AppendRequestParameters firstReqParameters = new AppendRequestParameters( + 0, 0, BUFFER_SIZE, APPEND_MODE, false); + AppendRequestParameters secondReqParameters = new AppendRequestParameters( + BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false); + verify(client, times(1)).append( + eq(PATH), any(byte[].class), refEq(firstReqParameters), any()); + verify(client, times(1)).append( + eq(PATH), any(byte[].class), refEq(secondReqParameters), any()); + // confirm there were only 2 invocations in all + verify(client, times(2)).append( + eq(PATH), any(byte[].class), any(), any()); } /** @@ -299,10 +290,11 @@ public final class TestAbfsOutputStream { AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf); when(client.getAbfsPerfTracker()).thenReturn(tracker); - when(client.append(anyString(), anyLong(), any(byte[].class), anyInt(), anyInt(), any(), anyBoolean())).thenReturn(op); + when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any())).thenReturn(op); when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any())).thenReturn(op); - AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0, populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, true)); + AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0, + populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, true)); final byte[] b = new byte[BUFFER_SIZE]; new Random().nextBytes(b); @@ -311,22 +303,18 @@ public final class TestAbfsOutputStream { } Thread.sleep(1000); - ArgumentCaptor acString = ArgumentCaptor.forClass(String.class); - ArgumentCaptor acLong = ArgumentCaptor.forClass(Long.class); - ArgumentCaptor acBufferOffset = ArgumentCaptor.forClass(Integer.class); - ArgumentCaptor acBufferLength = ArgumentCaptor.forClass(Integer.class); - ArgumentCaptor acByteArray = ArgumentCaptor.forClass(byte[].class); - ArgumentCaptor acAppendBlobAppend = ArgumentCaptor.forClass(Boolean.class); - ArgumentCaptor acSASToken = ArgumentCaptor.forClass(String.class); - - verify(client, times(2)).append(acString.capture(), acLong.capture(), acByteArray.capture(), acBufferOffset.capture(), acBufferLength.capture(), - acSASToken.capture(), acAppendBlobAppend.capture()); - assertThat(Arrays.asList(PATH, PATH)).describedAs("File Path").isEqualTo(acString.getAllValues()); - assertThat(Arrays.asList(Long.valueOf(0), Long.valueOf(BUFFER_SIZE))).describedAs("File Position").isEqualTo(acLong.getAllValues()); - assertThat(Arrays.asList(0, 0)).describedAs("Buffer Offset").isEqualTo(acBufferOffset.getAllValues()); - assertThat(Arrays.asList(BUFFER_SIZE, BUFFER_SIZE)).describedAs("Buffer Length").isEqualTo(acBufferLength.getAllValues()); - assertThat(Arrays.asList(true, true)).describedAs("is AppendBlob Append").isEqualTo(acAppendBlobAppend.getAllValues()); + AppendRequestParameters firstReqParameters = new AppendRequestParameters( + 0, 0, BUFFER_SIZE, APPEND_MODE, true); + AppendRequestParameters secondReqParameters = new AppendRequestParameters( + BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, true); + verify(client, times(1)).append( + eq(PATH), any(byte[].class), refEq(firstReqParameters), any()); + verify(client, times(1)).append( + eq(PATH), any(byte[].class), refEq(secondReqParameters), any()); + // confirm there were only 2 invocations in all + verify(client, times(2)).append( + eq(PATH), any(byte[].class), any(), any()); } /** @@ -337,6 +325,7 @@ public final class TestAbfsOutputStream { AbfsClient client = mock(AbfsClient.class); AbfsRestOperation op = mock(AbfsRestOperation.class); + when(op.getSasToken()).thenReturn(""); AbfsConfiguration abfsConf; final Configuration conf = new Configuration(); conf.set(accountKey1, accountValue1); @@ -344,10 +333,11 @@ public final class TestAbfsOutputStream { AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf); when(client.getAbfsPerfTracker()).thenReturn(tracker); - when(client.append(anyString(), anyLong(), any(byte[].class), anyInt(), anyInt(), any(), anyBoolean())).thenReturn(op); + when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any())).thenReturn(op); when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any())).thenReturn(op); - AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0, populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false)); + AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0, + populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false)); final byte[] b = new byte[BUFFER_SIZE]; new Random().nextBytes(b); @@ -356,35 +346,31 @@ public final class TestAbfsOutputStream { } out.hflush(); - ArgumentCaptor acString = ArgumentCaptor.forClass(String.class); - ArgumentCaptor acLong = ArgumentCaptor.forClass(Long.class); - ArgumentCaptor acBufferOffset = ArgumentCaptor.forClass(Integer.class); - ArgumentCaptor acBufferLength = ArgumentCaptor.forClass(Integer.class); - ArgumentCaptor acByteArray = ArgumentCaptor.forClass(byte[].class); - ArgumentCaptor acAppendBlobAppend = ArgumentCaptor.forClass(Boolean.class); - ArgumentCaptor acSASToken = ArgumentCaptor.forClass(String.class); + AppendRequestParameters firstReqParameters = new AppendRequestParameters( + 0, 0, BUFFER_SIZE, APPEND_MODE, false); + AppendRequestParameters secondReqParameters = new AppendRequestParameters( + BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false); - verify(client, times(2)).append(acString.capture(), acLong.capture(), acByteArray.capture(), acBufferOffset.capture(), acBufferLength.capture(), - acSASToken.capture(), acAppendBlobAppend.capture()); - assertThat(Arrays.asList(PATH, PATH)).describedAs("File Path").isEqualTo(acString.getAllValues()); - assertThat(new HashSet(Arrays.asList(Long.valueOf(0), Long.valueOf(BUFFER_SIZE)))).describedAs("File Position").isEqualTo( - new HashSet(acLong.getAllValues())); - assertThat(Arrays.asList(0, 0)).describedAs("Buffer Offset").isEqualTo(acBufferOffset.getAllValues()); - assertThat(Arrays.asList(BUFFER_SIZE, BUFFER_SIZE)).describedAs("Buffer Length").isEqualTo(acBufferLength.getAllValues()); + verify(client, times(1)).append( + eq(PATH), any(byte[].class), refEq(firstReqParameters), any()); + verify(client, times(1)).append( + eq(PATH), any(byte[].class), refEq(secondReqParameters), any()); + // confirm there were only 2 invocations in all + verify(client, times(2)).append( + eq(PATH), any(byte[].class), any(), any()); - ArgumentCaptor acFlushString = ArgumentCaptor.forClass(String.class); - ArgumentCaptor acFlushLong = ArgumentCaptor.forClass(Long.class); + ArgumentCaptor acFlushPath = ArgumentCaptor.forClass(String.class); + ArgumentCaptor acFlushPosition = ArgumentCaptor.forClass(Long.class); ArgumentCaptor acFlushRetainUnCommittedData = ArgumentCaptor.forClass(Boolean.class); ArgumentCaptor acFlushClose = ArgumentCaptor.forClass(Boolean.class); ArgumentCaptor acFlushSASToken = ArgumentCaptor.forClass(String.class); - verify(client, times(1)).flush(acFlushString.capture(), acFlushLong.capture(), acFlushRetainUnCommittedData.capture(), acFlushClose.capture(), - acFlushSASToken.capture()); - assertThat(Arrays.asList(PATH)).describedAs("path").isEqualTo(acFlushString.getAllValues()); - assertThat(Arrays.asList(Long.valueOf(2*BUFFER_SIZE))).describedAs("position").isEqualTo(acFlushLong.getAllValues()); + verify(client, times(1)).flush(acFlushPath.capture(), acFlushPosition.capture(), acFlushRetainUnCommittedData.capture(), acFlushClose.capture(), + acFlushSASToken.capture()); + assertThat(Arrays.asList(PATH)).describedAs("path").isEqualTo(acFlushPath.getAllValues()); + assertThat(Arrays.asList(Long.valueOf(2*BUFFER_SIZE))).describedAs("position").isEqualTo(acFlushPosition.getAllValues()); assertThat(Arrays.asList(false)).describedAs("RetainUnCommittedData flag").isEqualTo(acFlushRetainUnCommittedData.getAllValues()); assertThat(Arrays.asList(false)).describedAs("Close flag").isEqualTo(acFlushClose.getAllValues()); - } /** @@ -401,10 +387,11 @@ public final class TestAbfsOutputStream { abfsConf = new AbfsConfiguration(conf, accountName1); AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf); when(client.getAbfsPerfTracker()).thenReturn(tracker); - when(client.append(anyString(), anyLong(), any(byte[].class), anyInt(), anyInt(), any(), anyBoolean())).thenReturn(op); + when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any())).thenReturn(op); when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any())).thenReturn(op); - AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0, populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false)); + AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0, + populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false)); final byte[] b = new byte[BUFFER_SIZE]; new Random().nextBytes(b); @@ -415,21 +402,17 @@ public final class TestAbfsOutputStream { out.flush(); Thread.sleep(1000); - ArgumentCaptor acString = ArgumentCaptor.forClass(String.class); - ArgumentCaptor acLong = ArgumentCaptor.forClass(Long.class); - ArgumentCaptor acBufferOffset = ArgumentCaptor.forClass(Integer.class); - ArgumentCaptor acBufferLength = ArgumentCaptor.forClass(Integer.class); - ArgumentCaptor acByteArray = ArgumentCaptor.forClass(byte[].class); - ArgumentCaptor acAppendBlobAppend = ArgumentCaptor.forClass(Boolean.class); - ArgumentCaptor acSASToken = ArgumentCaptor.forClass(String.class); - - verify(client, times(2)).append(acString.capture(), acLong.capture(), acByteArray.capture(), acBufferOffset.capture(), acBufferLength.capture(), - acSASToken.capture(), acAppendBlobAppend.capture()); - assertThat(Arrays.asList(PATH, PATH)).describedAs("path").isEqualTo(acString.getAllValues()); - assertThat(new HashSet(Arrays.asList(Long.valueOf(0), Long.valueOf(BUFFER_SIZE)))).describedAs("Position").isEqualTo( - new HashSet(acLong.getAllValues())); - assertThat(Arrays.asList(0, 0)).describedAs("Buffer Offset").isEqualTo(acBufferOffset.getAllValues()); - assertThat(Arrays.asList(BUFFER_SIZE, BUFFER_SIZE)).describedAs("Buffer Length").isEqualTo(acBufferLength.getAllValues()); + AppendRequestParameters firstReqParameters = new AppendRequestParameters( + 0, 0, BUFFER_SIZE, APPEND_MODE, false); + AppendRequestParameters secondReqParameters = new AppendRequestParameters( + BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false); + verify(client, times(1)).append( + eq(PATH), any(byte[].class), refEq(firstReqParameters), any()); + verify(client, times(1)).append( + eq(PATH), any(byte[].class), refEq(secondReqParameters), any()); + // confirm there were only 2 invocations in all + verify(client, times(2)).append( + eq(PATH), any(byte[].class), any(), any()); } }