diff --git a/hadoop-tools/hadoop-azure/pom.xml b/hadoop-tools/hadoop-azure/pom.xml
index 7bfce0102fd..6b0599daa90 100644
--- a/hadoop-tools/hadoop-azure/pom.xml
+++ b/hadoop-tools/hadoop-azure/pom.xml
@@ -555,6 +555,7 @@
**/azurebfs/ITestAbfsReadWriteAndSeek.java**/azurebfs/ITestAzureBlobFileSystemListStatus.java**/azurebfs/extensions/ITestAbfsDelegationTokens.java
+ **/azurebfs/ITestSmallWriteOptimization.java
@@ -594,6 +595,7 @@
**/azurebfs/ITestAbfsReadWriteAndSeek.java**/azurebfs/ITestAzureBlobFileSystemListStatus.java**/azurebfs/extensions/ITestAbfsDelegationTokens.java
+ **/azurebfs/ITestSmallWriteOptimization.java
diff --git a/hadoop-tools/hadoop-azure/src/config/checkstyle-suppressions.xml b/hadoop-tools/hadoop-azure/src/config/checkstyle-suppressions.xml
index c50236162d8..070c8c1fe82 100644
--- a/hadoop-tools/hadoop-azure/src/config/checkstyle-suppressions.xml
+++ b/hadoop-tools/hadoop-azure/src/config/checkstyle-suppressions.xml
@@ -46,4 +46,6 @@
files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]AzureBlobFileSystemStore.java"/>
+
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
index b1c95d2e82b..5a703233953 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
@@ -100,6 +100,10 @@ public class AbfsConfiguration{
DefaultValue = DEFAULT_WRITE_BUFFER_SIZE)
private int writeBufferSize;
+ @BooleanConfigurationValidatorAnnotation(ConfigurationKey = AZURE_ENABLE_SMALL_WRITE_OPTIMIZATION,
+ DefaultValue = DEFAULT_AZURE_ENABLE_SMALL_WRITE_OPTIMIZATION)
+ private boolean enableSmallWriteOptimization;
+
@BooleanConfigurationValidatorAnnotation(
ConfigurationKey = AZURE_READ_SMALL_FILES_COMPLETELY,
DefaultValue = DEFAULT_READ_SMALL_FILES_COMPLETELY)
@@ -537,6 +541,10 @@ public class AbfsConfiguration{
return this.writeBufferSize;
}
+ public boolean isSmallWriteOptimizationEnabled() {
+ return this.enableSmallWriteOptimization; // bound to AZURE_ENABLE_SMALL_WRITE_OPTIMIZATION; default comes from DEFAULT_AZURE_ENABLE_SMALL_WRITE_OPTIMIZATION
+ }
+
public boolean readSmallFilesCompletely() {
return this.readSmallFilesCompletely;
}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
index 869a6f9907f..c8dd518b4f3 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
@@ -578,6 +578,7 @@ public class AzureBlobFileSystemStore implements Closeable {
return new AbfsOutputStreamContext(abfsConfiguration.getSasTokenRenewPeriodForStreamsInSeconds())
.withWriteBufferSize(bufferSize)
.enableFlush(abfsConfiguration.isFlushEnabled())
+ .enableSmallWriteOptimization(abfsConfiguration.isSmallWriteOptimizationEnabled())
.disableOutputStreamFlush(abfsConfiguration.isOutputStreamFlushDisabled())
.withStreamStatistics(new AbfsOutputStreamStatisticsImpl())
.withAppendBlob(isAppendBlob)
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java
index 38b79c9412f..184657e7d66 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java
@@ -76,6 +76,7 @@ public final class AbfsHttpConstants {
public static final String AT = "@";
public static final String HTTP_HEADER_PREFIX = "x-ms-";
public static final String HASH = "#";
+ public static final String TRUE = "true";
public static final String PLUS_ENCODE = "%20";
public static final String FORWARD_SLASH_ENCODE = "%2F";
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
index 3e1ff80e7ef..cdef9c9b7ac 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
@@ -55,6 +55,15 @@ public final class ConfigurationKeys {
public static final String AZURE_WRITE_MAX_CONCURRENT_REQUESTS = "fs.azure.write.max.concurrent.requests";
public static final String AZURE_WRITE_MAX_REQUESTS_TO_QUEUE = "fs.azure.write.max.requests.to.queue";
public static final String AZURE_WRITE_BUFFER_SIZE = "fs.azure.write.request.size";
+ /** When the data written by a Hadoop app is small, i.e. the data size
+ * (a) before any HFlush/HSync call is made, or
+ * (b) between two HFlush/HSync API calls
+ * is less than the write buffer size, two separate calls are made: one for
+ * append and another for flush.
+ * By enabling the small write optimization, a single call will be made to
+ * perform both append and flush operations and hence reduce request count.
+ */
+ public static final String AZURE_ENABLE_SMALL_WRITE_OPTIMIZATION = "fs.azure.write.enableappendwithflush";
public static final String AZURE_READ_BUFFER_SIZE = "fs.azure.read.request.size";
public static final String AZURE_READ_SMALL_FILES_COMPLETELY = "fs.azure.read.smallfilescompletely";
public static final String AZURE_READ_OPTIMIZE_FOOTER_READ = "fs.azure.read.optimizefooterread";
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
index 80082063f6e..a23dfd5292b 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
@@ -56,6 +56,7 @@ public final class FileSystemConfigurations {
// Default upload and download buffer size
public static final int DEFAULT_WRITE_BUFFER_SIZE = 8 * ONE_MB; // 8 MB
public static final int APPENDBLOB_MAX_WRITE_BUFFER_SIZE = 4 * ONE_MB; // 4 MB
+ public static final boolean DEFAULT_AZURE_ENABLE_SMALL_WRITE_OPTIMIZATION = false;
public static final int DEFAULT_READ_BUFFER_SIZE = 4 * ONE_MB; // 4 MB
public static final boolean DEFAULT_READ_SMALL_FILES_COMPLETELY = false;
public static final boolean DEFAULT_OPTIMIZE_FOOTER_READ = false;
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpQueryParams.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpQueryParams.java
index 5a550ac783f..8a4ca90f358 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpQueryParams.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpQueryParams.java
@@ -36,6 +36,7 @@ public final class HttpQueryParams {
public static final String QUERY_PARAM_POSITION = "position";
public static final String QUERY_PARAM_TIMEOUT = "timeout";
public static final String QUERY_PARAM_RETAIN_UNCOMMITTED_DATA = "retainUncommittedData";
+ public static final String QUERY_PARAM_FLUSH = "flush";
public static final String QUERY_PARAM_CLOSE = "close";
public static final String QUERY_PARAM_UPN = "upn";
public static final String QUERY_PARAM_BLOBTYPE = "blobtype";
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AppendRequestParameters.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AppendRequestParameters.java
new file mode 100644
index 00000000000..fb4d29f8794
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AppendRequestParameters.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.contracts.services;
+
+/**
+ * Immutable holder for the parameters of one append request: position, buffer range, mode and blob kind.
+ */
+public class AppendRequestParameters {
+ public enum Mode {
+ APPEND_MODE, // plain append: AbfsClient.append adds no flush/close query parameter
+ FLUSH_MODE, // AbfsClient.append adds flush=true to the append request
+ FLUSH_CLOSE_MODE // AbfsClient.append adds both flush=true and close=true
+ }
+
+ private final long position; // sent as the "position" query parameter of the append request
+ private final int offset; // handed to AbfsRestOperation together with the data buffer; presumably the start index in that buffer
+ private final int length; // handed to AbfsRestOperation; also added to position for the append-success check
+ private final Mode mode; // decides which flush/close query parameters accompany the append
+ private final boolean isAppendBlob; // when true, a failed append is re-checked via appendSuccessCheckOp
+
+ public AppendRequestParameters(final long position,
+ final int offset,
+ final int length,
+ final Mode mode,
+ final boolean isAppendBlob) {
+ this.position = position;
+ this.offset = offset;
+ this.length = length;
+ this.mode = mode;
+ this.isAppendBlob = isAppendBlob;
+ }
+
+ public long getPosition() {
+ return this.position;
+ }
+
+ public int getoffset() { // NOTE(review): lowerCamelCase would be getOffset; AbfsClient.append calls this exact spelling
+ return this.offset;
+ }
+
+ public int getLength() {
+ return this.length;
+ }
+
+ public Mode getMode() {
+ return this.mode;
+ }
+
+ public boolean isAppendBlob() {
+ return this.isAppendBlob;
+ }
+
+}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
index db2f44f3bb4..bfc11a676ae 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.fs.azurebfs.contracts.exceptions.SASTokenProviderExcept
import org.apache.hadoop.fs.azurebfs.extensions.ExtensionHelper;
import org.apache.hadoop.fs.azurebfs.extensions.SASTokenProvider;
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
+import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider;
import org.apache.hadoop.fs.azurebfs.utils.DateTimeUtils;
import org.apache.hadoop.io.IOUtils;
@@ -396,17 +397,27 @@ public class AbfsClient implements Closeable {
return op;
}
- public AbfsRestOperation append(final String path, final long position, final byte[] buffer, final int offset,
- final int length, final String cachedSasToken, final boolean isAppendBlob) throws AzureBlobFileSystemException {
+ public AbfsRestOperation append(final String path, final byte[] buffer,
+ AppendRequestParameters reqParams, final String cachedSasToken)
+ throws AzureBlobFileSystemException {
final List requestHeaders = createDefaultHeaders();
// JDK7 does not support PATCH, so to workaround the issue we will use
// PUT and specify the real method in the X-Http-Method-Override header.
requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE,
- HTTP_METHOD_PATCH));
+ HTTP_METHOD_PATCH));
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, APPEND_ACTION);
- abfsUriQueryBuilder.addQuery(QUERY_PARAM_POSITION, Long.toString(position));
+ abfsUriQueryBuilder.addQuery(QUERY_PARAM_POSITION, Long.toString(reqParams.getPosition()));
+
+ if ((reqParams.getMode() == AppendRequestParameters.Mode.FLUSH_MODE) || (
+ reqParams.getMode() == AppendRequestParameters.Mode.FLUSH_CLOSE_MODE)) {
+ abfsUriQueryBuilder.addQuery(QUERY_PARAM_FLUSH, TRUE);
+ if (reqParams.getMode() == AppendRequestParameters.Mode.FLUSH_CLOSE_MODE) {
+ abfsUriQueryBuilder.addQuery(QUERY_PARAM_CLOSE, TRUE);
+ }
+ }
+
// AbfsInputStream/AbfsOutputStream reuse SAS tokens for better performance
String sasTokenForReuse = appendSASTokenToQuery(path, SASTokenProvider.WRITE_OPERATION,
abfsUriQueryBuilder, cachedSasToken);
@@ -414,20 +425,30 @@ public class AbfsClient implements Closeable {
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
AbfsRestOperationType.Append,
- this,
- HTTP_METHOD_PUT,
- url,
- requestHeaders, buffer, offset, length, sasTokenForReuse);
+ this,
+ HTTP_METHOD_PUT,
+ url,
+ requestHeaders,
+ buffer,
+ reqParams.getoffset(),
+ reqParams.getLength(),
+ sasTokenForReuse);
try {
op.execute();
} catch (AzureBlobFileSystemException e) {
- if (isAppendBlob && appendSuccessCheckOp(op, path, (position + length))) {
+ if (reqParams.isAppendBlob()
+ && appendSuccessCheckOp(op, path,
+ (reqParams.getPosition() + reqParams.getLength()))) {
final AbfsRestOperation successOp = new AbfsRestOperation(
AbfsRestOperationType.Append,
- this,
- HTTP_METHOD_PUT,
- url,
- requestHeaders, buffer, offset, length, sasTokenForReuse);
+ this,
+ HTTP_METHOD_PUT,
+ url,
+ requestHeaders,
+ buffer,
+ reqParams.getoffset(),
+ reqParams.getLength(),
+ sasTokenForReuse);
successOp.hardSetResult(HttpURLConnection.HTTP_OK);
return successOp;
}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
index 01b2fa5dede..402fdda7b25 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
@@ -35,11 +35,13 @@ import java.util.concurrent.TimeUnit;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
+import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
import org.apache.hadoop.fs.azurebfs.utils.CachedSASToken;
import org.apache.hadoop.io.ElasticByteBufferPool;
import org.apache.hadoop.fs.FileSystem.Statistics;
@@ -48,6 +50,9 @@ import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.Syncable;
import static org.apache.hadoop.io.IOUtils.wrapException;
+import static org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters.Mode.APPEND_MODE;
+import static org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters.Mode.FLUSH_CLOSE_MODE;
+import static org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters.Mode.FLUSH_MODE;
/**
* The BlobFsOutputStream for Rest AbfsClient.
@@ -60,6 +65,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
private boolean closed;
private boolean supportFlush;
private boolean disableOutputStreamFlush;
+ private boolean enableSmallWriteOptimization;
private boolean isAppendBlob;
private volatile IOException lastError;
@@ -69,6 +75,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
private final int bufferSize;
private byte[] buffer;
private int bufferIndex;
+ private int numOfAppendsToServerSinceLastFlush;
private final int maxConcurrentRequestCount;
private final int maxRequestsThatCanBeQueued;
@@ -108,12 +115,15 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
this.supportFlush = abfsOutputStreamContext.isEnableFlush();
this.disableOutputStreamFlush = abfsOutputStreamContext
.isDisableOutputStreamFlush();
+ this.enableSmallWriteOptimization
+ = abfsOutputStreamContext.isEnableSmallWriteOptimization();
this.isAppendBlob = abfsOutputStreamContext.isAppendBlob();
this.lastError = null;
this.lastFlushOffset = 0;
this.bufferSize = abfsOutputStreamContext.getWriteBufferSize();
this.buffer = byteBufferPool.getBuffer(false, bufferSize).array();
this.bufferIndex = 0;
+ this.numOfAppendsToServerSinceLastFlush = 0;
this.writeOperations = new ConcurrentLinkedDeque<>();
this.outputStreamStatistics = abfsOutputStreamContext.getStreamStatistics();
@@ -309,8 +319,29 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
private synchronized void flushInternal(boolean isClose) throws IOException {
maybeThrowLastError();
+ // Small write optimization: when nothing was appended since the last flush and
+ // data is still buffered, one append carrying flush (and close) replaces append + flush.
+ if (!isAppendBlob
+ && enableSmallWriteOptimization
+ && (numOfAppendsToServerSinceLastFlush == 0) // no append sent to the store since the last flush
+ && writeOperations.isEmpty() // double checking no appends in progress; avoids the O(n) size() traversal
+ && (bufferIndex > 0)) { // there is some data that is pending to be written
+ smallWriteOptimizedflushInternal(isClose);
+ return;
+ }
+
writeCurrentBufferToService();
flushWrittenBytesToService(isClose);
+ numOfAppendsToServerSinceLastFlush = 0; // flush done: start counting appends afresh
+ }
+
+ private synchronized void smallWriteOptimizedflushInternal(boolean isClose) throws IOException {
+ // One-request flush: the append below carries flush and increments numOfAppendsToServerSinceLastFlush
+ writeCurrentBufferToService(true, isClose); // isFlush=true selects FLUSH_MODE, or FLUSH_CLOSE_MODE when isClose is set
+ waitForAppendsToComplete(); // blocks on the task of every queued write operation
+ shrinkWriteOperationQueue();
+ maybeThrowLastError();
+ numOfAppendsToServerSinceLastFlush = 0; // the append counted above was itself the flush, so reset the counter
}
private synchronized void flushInternalAsync() throws IOException {
@@ -335,8 +366,9 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
AbfsPerfTracker tracker = client.getAbfsPerfTracker();
try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker,
"writeCurrentBufferToService", "append")) {
- AbfsRestOperation op = client.append(path, offset, bytes, 0,
- bytesLength, cachedSasToken.get(), this.isAppendBlob);
+ AppendRequestParameters reqParams = new AppendRequestParameters(offset, 0,
+ bytesLength, APPEND_MODE, true);
+ AbfsRestOperation op = client.append(path, bytes, reqParams, cachedSasToken.get());
cachedSasToken.update(op.getSasToken());
outputStreamStatistics.uploadSuccessful(bytesLength);
perfInfo.registerResult(op.getResult());
@@ -358,6 +390,10 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
}
private synchronized void writeCurrentBufferToService() throws IOException {
+ writeCurrentBufferToService(false, false); // isFlush=false, isClose=false: a plain append (APPEND_MODE)
+ }
+
+ private synchronized void writeCurrentBufferToService(boolean isFlush, boolean isClose) throws IOException {
if (this.isAppendBlob) {
writeAppendBlobCurrentBufferToService();
return;
@@ -367,6 +403,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
return;
}
outputStreamStatistics.writeCurrentBuffer();
+ numOfAppendsToServerSinceLastFlush++;
final byte[] bytes = buffer;
final int bytesLength = bufferIndex;
@@ -388,8 +425,19 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
AbfsPerfTracker tracker = client.getAbfsPerfTracker();
try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker,
"writeCurrentBufferToService", "append")) {
- AbfsRestOperation op = client.append(path, offset, bytes, 0,
- bytesLength, cachedSasToken.get(), false);
+ AppendRequestParameters.Mode
+ mode = APPEND_MODE;
+ if (isFlush & isClose) {
+ mode = FLUSH_CLOSE_MODE;
+ } else if (isFlush) {
+ mode = FLUSH_MODE;
+ }
+
+ AppendRequestParameters reqParams = new AppendRequestParameters(
+ offset, 0, bytesLength, mode, false);
+ AbfsRestOperation op = client.append(path, bytes, reqParams,
+ cachedSasToken.get());
+
cachedSasToken.update(op.getSasToken());
perfInfo.registerResult(op.getResult());
byteBufferPool.putBuffer(ByteBuffer.wrap(bytes));
@@ -410,7 +458,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
shrinkWriteOperationQueue();
}
- private synchronized void flushWrittenBytesToService(boolean isClose) throws IOException {
+ private synchronized void waitForAppendsToComplete() throws IOException {
for (WriteOperation writeOperation : writeOperations) {
try {
writeOperation.task.get();
@@ -428,6 +476,10 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
throw lastError;
}
}
+ }
+
+ private synchronized void flushWrittenBytesToService(boolean isClose) throws IOException {
+ waitForAppendsToComplete(); // every queued append task must finish before flushWrittenBytesToServiceInternal runs
flushWrittenBytesToServiceInternal(position, false, isClose);
}
@@ -558,6 +610,11 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
return maxRequestsThatCanBeQueued;
}
+ @VisibleForTesting
+ Boolean isAppendBlobStream() {
+ return isAppendBlob; // autoboxed from the primitive boolean field — NOTE(review): a primitive return type would avoid the boxing
+ }
+
/**
* Appending AbfsOutputStream statistics to base toString().
*
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java
index 2dce5dc2c77..925cd4f7b56 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java
@@ -27,6 +27,8 @@ public class AbfsOutputStreamContext extends AbfsStreamContext {
private boolean enableFlush;
+ private boolean enableSmallWriteOptimization;
+
private boolean disableOutputStreamFlush;
private AbfsOutputStreamStatistics streamStatistics;
@@ -52,6 +54,11 @@ public class AbfsOutputStreamContext extends AbfsStreamContext {
return this;
}
+ public AbfsOutputStreamContext enableSmallWriteOptimization(final boolean enableSmallWriteOptimization) {
+ this.enableSmallWriteOptimization = enableSmallWriteOptimization;
+ return this; // returns this context so the builder-style calls can be chained
+ }
+
public AbfsOutputStreamContext disableOutputStreamFlush(
final boolean disableOutputStreamFlush) {
this.disableOutputStreamFlush = disableOutputStreamFlush;
@@ -114,4 +121,8 @@ public class AbfsOutputStreamContext extends AbfsStreamContext {
public int getMaxWriteRequestsToQueue() {
return this.maxWriteRequestsToQueue;
}
+
+ public boolean isEnableSmallWriteOptimization() {
+ return this.enableSmallWriteOptimization; // value set through enableSmallWriteOptimization(boolean); read by the AbfsOutputStream constructor
+ }
}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
index 83c76f5a6ab..24ec2926647 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
@@ -290,7 +290,7 @@ public class AbfsRestOperation {
AbfsClientThrottlingIntercept.updateMetrics(operationType, httpOperation);
}
- LOG.debug("HttpRequest: {}", httpOperation.toString());
+ LOG.debug("HttpRequest: {}: {}", operationType, httpOperation.toString());
if (client.getRetryPolicy().shouldRetry(retryCount, httpOperation.getStatusCode())) {
return false;
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsNetworkStatistics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsNetworkStatistics.java
index c2dbe937b81..66b8da89572 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsNetworkStatistics.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsNetworkStatistics.java
@@ -33,14 +33,16 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
+import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.BYTES_RECEIVED;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CONNECTIONS_MADE;
+import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.GET_RESPONSES;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.SEND_REQUESTS;
public class ITestAbfsNetworkStatistics extends AbstractAbfsIntegrationTest {
private static final Logger LOG =
LoggerFactory.getLogger(ITestAbfsNetworkStatistics.class);
- private static final int LARGE_OPERATIONS = 10;
+ private static final int WRITE_OPERATION_LOOP_COUNT = 10;
public ITestAbfsNetworkStatistics() throws Exception {
}
@@ -58,117 +60,126 @@ public class ITestAbfsNetworkStatistics extends AbstractAbfsIntegrationTest {
Map metricMap;
Path sendRequestPath = path(getMethodName());
String testNetworkStatsString = "http_send";
- long connectionsMade, requestsSent, bytesSent;
metricMap = fs.getInstrumentationMap();
- long connectionsMadeBeforeTest = metricMap
- .get(CONNECTIONS_MADE.getStatName());
- long requestsMadeBeforeTest = metricMap.get(SEND_REQUESTS.getStatName());
+ long expectedConnectionsMade = metricMap.get(CONNECTIONS_MADE.getStatName());
+ long expectedRequestsSent = metricMap.get(SEND_REQUESTS.getStatName());
+ long expectedBytesSent = 0;
- /*
- * Creating AbfsOutputStream will result in 1 connection made and 1 send
- * request.
- */
+ // --------------------------------------------------------------------
+ // Operation: Creating AbfsOutputStream
try (AbfsOutputStream out = createAbfsOutputStreamWithFlushEnabled(fs,
sendRequestPath)) {
+ // Network stats calculation: For Creating AbfsOutputStream:
+ // 1 create request = 1 connection made and 1 send request
+ expectedConnectionsMade++;
+ expectedRequestsSent++;
+ // --------------------------------------------------------------------
+
+ // Operation: Write small data
+ // Network stats calculation: No additions.
+ // Data written is less than the buffer size and hence will not
+ // trigger any append request to store
out.write(testNetworkStatsString.getBytes());
+ // --------------------------------------------------------------------
- /*
- * Flushes all outstanding data (i.e. the current unfinished packet)
- * from the client into the service on all DataNode replicas.
- */
+ // Operation: HFlush
+ // Flushes all outstanding data (i.e. the current unfinished packet)
+ // from the client into the service on all DataNode replicas.
out.hflush();
-
- metricMap = fs.getInstrumentationMap();
-
/*
- * Testing the network stats with 1 write operation.
+ * Network stats calculation:
+ * 3 possibilities here:
+ * A. As there is pending data to be written to store, this will result in:
+ * 1 append + 1 flush = 2 connections and 2 send requests
*
- * connections_made : (connections made above) + 2(flush).
+ * B. If config "fs.azure.write.enableappendwithflush" is enabled, append
+ * and flush call will be merged for small data in buffer in this test.
+ * In which case it will be:
+ * 1 append+flush request = 1 connection and 1 send request
*
- * send_requests : (requests sent above) + 2(flush).
- *
- * bytes_sent : bytes wrote in AbfsOutputStream.
+ * C. If the path is configured for append Blob files to be used, hflush
+ * is a no-op. So in this case:
+ * 1 append = 1 connection and 1 send request
*/
- long extraCalls = 0;
- if (!fs.getAbfsStore()
- .isAppendBlobKey(fs.makeQualified(sendRequestPath).toString())) {
- // no network calls are made for hflush in case of appendblob
- extraCalls++;
+ if (fs.getAbfsStore().isAppendBlobKey(fs.makeQualified(sendRequestPath).toString())
+ || (this.getConfiguration().isSmallWriteOptimizationEnabled())) {
+ expectedConnectionsMade++;
+ expectedRequestsSent++;
+ } else {
+ expectedConnectionsMade += 2;
+ expectedRequestsSent += 2;
}
- long expectedConnectionsMade = connectionsMadeBeforeTest + extraCalls + 2;
- long expectedRequestsSent = requestsMadeBeforeTest + extraCalls + 2;
- connectionsMade = assertAbfsStatistics(CONNECTIONS_MADE,
+ expectedBytesSent += testNetworkStatsString.getBytes().length;
+ // --------------------------------------------------------------------
+
+ // Assertions
+ metricMap = fs.getInstrumentationMap();
+ assertAbfsStatistics(CONNECTIONS_MADE,
expectedConnectionsMade, metricMap);
- requestsSent = assertAbfsStatistics(SEND_REQUESTS, expectedRequestsSent,
+ assertAbfsStatistics(SEND_REQUESTS, expectedRequestsSent,
metricMap);
- bytesSent = assertAbfsStatistics(AbfsStatistic.BYTES_SENT,
- testNetworkStatsString.getBytes().length, metricMap);
+ assertAbfsStatistics(AbfsStatistic.BYTES_SENT,
+ expectedBytesSent, metricMap);
}
- // To close the AbfsOutputStream 1 connection is made and 1 request is sent.
- connectionsMade++;
- requestsSent++;
-
+ // --------------------------------------------------------------------
+ // Operation: AbfsOutputStream close.
+ // Network Stats calculation: 1 flush (with close) is sent.
+ // 1 flush request = 1 connection and 1 send request
+ expectedConnectionsMade++;
+ expectedRequestsSent++;
+ // --------------------------------------------------------------------
+ // Operation: Re-create the file / create overwrite scenario
try (AbfsOutputStream out = createAbfsOutputStreamWithFlushEnabled(fs,
sendRequestPath)) {
-
- // Is a file overwrite case
- long createRequestCalls = 1;
- long createTriggeredGFSForETag = 0;
- if (this.getConfiguration().isConditionalCreateOverwriteEnabled()) {
- createRequestCalls += 1;
- createTriggeredGFSForETag = 1;
- }
-
- for (int i = 0; i < LARGE_OPERATIONS; i++) {
- out.write(testNetworkStatsString.getBytes());
-
- /*
- * 1 flush call would create 2 connections and 2 send requests.
- * when hflush() is called it will essentially trigger append() and
- * flush() inside AbfsRestOperation. Both of which calls
- * executeHttpOperation() method which creates a connection and sends
- * requests.
- */
- out.hflush();
- }
-
- metricMap = fs.getInstrumentationMap();
-
/*
- * Testing the network stats with Large amount of bytes sent.
- *
- * connections made : connections_made(Last assertion) + 1
- * (AbfsOutputStream) + LARGE_OPERATIONS * 2(flush).
- *
- * send requests : requests_sent(Last assertion) + 1(AbfsOutputStream) +
- * LARGE_OPERATIONS * 2(flush).
- *
- * bytes sent : bytes_sent(Last assertion) + LARGE_OPERATIONS * (bytes
- * wrote each time).
+ * Network Stats calculation: create overwrite
+ * There are 2 possibilities here.
+ * A. create overwrite results in 1 server call
+ * create with overwrite=true = 1 connection and 1 send request
*
+ * B. If config "fs.azure.enable.conditional.create.overwrite" is enabled,
+ * create overwrite=false (will fail in this case as file is indeed present)
+ * + getFileStatus to fetch the file ETag
+ * + create overwrite=true
+ * = 3 connections and 2 send requests
*/
-
- connectionsMade += createRequestCalls + createTriggeredGFSForETag;
- requestsSent += createRequestCalls;
- if (fs.getAbfsStore().isAppendBlobKey(fs.makeQualified(sendRequestPath).toString())) {
- // no network calls are made for hflush in case of appendblob
- assertAbfsStatistics(CONNECTIONS_MADE,
- connectionsMade + LARGE_OPERATIONS, metricMap);
- assertAbfsStatistics(SEND_REQUESTS,
- requestsSent + LARGE_OPERATIONS, metricMap);
+ if (this.getConfiguration().isConditionalCreateOverwriteEnabled()) {
+ expectedConnectionsMade += 3;
+ expectedRequestsSent += 2;
} else {
- assertAbfsStatistics(CONNECTIONS_MADE,
- connectionsMade + LARGE_OPERATIONS * 2, metricMap);
- assertAbfsStatistics(SEND_REQUESTS,
- requestsSent + LARGE_OPERATIONS * 2, metricMap);
+ expectedConnectionsMade += 1;
+ expectedRequestsSent += 1;
}
- assertAbfsStatistics(AbfsStatistic.BYTES_SENT,
- bytesSent + LARGE_OPERATIONS * (testNetworkStatsString.getBytes().length),
- metricMap);
+ // --------------------------------------------------------------------
+ // Operation: Multiple small appends + hflush
+ for (int i = 0; i < WRITE_OPERATION_LOOP_COUNT; i++) {
+ out.write(testNetworkStatsString.getBytes());
+ // Network stats calculation: no-op. Small write
+ out.hflush();
+ // Network stats calculation: Hflush
+ // refer to previous comments for hFlush network stats calculation
+ // possibilities
+ if (fs.getAbfsStore().isAppendBlobKey(fs.makeQualified(sendRequestPath).toString())
+ || (this.getConfiguration().isSmallWriteOptimizationEnabled())) {
+ expectedConnectionsMade++;
+ expectedRequestsSent++;
+ } else {
+ expectedConnectionsMade += 2;
+ expectedRequestsSent += 2;
+ }
+ expectedBytesSent += testNetworkStatsString.getBytes().length;
+ }
+ // --------------------------------------------------------------------
+
+ // Assertions
+ metricMap = fs.getInstrumentationMap();
+ assertAbfsStatistics(CONNECTIONS_MADE, expectedConnectionsMade, metricMap);
+ assertAbfsStatistics(SEND_REQUESTS, expectedRequestsSent, metricMap);
+ assertAbfsStatistics(AbfsStatistic.BYTES_SENT, expectedBytesSent, metricMap);
}
}
@@ -185,130 +196,100 @@ public class ITestAbfsNetworkStatistics extends AbstractAbfsIntegrationTest {
Path getResponsePath = path(getMethodName());
Map metricMap;
String testResponseString = "some response";
- long getResponses, bytesReceived;
FSDataOutputStream out = null;
FSDataInputStream in = null;
- try {
+ long expectedConnectionsMade;
+ long expectedGetResponses;
+ long expectedBytesReceived;
- /*
- * Creating a File and writing some bytes in it.
- *
- * get_response : 3(getFileSystem) + 1(OutputStream creation) + 2
- * (Writing data in Data store).
- *
- */
+ try {
+ // Creating a File and writing some bytes in it.
out = fs.create(getResponsePath);
out.write(testResponseString.getBytes());
out.hflush();
+ // Set metric baseline
metricMap = fs.getInstrumentationMap();
- long getResponsesBeforeTest = metricMap
- .get(CONNECTIONS_MADE.getStatName());
+ long bytesWrittenToFile = testResponseString.getBytes().length;
+ expectedConnectionsMade = metricMap.get(CONNECTIONS_MADE.getStatName());
+ expectedGetResponses = metricMap.get(CONNECTIONS_MADE.getStatName());
+ expectedBytesReceived = metricMap.get(BYTES_RECEIVED.getStatName());
- // open would require 1 get response.
+ // --------------------------------------------------------------------
+ // Operation: Create AbfsInputStream
in = fs.open(getResponsePath);
- // read would require 1 get response and also get the bytes received.
+ // Network stats calculation: For Creating AbfsInputStream:
+ // 1 GetFileStatus request to fetch file size = 1 connection and 1 get response
+ expectedConnectionsMade++;
+ expectedGetResponses++;
+ // --------------------------------------------------------------------
+
+ // Operation: Read
int result = in.read();
+ // Network stats calculation: For read:
+ // 1 read request = 1 connection and 1 get response
+ expectedConnectionsMade++;
+ expectedGetResponses++;
+ expectedBytesReceived += bytesWrittenToFile;
+ // --------------------------------------------------------------------
- // Confirming read isn't -1.
- LOG.info("Result of read operation : {}", result);
-
+ // Assertions
metricMap = fs.getInstrumentationMap();
-
- /*
- * Testing values of statistics after writing and reading a buffer.
- *
- * get_responses - (above operations) + 1(open()) + 1 (read()).;
- *
- * bytes_received - This should be equal to bytes sent earlier.
- */
- long extraCalls = 0;
- if (!fs.getAbfsStore()
- .isAppendBlobKey(fs.makeQualified(getResponsePath).toString())) {
- // no network calls are made for hflush in case of appendblob
- extraCalls++;
- }
- long expectedGetResponses = getResponsesBeforeTest + extraCalls + 1;
- getResponses = assertAbfsStatistics(AbfsStatistic.GET_RESPONSES,
- expectedGetResponses, metricMap);
-
- // Testing that bytes received is equal to bytes sent.
- long bytesSend = metricMap.get(AbfsStatistic.BYTES_SENT.getStatName());
- bytesReceived = assertAbfsStatistics(AbfsStatistic.BYTES_RECEIVED,
- bytesSend,
- metricMap);
-
+ assertAbfsStatistics(CONNECTIONS_MADE, expectedConnectionsMade, metricMap);
+ assertAbfsStatistics(GET_RESPONSES, expectedGetResponses, metricMap);
+ assertAbfsStatistics(AbfsStatistic.BYTES_RECEIVED, expectedBytesReceived, metricMap);
} finally {
IOUtils.cleanupWithLogger(LOG, out, in);
}
- // To close the streams 1 response is received.
- getResponses++;
+ // --------------------------------------------------------------------
+ // Operation: AbfsOutputStream close.
+ // Network Stats calculation: no op.
+ // --------------------------------------------------------------------
try {
- /*
- * Creating a file and writing buffer into it.
- * This is a file recreate, so it will trigger
- * 2 extra calls if create overwrite is off by default.
- * Also recording the buffer for future read() call.
- * This creating outputStream and writing requires 2 *
- * (LARGE_OPERATIONS) get requests.
- */
+ // Recreate file with different file size
+ // [Create and append related network stats checks are done in
+ // test method testAbfsHttpSendStatistics]
StringBuilder largeBuffer = new StringBuilder();
out = fs.create(getResponsePath);
- long createRequestCalls = 1;
- if (this.getConfiguration().isConditionalCreateOverwriteEnabled()) {
- createRequestCalls += 2;
- }
-
- for (int i = 0; i < LARGE_OPERATIONS; i++) {
+ for (int i = 0; i < WRITE_OPERATION_LOOP_COUNT; i++) {
out.write(testResponseString.getBytes());
out.hflush();
largeBuffer.append(testResponseString);
}
- // Open requires 1 get_response.
- in = fs.open(getResponsePath);
-
- /*
- * Reading the file which was written above. This read() call would
- * read bytes equal to the bytes that was written above.
- * Get response would be 1 only.
- */
- in.read(0, largeBuffer.toString().getBytes(), 0,
- largeBuffer.toString().getBytes().length);
-
+ // sync back to metric baseline
metricMap = fs.getInstrumentationMap();
+ expectedConnectionsMade = metricMap.get(CONNECTIONS_MADE.getStatName());
+ expectedGetResponses = metricMap.get(GET_RESPONSES.getStatName());
+ // --------------------------------------------------------------------
+ // Operation: Create AbfsInputStream
+ in = fs.open(getResponsePath);
+ // Network stats calculation: For Creating AbfsInputStream:
+ // 1 GetFileStatus for file size = 1 connection and 1 get response
+ expectedConnectionsMade++;
+ expectedGetResponses++;
+ // --------------------------------------------------------------------
- /*
- * Testing the statistics values after writing and reading a large buffer.
- *
- * get_response : get_responses(Last assertion) + 1
- * (OutputStream) + 2 * LARGE_OPERATIONS(Writing and flushing
- * LARGE_OPERATIONS times) + 1(open()) + 1(read()) +
- * 1 (createOverwriteTriggeredGetForeTag).
- *
- * bytes_received : bytes_received(Last assertion) + LARGE_OPERATIONS *
- * bytes wrote each time (bytes_received is equal to bytes wrote in the
- * File).
- *
- */
- assertAbfsStatistics(AbfsStatistic.BYTES_RECEIVED,
- bytesReceived + LARGE_OPERATIONS * (testResponseString.getBytes().length),
- metricMap);
- if (fs.getAbfsStore().isAppendBlobKey(fs.makeQualified(getResponsePath).toString())) {
- // no network calls are made for hflush in case of appendblob
- assertAbfsStatistics(AbfsStatistic.GET_RESPONSES,
- getResponses + 3 + LARGE_OPERATIONS, metricMap);
- } else {
- assertAbfsStatistics(AbfsStatistic.GET_RESPONSES,
- getResponses + 2 + createRequestCalls + 2 * LARGE_OPERATIONS,
- metricMap);
- }
+ // Operation: Read
+ in.read(0, largeBuffer.toString().getBytes(), 0, largeBuffer.toString().getBytes().length);
+ // Network stats calculation: Total data written is still less than
+ // a buffer size. Hence it will trigger only one read to the store. So result is:
+ // 1 read request = 1 connection and 1 get response
+ expectedConnectionsMade++;
+ expectedGetResponses++;
+ expectedBytesReceived += (WRITE_OPERATION_LOOP_COUNT * testResponseString.getBytes().length);
+ // --------------------------------------------------------------------
+ // Assertions
+ metricMap = fs.getInstrumentationMap();
+ assertAbfsStatistics(CONNECTIONS_MADE, expectedConnectionsMade, metricMap);
+ assertAbfsStatistics(GET_RESPONSES, expectedGetResponses, metricMap);
+ assertAbfsStatistics(AbfsStatistic.BYTES_RECEIVED, expectedBytesReceived, metricMap);
} finally {
IOUtils.cleanupWithLogger(LOG, out, in);
}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestSmallWriteOptimization.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestSmallWriteOptimization.java
new file mode 100644
index 00000000000..fce2b682f58
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestSmallWriteOptimization.java
@@ -0,0 +1,523 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs;
+
+import java.util.Arrays;
+import java.util.Random;
+import java.util.UUID;
+import java.util.Map;
+import java.io.IOException;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Assume;
+import org.junit.runners.Parameterized;
+import org.junit.runner.RunWith;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;
+
+import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.BYTES_SENT;
+import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CONNECTIONS_MADE;
+import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.SEND_REQUESTS;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_AZURE_ENABLE_SMALL_WRITE_OPTIMIZATION;
+import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_TEST_APPENDBLOB_ENABLED;
+
+/**
+ * Test combination for small writes with flush and close operations.
+ * This test class formulates an append test flow to assert on various scenarios.
+ * Test stages:
+ * 1. Pre-create test file of required size. This is determined by
+ * startingFileSize parameter. If it is 0, then pre-creation is skipped.
+ *
+ * 2. Formulate an append loop or iteration. An iteration will do N writes
+ * (determined by numOfClientWrites parameter) with each writing X bytes
+ * (determined by recurringClientWriteSize parameter).
+ *
+ * 3. Determine total number of append iterations needed by a test.
+ * If intention is to close the outputStream right after append, setting
+ * directCloseTest parameter will determine 1 append test iteration with an
+ * ending close.
+ * Else, it will execute TEST_FLUSH_ITERATION number of test iterations, with
+ * each doing appends, hflush/hsync and then close.
+ *
+ * 4. Execute test iterations with asserts on number of store requests made and
+ * validating file content.
+ */
+@RunWith(Parameterized.class)
+public class ITestSmallWriteOptimization extends AbstractAbfsScaleTest {
+ private static final int ONE_MB = 1024 * 1024;
+ private static final int TWO_MB = 2 * ONE_MB;
+ private static final int TEST_BUFFER_SIZE = TWO_MB;
+ private static final int HALF_TEST_BUFFER_SIZE = TWO_MB / 2;
+ private static final int QUARTER_TEST_BUFFER_SIZE = TWO_MB / 4;
+ private static final int TEST_FLUSH_ITERATION = 2;
+
+ @Parameterized.Parameter
+ public String testScenario;
+
+ @Parameterized.Parameter(1)
+ public boolean enableSmallWriteOptimization;
+
+ /**
+ * If true, will initiate close after appends. (That is, no explicit hflush or
+ * hsync calls will be made from client app.)
+ */
+ @Parameterized.Parameter(2)
+ public boolean directCloseTest;
+
+ /**
+ * If non-zero, test file should be created as pre-requisite with this size.
+ */
+ @Parameterized.Parameter(3)
+ public Integer startingFileSize;
+
+ /**
+ * Determines the write sizes to be issued by client app.
+ */
+ @Parameterized.Parameter(4)
+ public Integer recurringClientWriteSize;
+
+ /**
+ * Determines the number of Client writes to make.
+ */
+ @Parameterized.Parameter(5)
+ public Integer numOfClientWrites;
+
+ /**
+ * True, if the small write optimization is supposed to be effective in
+ * the scenario.
+ */
+ @Parameterized.Parameter(6)
+ public boolean flushExpectedToBeMergedWithAppend;
+
+ @Parameterized.Parameters(name = "{0}")
+ public static Iterable