HADOOP-17404. ABFS: Small write - Merge append and flush
- Contributed by Sneha Vijayarajan
This commit is contained in:
parent
d21c1c6576
commit
b612c310c2
|
@ -555,6 +555,7 @@
|
||||||
<exclude>**/azurebfs/ITestAbfsReadWriteAndSeek.java</exclude>
|
<exclude>**/azurebfs/ITestAbfsReadWriteAndSeek.java</exclude>
|
||||||
<exclude>**/azurebfs/ITestAzureBlobFileSystemListStatus.java</exclude>
|
<exclude>**/azurebfs/ITestAzureBlobFileSystemListStatus.java</exclude>
|
||||||
<exclude>**/azurebfs/extensions/ITestAbfsDelegationTokens.java</exclude>
|
<exclude>**/azurebfs/extensions/ITestAbfsDelegationTokens.java</exclude>
|
||||||
|
<exclude>**/azurebfs/ITestSmallWriteOptimization.java</exclude>
|
||||||
</excludes>
|
</excludes>
|
||||||
|
|
||||||
</configuration>
|
</configuration>
|
||||||
|
@ -594,6 +595,7 @@
|
||||||
<include>**/azurebfs/ITestAbfsReadWriteAndSeek.java</include>
|
<include>**/azurebfs/ITestAbfsReadWriteAndSeek.java</include>
|
||||||
<include>**/azurebfs/ITestAzureBlobFileSystemListStatus.java</include>
|
<include>**/azurebfs/ITestAzureBlobFileSystemListStatus.java</include>
|
||||||
<include>**/azurebfs/extensions/ITestAbfsDelegationTokens.java</include>
|
<include>**/azurebfs/extensions/ITestAbfsDelegationTokens.java</include>
|
||||||
|
<include>**/azurebfs/ITestSmallWriteOptimization.java</include>
|
||||||
</includes>
|
</includes>
|
||||||
</configuration>
|
</configuration>
|
||||||
</execution>
|
</execution>
|
||||||
|
|
|
@ -46,4 +46,6 @@
|
||||||
files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]AzureBlobFileSystemStore.java"/>
|
files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]AzureBlobFileSystemStore.java"/>
|
||||||
<suppress checks="ParameterNumber|MagicNumber"
|
<suppress checks="ParameterNumber|MagicNumber"
|
||||||
files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]utils[\\/]Base64.java"/>
|
files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]utils[\\/]Base64.java"/>
|
||||||
|
<suppress checks="ParameterNumber|VisibilityModifier"
|
||||||
|
files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]ITestSmallWriteOptimization.java"/>
|
||||||
</suppressions>
|
</suppressions>
|
||||||
|
|
|
@ -100,6 +100,10 @@ public class AbfsConfiguration{
|
||||||
DefaultValue = DEFAULT_WRITE_BUFFER_SIZE)
|
DefaultValue = DEFAULT_WRITE_BUFFER_SIZE)
|
||||||
private int writeBufferSize;
|
private int writeBufferSize;
|
||||||
|
|
||||||
|
@BooleanConfigurationValidatorAnnotation(ConfigurationKey = AZURE_ENABLE_SMALL_WRITE_OPTIMIZATION,
|
||||||
|
DefaultValue = DEFAULT_AZURE_ENABLE_SMALL_WRITE_OPTIMIZATION)
|
||||||
|
private boolean enableSmallWriteOptimization;
|
||||||
|
|
||||||
@BooleanConfigurationValidatorAnnotation(
|
@BooleanConfigurationValidatorAnnotation(
|
||||||
ConfigurationKey = AZURE_READ_SMALL_FILES_COMPLETELY,
|
ConfigurationKey = AZURE_READ_SMALL_FILES_COMPLETELY,
|
||||||
DefaultValue = DEFAULT_READ_SMALL_FILES_COMPLETELY)
|
DefaultValue = DEFAULT_READ_SMALL_FILES_COMPLETELY)
|
||||||
|
@ -537,6 +541,10 @@ public class AbfsConfiguration{
|
||||||
return this.writeBufferSize;
|
return this.writeBufferSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean isSmallWriteOptimizationEnabled() {
|
||||||
|
return this.enableSmallWriteOptimization;
|
||||||
|
}
|
||||||
|
|
||||||
public boolean readSmallFilesCompletely() {
|
public boolean readSmallFilesCompletely() {
|
||||||
return this.readSmallFilesCompletely;
|
return this.readSmallFilesCompletely;
|
||||||
}
|
}
|
||||||
|
|
|
@ -578,6 +578,7 @@ public class AzureBlobFileSystemStore implements Closeable {
|
||||||
return new AbfsOutputStreamContext(abfsConfiguration.getSasTokenRenewPeriodForStreamsInSeconds())
|
return new AbfsOutputStreamContext(abfsConfiguration.getSasTokenRenewPeriodForStreamsInSeconds())
|
||||||
.withWriteBufferSize(bufferSize)
|
.withWriteBufferSize(bufferSize)
|
||||||
.enableFlush(abfsConfiguration.isFlushEnabled())
|
.enableFlush(abfsConfiguration.isFlushEnabled())
|
||||||
|
.enableSmallWriteOptimization(abfsConfiguration.isSmallWriteOptimizationEnabled())
|
||||||
.disableOutputStreamFlush(abfsConfiguration.isOutputStreamFlushDisabled())
|
.disableOutputStreamFlush(abfsConfiguration.isOutputStreamFlushDisabled())
|
||||||
.withStreamStatistics(new AbfsOutputStreamStatisticsImpl())
|
.withStreamStatistics(new AbfsOutputStreamStatisticsImpl())
|
||||||
.withAppendBlob(isAppendBlob)
|
.withAppendBlob(isAppendBlob)
|
||||||
|
|
|
@ -76,6 +76,7 @@ public final class AbfsHttpConstants {
|
||||||
public static final String AT = "@";
|
public static final String AT = "@";
|
||||||
public static final String HTTP_HEADER_PREFIX = "x-ms-";
|
public static final String HTTP_HEADER_PREFIX = "x-ms-";
|
||||||
public static final String HASH = "#";
|
public static final String HASH = "#";
|
||||||
|
public static final String TRUE = "true";
|
||||||
|
|
||||||
public static final String PLUS_ENCODE = "%20";
|
public static final String PLUS_ENCODE = "%20";
|
||||||
public static final String FORWARD_SLASH_ENCODE = "%2F";
|
public static final String FORWARD_SLASH_ENCODE = "%2F";
|
||||||
|
|
|
@ -55,6 +55,15 @@ public final class ConfigurationKeys {
|
||||||
public static final String AZURE_WRITE_MAX_CONCURRENT_REQUESTS = "fs.azure.write.max.concurrent.requests";
|
public static final String AZURE_WRITE_MAX_CONCURRENT_REQUESTS = "fs.azure.write.max.concurrent.requests";
|
||||||
public static final String AZURE_WRITE_MAX_REQUESTS_TO_QUEUE = "fs.azure.write.max.requests.to.queue";
|
public static final String AZURE_WRITE_MAX_REQUESTS_TO_QUEUE = "fs.azure.write.max.requests.to.queue";
|
||||||
public static final String AZURE_WRITE_BUFFER_SIZE = "fs.azure.write.request.size";
|
public static final String AZURE_WRITE_BUFFER_SIZE = "fs.azure.write.request.size";
|
||||||
|
/** If the data size written by Hadoop app is small, i.e. data size :
|
||||||
|
* (a) before any of HFlush/HSync call is made or
|
||||||
|
* (b) between 2 HFlush/Hsync API calls
|
||||||
|
* is less than write buffer size, 2 separate calls, one for append and
|
||||||
|
* another for flush are made.
|
||||||
|
* By enabling the small write optimization, a single call will be made to
|
||||||
|
* perform both append and flush operations and hence reduce request count.
|
||||||
|
*/
|
||||||
|
public static final String AZURE_ENABLE_SMALL_WRITE_OPTIMIZATION = "fs.azure.write.enableappendwithflush";
|
||||||
public static final String AZURE_READ_BUFFER_SIZE = "fs.azure.read.request.size";
|
public static final String AZURE_READ_BUFFER_SIZE = "fs.azure.read.request.size";
|
||||||
public static final String AZURE_READ_SMALL_FILES_COMPLETELY = "fs.azure.read.smallfilescompletely";
|
public static final String AZURE_READ_SMALL_FILES_COMPLETELY = "fs.azure.read.smallfilescompletely";
|
||||||
public static final String AZURE_READ_OPTIMIZE_FOOTER_READ = "fs.azure.read.optimizefooterread";
|
public static final String AZURE_READ_OPTIMIZE_FOOTER_READ = "fs.azure.read.optimizefooterread";
|
||||||
|
|
|
@ -56,6 +56,7 @@ public final class FileSystemConfigurations {
|
||||||
// Default upload and download buffer size
|
// Default upload and download buffer size
|
||||||
public static final int DEFAULT_WRITE_BUFFER_SIZE = 8 * ONE_MB; // 8 MB
|
public static final int DEFAULT_WRITE_BUFFER_SIZE = 8 * ONE_MB; // 8 MB
|
||||||
public static final int APPENDBLOB_MAX_WRITE_BUFFER_SIZE = 4 * ONE_MB; // 4 MB
|
public static final int APPENDBLOB_MAX_WRITE_BUFFER_SIZE = 4 * ONE_MB; // 4 MB
|
||||||
|
public static final boolean DEFAULT_AZURE_ENABLE_SMALL_WRITE_OPTIMIZATION = false;
|
||||||
public static final int DEFAULT_READ_BUFFER_SIZE = 4 * ONE_MB; // 4 MB
|
public static final int DEFAULT_READ_BUFFER_SIZE = 4 * ONE_MB; // 4 MB
|
||||||
public static final boolean DEFAULT_READ_SMALL_FILES_COMPLETELY = false;
|
public static final boolean DEFAULT_READ_SMALL_FILES_COMPLETELY = false;
|
||||||
public static final boolean DEFAULT_OPTIMIZE_FOOTER_READ = false;
|
public static final boolean DEFAULT_OPTIMIZE_FOOTER_READ = false;
|
||||||
|
|
|
@ -36,6 +36,7 @@ public final class HttpQueryParams {
|
||||||
public static final String QUERY_PARAM_POSITION = "position";
|
public static final String QUERY_PARAM_POSITION = "position";
|
||||||
public static final String QUERY_PARAM_TIMEOUT = "timeout";
|
public static final String QUERY_PARAM_TIMEOUT = "timeout";
|
||||||
public static final String QUERY_PARAM_RETAIN_UNCOMMITTED_DATA = "retainUncommittedData";
|
public static final String QUERY_PARAM_RETAIN_UNCOMMITTED_DATA = "retainUncommittedData";
|
||||||
|
public static final String QUERY_PARAM_FLUSH = "flush";
|
||||||
public static final String QUERY_PARAM_CLOSE = "close";
|
public static final String QUERY_PARAM_CLOSE = "close";
|
||||||
public static final String QUERY_PARAM_UPN = "upn";
|
public static final String QUERY_PARAM_UPN = "upn";
|
||||||
public static final String QUERY_PARAM_BLOBTYPE = "blobtype";
|
public static final String QUERY_PARAM_BLOBTYPE = "blobtype";
|
||||||
|
|
|
@ -0,0 +1,69 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.fs.azurebfs.contracts.services;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Saves the different request parameters for append
|
||||||
|
*/
|
||||||
|
public class AppendRequestParameters {
|
||||||
|
public enum Mode {
|
||||||
|
APPEND_MODE,
|
||||||
|
FLUSH_MODE,
|
||||||
|
FLUSH_CLOSE_MODE
|
||||||
|
}
|
||||||
|
|
||||||
|
private final long position;
|
||||||
|
private final int offset;
|
||||||
|
private final int length;
|
||||||
|
private final Mode mode;
|
||||||
|
private final boolean isAppendBlob;
|
||||||
|
|
||||||
|
public AppendRequestParameters(final long position,
|
||||||
|
final int offset,
|
||||||
|
final int length,
|
||||||
|
final Mode mode,
|
||||||
|
final boolean isAppendBlob) {
|
||||||
|
this.position = position;
|
||||||
|
this.offset = offset;
|
||||||
|
this.length = length;
|
||||||
|
this.mode = mode;
|
||||||
|
this.isAppendBlob = isAppendBlob;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getPosition() {
|
||||||
|
return this.position;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getoffset() {
|
||||||
|
return this.offset;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getLength() {
|
||||||
|
return this.length;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Mode getMode() {
|
||||||
|
return this.mode;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isAppendBlob() {
|
||||||
|
return this.isAppendBlob;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -45,6 +45,7 @@ import org.apache.hadoop.fs.azurebfs.contracts.exceptions.SASTokenProviderExcept
|
||||||
import org.apache.hadoop.fs.azurebfs.extensions.ExtensionHelper;
|
import org.apache.hadoop.fs.azurebfs.extensions.ExtensionHelper;
|
||||||
import org.apache.hadoop.fs.azurebfs.extensions.SASTokenProvider;
|
import org.apache.hadoop.fs.azurebfs.extensions.SASTokenProvider;
|
||||||
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
|
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
|
||||||
|
import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
|
||||||
import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider;
|
import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider;
|
||||||
import org.apache.hadoop.fs.azurebfs.utils.DateTimeUtils;
|
import org.apache.hadoop.fs.azurebfs.utils.DateTimeUtils;
|
||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
|
@ -396,17 +397,27 @@ public class AbfsClient implements Closeable {
|
||||||
return op;
|
return op;
|
||||||
}
|
}
|
||||||
|
|
||||||
public AbfsRestOperation append(final String path, final long position, final byte[] buffer, final int offset,
|
public AbfsRestOperation append(final String path, final byte[] buffer,
|
||||||
final int length, final String cachedSasToken, final boolean isAppendBlob) throws AzureBlobFileSystemException {
|
AppendRequestParameters reqParams, final String cachedSasToken)
|
||||||
|
throws AzureBlobFileSystemException {
|
||||||
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
|
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
|
||||||
// JDK7 does not support PATCH, so to workaround the issue we will use
|
// JDK7 does not support PATCH, so to workaround the issue we will use
|
||||||
// PUT and specify the real method in the X-Http-Method-Override header.
|
// PUT and specify the real method in the X-Http-Method-Override header.
|
||||||
requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE,
|
requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE,
|
||||||
HTTP_METHOD_PATCH));
|
HTTP_METHOD_PATCH));
|
||||||
|
|
||||||
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
|
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
|
||||||
abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, APPEND_ACTION);
|
abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, APPEND_ACTION);
|
||||||
abfsUriQueryBuilder.addQuery(QUERY_PARAM_POSITION, Long.toString(position));
|
abfsUriQueryBuilder.addQuery(QUERY_PARAM_POSITION, Long.toString(reqParams.getPosition()));
|
||||||
|
|
||||||
|
if ((reqParams.getMode() == AppendRequestParameters.Mode.FLUSH_MODE) || (
|
||||||
|
reqParams.getMode() == AppendRequestParameters.Mode.FLUSH_CLOSE_MODE)) {
|
||||||
|
abfsUriQueryBuilder.addQuery(QUERY_PARAM_FLUSH, TRUE);
|
||||||
|
if (reqParams.getMode() == AppendRequestParameters.Mode.FLUSH_CLOSE_MODE) {
|
||||||
|
abfsUriQueryBuilder.addQuery(QUERY_PARAM_CLOSE, TRUE);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// AbfsInputStream/AbfsOutputStream reuse SAS tokens for better performance
|
// AbfsInputStream/AbfsOutputStream reuse SAS tokens for better performance
|
||||||
String sasTokenForReuse = appendSASTokenToQuery(path, SASTokenProvider.WRITE_OPERATION,
|
String sasTokenForReuse = appendSASTokenToQuery(path, SASTokenProvider.WRITE_OPERATION,
|
||||||
abfsUriQueryBuilder, cachedSasToken);
|
abfsUriQueryBuilder, cachedSasToken);
|
||||||
|
@ -414,20 +425,30 @@ public class AbfsClient implements Closeable {
|
||||||
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
|
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
|
||||||
final AbfsRestOperation op = new AbfsRestOperation(
|
final AbfsRestOperation op = new AbfsRestOperation(
|
||||||
AbfsRestOperationType.Append,
|
AbfsRestOperationType.Append,
|
||||||
this,
|
this,
|
||||||
HTTP_METHOD_PUT,
|
HTTP_METHOD_PUT,
|
||||||
url,
|
url,
|
||||||
requestHeaders, buffer, offset, length, sasTokenForReuse);
|
requestHeaders,
|
||||||
|
buffer,
|
||||||
|
reqParams.getoffset(),
|
||||||
|
reqParams.getLength(),
|
||||||
|
sasTokenForReuse);
|
||||||
try {
|
try {
|
||||||
op.execute();
|
op.execute();
|
||||||
} catch (AzureBlobFileSystemException e) {
|
} catch (AzureBlobFileSystemException e) {
|
||||||
if (isAppendBlob && appendSuccessCheckOp(op, path, (position + length))) {
|
if (reqParams.isAppendBlob()
|
||||||
|
&& appendSuccessCheckOp(op, path,
|
||||||
|
(reqParams.getPosition() + reqParams.getLength()))) {
|
||||||
final AbfsRestOperation successOp = new AbfsRestOperation(
|
final AbfsRestOperation successOp = new AbfsRestOperation(
|
||||||
AbfsRestOperationType.Append,
|
AbfsRestOperationType.Append,
|
||||||
this,
|
this,
|
||||||
HTTP_METHOD_PUT,
|
HTTP_METHOD_PUT,
|
||||||
url,
|
url,
|
||||||
requestHeaders, buffer, offset, length, sasTokenForReuse);
|
requestHeaders,
|
||||||
|
buffer,
|
||||||
|
reqParams.getoffset(),
|
||||||
|
reqParams.getLength(),
|
||||||
|
sasTokenForReuse);
|
||||||
successOp.hardSetResult(HttpURLConnection.HTTP_OK);
|
successOp.hardSetResult(HttpURLConnection.HTTP_OK);
|
||||||
return successOp;
|
return successOp;
|
||||||
}
|
}
|
||||||
|
|
|
@ -35,11 +35,13 @@ import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
|
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||||
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
|
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
|
||||||
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
|
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
|
||||||
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
|
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
|
||||||
|
import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
|
||||||
import org.apache.hadoop.fs.azurebfs.utils.CachedSASToken;
|
import org.apache.hadoop.fs.azurebfs.utils.CachedSASToken;
|
||||||
import org.apache.hadoop.io.ElasticByteBufferPool;
|
import org.apache.hadoop.io.ElasticByteBufferPool;
|
||||||
import org.apache.hadoop.fs.FileSystem.Statistics;
|
import org.apache.hadoop.fs.FileSystem.Statistics;
|
||||||
|
@ -48,6 +50,9 @@ import org.apache.hadoop.fs.StreamCapabilities;
|
||||||
import org.apache.hadoop.fs.Syncable;
|
import org.apache.hadoop.fs.Syncable;
|
||||||
|
|
||||||
import static org.apache.hadoop.io.IOUtils.wrapException;
|
import static org.apache.hadoop.io.IOUtils.wrapException;
|
||||||
|
import static org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters.Mode.APPEND_MODE;
|
||||||
|
import static org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters.Mode.FLUSH_CLOSE_MODE;
|
||||||
|
import static org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters.Mode.FLUSH_MODE;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The BlobFsOutputStream for Rest AbfsClient.
|
* The BlobFsOutputStream for Rest AbfsClient.
|
||||||
|
@ -60,6 +65,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
|
||||||
private boolean closed;
|
private boolean closed;
|
||||||
private boolean supportFlush;
|
private boolean supportFlush;
|
||||||
private boolean disableOutputStreamFlush;
|
private boolean disableOutputStreamFlush;
|
||||||
|
private boolean enableSmallWriteOptimization;
|
||||||
private boolean isAppendBlob;
|
private boolean isAppendBlob;
|
||||||
private volatile IOException lastError;
|
private volatile IOException lastError;
|
||||||
|
|
||||||
|
@ -69,6 +75,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
|
||||||
private final int bufferSize;
|
private final int bufferSize;
|
||||||
private byte[] buffer;
|
private byte[] buffer;
|
||||||
private int bufferIndex;
|
private int bufferIndex;
|
||||||
|
private int numOfAppendsToServerSinceLastFlush;
|
||||||
private final int maxConcurrentRequestCount;
|
private final int maxConcurrentRequestCount;
|
||||||
private final int maxRequestsThatCanBeQueued;
|
private final int maxRequestsThatCanBeQueued;
|
||||||
|
|
||||||
|
@ -108,12 +115,15 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
|
||||||
this.supportFlush = abfsOutputStreamContext.isEnableFlush();
|
this.supportFlush = abfsOutputStreamContext.isEnableFlush();
|
||||||
this.disableOutputStreamFlush = abfsOutputStreamContext
|
this.disableOutputStreamFlush = abfsOutputStreamContext
|
||||||
.isDisableOutputStreamFlush();
|
.isDisableOutputStreamFlush();
|
||||||
|
this.enableSmallWriteOptimization
|
||||||
|
= abfsOutputStreamContext.isEnableSmallWriteOptimization();
|
||||||
this.isAppendBlob = abfsOutputStreamContext.isAppendBlob();
|
this.isAppendBlob = abfsOutputStreamContext.isAppendBlob();
|
||||||
this.lastError = null;
|
this.lastError = null;
|
||||||
this.lastFlushOffset = 0;
|
this.lastFlushOffset = 0;
|
||||||
this.bufferSize = abfsOutputStreamContext.getWriteBufferSize();
|
this.bufferSize = abfsOutputStreamContext.getWriteBufferSize();
|
||||||
this.buffer = byteBufferPool.getBuffer(false, bufferSize).array();
|
this.buffer = byteBufferPool.getBuffer(false, bufferSize).array();
|
||||||
this.bufferIndex = 0;
|
this.bufferIndex = 0;
|
||||||
|
this.numOfAppendsToServerSinceLastFlush = 0;
|
||||||
this.writeOperations = new ConcurrentLinkedDeque<>();
|
this.writeOperations = new ConcurrentLinkedDeque<>();
|
||||||
this.outputStreamStatistics = abfsOutputStreamContext.getStreamStatistics();
|
this.outputStreamStatistics = abfsOutputStreamContext.getStreamStatistics();
|
||||||
|
|
||||||
|
@ -309,8 +319,29 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
|
||||||
|
|
||||||
private synchronized void flushInternal(boolean isClose) throws IOException {
|
private synchronized void flushInternal(boolean isClose) throws IOException {
|
||||||
maybeThrowLastError();
|
maybeThrowLastError();
|
||||||
|
|
||||||
|
// if its a flush post write < buffersize, send flush parameter in append
|
||||||
|
if (!isAppendBlob
|
||||||
|
&& enableSmallWriteOptimization
|
||||||
|
&& (numOfAppendsToServerSinceLastFlush == 0) // there are no ongoing store writes
|
||||||
|
&& (writeOperations.size() == 0) // double checking no appends in progress
|
||||||
|
&& (bufferIndex > 0)) { // there is some data that is pending to be written
|
||||||
|
smallWriteOptimizedflushInternal(isClose);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
writeCurrentBufferToService();
|
writeCurrentBufferToService();
|
||||||
flushWrittenBytesToService(isClose);
|
flushWrittenBytesToService(isClose);
|
||||||
|
numOfAppendsToServerSinceLastFlush = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
private synchronized void smallWriteOptimizedflushInternal(boolean isClose) throws IOException {
|
||||||
|
// writeCurrentBufferToService will increment numOfAppendsToServerSinceLastFlush
|
||||||
|
writeCurrentBufferToService(true, isClose);
|
||||||
|
waitForAppendsToComplete();
|
||||||
|
shrinkWriteOperationQueue();
|
||||||
|
maybeThrowLastError();
|
||||||
|
numOfAppendsToServerSinceLastFlush = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
private synchronized void flushInternalAsync() throws IOException {
|
private synchronized void flushInternalAsync() throws IOException {
|
||||||
|
@ -335,8 +366,9 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
|
||||||
AbfsPerfTracker tracker = client.getAbfsPerfTracker();
|
AbfsPerfTracker tracker = client.getAbfsPerfTracker();
|
||||||
try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker,
|
try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker,
|
||||||
"writeCurrentBufferToService", "append")) {
|
"writeCurrentBufferToService", "append")) {
|
||||||
AbfsRestOperation op = client.append(path, offset, bytes, 0,
|
AppendRequestParameters reqParams = new AppendRequestParameters(offset, 0,
|
||||||
bytesLength, cachedSasToken.get(), this.isAppendBlob);
|
bytesLength, APPEND_MODE, true);
|
||||||
|
AbfsRestOperation op = client.append(path, bytes, reqParams, cachedSasToken.get());
|
||||||
cachedSasToken.update(op.getSasToken());
|
cachedSasToken.update(op.getSasToken());
|
||||||
outputStreamStatistics.uploadSuccessful(bytesLength);
|
outputStreamStatistics.uploadSuccessful(bytesLength);
|
||||||
perfInfo.registerResult(op.getResult());
|
perfInfo.registerResult(op.getResult());
|
||||||
|
@ -358,6 +390,10 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
|
||||||
}
|
}
|
||||||
|
|
||||||
private synchronized void writeCurrentBufferToService() throws IOException {
|
private synchronized void writeCurrentBufferToService() throws IOException {
|
||||||
|
writeCurrentBufferToService(false, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
private synchronized void writeCurrentBufferToService(boolean isFlush, boolean isClose) throws IOException {
|
||||||
if (this.isAppendBlob) {
|
if (this.isAppendBlob) {
|
||||||
writeAppendBlobCurrentBufferToService();
|
writeAppendBlobCurrentBufferToService();
|
||||||
return;
|
return;
|
||||||
|
@ -367,6 +403,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
outputStreamStatistics.writeCurrentBuffer();
|
outputStreamStatistics.writeCurrentBuffer();
|
||||||
|
numOfAppendsToServerSinceLastFlush++;
|
||||||
|
|
||||||
final byte[] bytes = buffer;
|
final byte[] bytes = buffer;
|
||||||
final int bytesLength = bufferIndex;
|
final int bytesLength = bufferIndex;
|
||||||
|
@ -388,8 +425,19 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
|
||||||
AbfsPerfTracker tracker = client.getAbfsPerfTracker();
|
AbfsPerfTracker tracker = client.getAbfsPerfTracker();
|
||||||
try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker,
|
try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker,
|
||||||
"writeCurrentBufferToService", "append")) {
|
"writeCurrentBufferToService", "append")) {
|
||||||
AbfsRestOperation op = client.append(path, offset, bytes, 0,
|
AppendRequestParameters.Mode
|
||||||
bytesLength, cachedSasToken.get(), false);
|
mode = APPEND_MODE;
|
||||||
|
if (isFlush & isClose) {
|
||||||
|
mode = FLUSH_CLOSE_MODE;
|
||||||
|
} else if (isFlush) {
|
||||||
|
mode = FLUSH_MODE;
|
||||||
|
}
|
||||||
|
|
||||||
|
AppendRequestParameters reqParams = new AppendRequestParameters(
|
||||||
|
offset, 0, bytesLength, mode, false);
|
||||||
|
AbfsRestOperation op = client.append(path, bytes, reqParams,
|
||||||
|
cachedSasToken.get());
|
||||||
|
|
||||||
cachedSasToken.update(op.getSasToken());
|
cachedSasToken.update(op.getSasToken());
|
||||||
perfInfo.registerResult(op.getResult());
|
perfInfo.registerResult(op.getResult());
|
||||||
byteBufferPool.putBuffer(ByteBuffer.wrap(bytes));
|
byteBufferPool.putBuffer(ByteBuffer.wrap(bytes));
|
||||||
|
@ -410,7 +458,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
|
||||||
shrinkWriteOperationQueue();
|
shrinkWriteOperationQueue();
|
||||||
}
|
}
|
||||||
|
|
||||||
private synchronized void flushWrittenBytesToService(boolean isClose) throws IOException {
|
private synchronized void waitForAppendsToComplete() throws IOException {
|
||||||
for (WriteOperation writeOperation : writeOperations) {
|
for (WriteOperation writeOperation : writeOperations) {
|
||||||
try {
|
try {
|
||||||
writeOperation.task.get();
|
writeOperation.task.get();
|
||||||
|
@ -428,6 +476,10 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
|
||||||
throw lastError;
|
throw lastError;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private synchronized void flushWrittenBytesToService(boolean isClose) throws IOException {
|
||||||
|
waitForAppendsToComplete();
|
||||||
flushWrittenBytesToServiceInternal(position, false, isClose);
|
flushWrittenBytesToServiceInternal(position, false, isClose);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -558,6 +610,11 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
|
||||||
return maxRequestsThatCanBeQueued;
|
return maxRequestsThatCanBeQueued;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
Boolean isAppendBlobStream() {
|
||||||
|
return isAppendBlob;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Appending AbfsOutputStream statistics to base toString().
|
* Appending AbfsOutputStream statistics to base toString().
|
||||||
*
|
*
|
||||||
|
|
|
@ -27,6 +27,8 @@ public class AbfsOutputStreamContext extends AbfsStreamContext {
|
||||||
|
|
||||||
private boolean enableFlush;
|
private boolean enableFlush;
|
||||||
|
|
||||||
|
private boolean enableSmallWriteOptimization;
|
||||||
|
|
||||||
private boolean disableOutputStreamFlush;
|
private boolean disableOutputStreamFlush;
|
||||||
|
|
||||||
private AbfsOutputStreamStatistics streamStatistics;
|
private AbfsOutputStreamStatistics streamStatistics;
|
||||||
|
@ -52,6 +54,11 @@ public class AbfsOutputStreamContext extends AbfsStreamContext {
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public AbfsOutputStreamContext enableSmallWriteOptimization(final boolean enableSmallWriteOptimization) {
|
||||||
|
this.enableSmallWriteOptimization = enableSmallWriteOptimization;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
public AbfsOutputStreamContext disableOutputStreamFlush(
|
public AbfsOutputStreamContext disableOutputStreamFlush(
|
||||||
final boolean disableOutputStreamFlush) {
|
final boolean disableOutputStreamFlush) {
|
||||||
this.disableOutputStreamFlush = disableOutputStreamFlush;
|
this.disableOutputStreamFlush = disableOutputStreamFlush;
|
||||||
|
@ -114,4 +121,8 @@ public class AbfsOutputStreamContext extends AbfsStreamContext {
|
||||||
public int getMaxWriteRequestsToQueue() {
|
public int getMaxWriteRequestsToQueue() {
|
||||||
return this.maxWriteRequestsToQueue;
|
return this.maxWriteRequestsToQueue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean isEnableSmallWriteOptimization() {
|
||||||
|
return this.enableSmallWriteOptimization;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -290,7 +290,7 @@ public class AbfsRestOperation {
|
||||||
AbfsClientThrottlingIntercept.updateMetrics(operationType, httpOperation);
|
AbfsClientThrottlingIntercept.updateMetrics(operationType, httpOperation);
|
||||||
}
|
}
|
||||||
|
|
||||||
LOG.debug("HttpRequest: {}", httpOperation.toString());
|
LOG.debug("HttpRequest: {}: {}", operationType, httpOperation.toString());
|
||||||
|
|
||||||
if (client.getRetryPolicy().shouldRetry(retryCount, httpOperation.getStatusCode())) {
|
if (client.getRetryPolicy().shouldRetry(retryCount, httpOperation.getStatusCode())) {
|
||||||
return false;
|
return false;
|
||||||
|
|
|
@ -33,14 +33,16 @@ import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
|
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
|
||||||
import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
|
import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.BYTES_RECEIVED;
|
||||||
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CONNECTIONS_MADE;
|
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CONNECTIONS_MADE;
|
||||||
|
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.GET_RESPONSES;
|
||||||
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.SEND_REQUESTS;
|
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.SEND_REQUESTS;
|
||||||
|
|
||||||
public class ITestAbfsNetworkStatistics extends AbstractAbfsIntegrationTest {
|
public class ITestAbfsNetworkStatistics extends AbstractAbfsIntegrationTest {
|
||||||
|
|
||||||
private static final Logger LOG =
|
private static final Logger LOG =
|
||||||
LoggerFactory.getLogger(ITestAbfsNetworkStatistics.class);
|
LoggerFactory.getLogger(ITestAbfsNetworkStatistics.class);
|
||||||
private static final int LARGE_OPERATIONS = 10;
|
private static final int WRITE_OPERATION_LOOP_COUNT = 10;
|
||||||
|
|
||||||
public ITestAbfsNetworkStatistics() throws Exception {
|
public ITestAbfsNetworkStatistics() throws Exception {
|
||||||
}
|
}
|
||||||
|
@ -58,117 +60,126 @@ public class ITestAbfsNetworkStatistics extends AbstractAbfsIntegrationTest {
|
||||||
Map<String, Long> metricMap;
|
Map<String, Long> metricMap;
|
||||||
Path sendRequestPath = path(getMethodName());
|
Path sendRequestPath = path(getMethodName());
|
||||||
String testNetworkStatsString = "http_send";
|
String testNetworkStatsString = "http_send";
|
||||||
long connectionsMade, requestsSent, bytesSent;
|
|
||||||
|
|
||||||
metricMap = fs.getInstrumentationMap();
|
metricMap = fs.getInstrumentationMap();
|
||||||
long connectionsMadeBeforeTest = metricMap
|
long expectedConnectionsMade = metricMap.get(CONNECTIONS_MADE.getStatName());
|
||||||
.get(CONNECTIONS_MADE.getStatName());
|
long expectedRequestsSent = metricMap.get(SEND_REQUESTS.getStatName());
|
||||||
long requestsMadeBeforeTest = metricMap.get(SEND_REQUESTS.getStatName());
|
long expectedBytesSent = 0;
|
||||||
|
|
||||||
/*
|
// --------------------------------------------------------------------
|
||||||
* Creating AbfsOutputStream will result in 1 connection made and 1 send
|
// Operation: Creating AbfsOutputStream
|
||||||
* request.
|
|
||||||
*/
|
|
||||||
try (AbfsOutputStream out = createAbfsOutputStreamWithFlushEnabled(fs,
|
try (AbfsOutputStream out = createAbfsOutputStreamWithFlushEnabled(fs,
|
||||||
sendRequestPath)) {
|
sendRequestPath)) {
|
||||||
|
// Network stats calculation: For Creating AbfsOutputStream:
|
||||||
|
// 1 create request = 1 connection made and 1 send request
|
||||||
|
expectedConnectionsMade++;
|
||||||
|
expectedRequestsSent++;
|
||||||
|
// --------------------------------------------------------------------
|
||||||
|
|
||||||
|
// Operation: Write small data
|
||||||
|
// Network stats calculation: No additions.
|
||||||
|
// Data written is less than the buffer size and hence will not
|
||||||
|
// trigger any append request to store
|
||||||
out.write(testNetworkStatsString.getBytes());
|
out.write(testNetworkStatsString.getBytes());
|
||||||
|
// --------------------------------------------------------------------
|
||||||
|
|
||||||
/*
|
// Operation: HFlush
|
||||||
* Flushes all outstanding data (i.e. the current unfinished packet)
|
// Flushes all outstanding data (i.e. the current unfinished packet)
|
||||||
* from the client into the service on all DataNode replicas.
|
// from the client into the service on all DataNode replicas.
|
||||||
*/
|
|
||||||
out.hflush();
|
out.hflush();
|
||||||
|
|
||||||
metricMap = fs.getInstrumentationMap();
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Testing the network stats with 1 write operation.
|
* Network stats calculation:
|
||||||
|
* 3 possibilities here:
|
||||||
|
* A. As there is pending data to be written to store, this will result in:
|
||||||
|
* 1 append + 1 flush = 2 connections and 2 send requests
|
||||||
*
|
*
|
||||||
* connections_made : (connections made above) + 2(flush).
|
* B. If config "fs.azure.enable.small.write.optimization" is enabled, append
|
||||||
|
* and flush call will be merged for small data in buffer in this test.
|
||||||
|
* In which case it will be:
|
||||||
|
* 1 append+flush request = 1 connection and 1 send request
|
||||||
*
|
*
|
||||||
* send_requests : (requests sent above) + 2(flush).
|
* C. If the path is configured for append Blob files to be used, hflush
|
||||||
*
|
* is a no-op. So in this case:
|
||||||
* bytes_sent : bytes wrote in AbfsOutputStream.
|
* 1 append = 1 connection and 1 send request
|
||||||
*/
|
*/
|
||||||
long extraCalls = 0;
|
if (fs.getAbfsStore().isAppendBlobKey(fs.makeQualified(sendRequestPath).toString())
|
||||||
if (!fs.getAbfsStore()
|
|| (this.getConfiguration().isSmallWriteOptimizationEnabled())) {
|
||||||
.isAppendBlobKey(fs.makeQualified(sendRequestPath).toString())) {
|
expectedConnectionsMade++;
|
||||||
// no network calls are made for hflush in case of appendblob
|
expectedRequestsSent++;
|
||||||
extraCalls++;
|
} else {
|
||||||
|
expectedConnectionsMade += 2;
|
||||||
|
expectedRequestsSent += 2;
|
||||||
}
|
}
|
||||||
long expectedConnectionsMade = connectionsMadeBeforeTest + extraCalls + 2;
|
expectedBytesSent += testNetworkStatsString.getBytes().length;
|
||||||
long expectedRequestsSent = requestsMadeBeforeTest + extraCalls + 2;
|
// --------------------------------------------------------------------
|
||||||
connectionsMade = assertAbfsStatistics(CONNECTIONS_MADE,
|
|
||||||
|
// Assertions
|
||||||
|
metricMap = fs.getInstrumentationMap();
|
||||||
|
assertAbfsStatistics(CONNECTIONS_MADE,
|
||||||
expectedConnectionsMade, metricMap);
|
expectedConnectionsMade, metricMap);
|
||||||
requestsSent = assertAbfsStatistics(SEND_REQUESTS, expectedRequestsSent,
|
assertAbfsStatistics(SEND_REQUESTS, expectedRequestsSent,
|
||||||
metricMap);
|
metricMap);
|
||||||
bytesSent = assertAbfsStatistics(AbfsStatistic.BYTES_SENT,
|
assertAbfsStatistics(AbfsStatistic.BYTES_SENT,
|
||||||
testNetworkStatsString.getBytes().length, metricMap);
|
expectedBytesSent, metricMap);
|
||||||
}
|
}
|
||||||
|
|
||||||
// To close the AbfsOutputStream 1 connection is made and 1 request is sent.
|
// --------------------------------------------------------------------
|
||||||
connectionsMade++;
|
// Operation: AbfsOutputStream close.
|
||||||
requestsSent++;
|
// Network Stats calculation: 1 flush (with close) is send.
|
||||||
|
// 1 flush request = 1 connection and 1 send request
|
||||||
|
expectedConnectionsMade++;
|
||||||
|
expectedRequestsSent++;
|
||||||
|
// --------------------------------------------------------------------
|
||||||
|
|
||||||
|
// Operation: Re-create the file / create overwrite scenario
|
||||||
try (AbfsOutputStream out = createAbfsOutputStreamWithFlushEnabled(fs,
|
try (AbfsOutputStream out = createAbfsOutputStreamWithFlushEnabled(fs,
|
||||||
sendRequestPath)) {
|
sendRequestPath)) {
|
||||||
|
|
||||||
// Is a file overwrite case
|
|
||||||
long createRequestCalls = 1;
|
|
||||||
long createTriggeredGFSForETag = 0;
|
|
||||||
if (this.getConfiguration().isConditionalCreateOverwriteEnabled()) {
|
|
||||||
createRequestCalls += 1;
|
|
||||||
createTriggeredGFSForETag = 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
for (int i = 0; i < LARGE_OPERATIONS; i++) {
|
|
||||||
out.write(testNetworkStatsString.getBytes());
|
|
||||||
|
|
||||||
/*
|
|
||||||
* 1 flush call would create 2 connections and 2 send requests.
|
|
||||||
* when hflush() is called it will essentially trigger append() and
|
|
||||||
* flush() inside AbfsRestOperation. Both of which calls
|
|
||||||
* executeHttpOperation() method which creates a connection and sends
|
|
||||||
* requests.
|
|
||||||
*/
|
|
||||||
out.hflush();
|
|
||||||
}
|
|
||||||
|
|
||||||
metricMap = fs.getInstrumentationMap();
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Testing the network stats with Large amount of bytes sent.
|
* Network Stats calculation: create overwrite
|
||||||
*
|
* There are 2 possibilities here.
|
||||||
* connections made : connections_made(Last assertion) + 1
|
* A. create overwrite results in 1 server call
|
||||||
* (AbfsOutputStream) + LARGE_OPERATIONS * 2(flush).
|
* create with overwrite=true = 1 connection and 1 send request
|
||||||
*
|
|
||||||
* send requests : requests_sent(Last assertion) + 1(AbfsOutputStream) +
|
|
||||||
* LARGE_OPERATIONS * 2(flush).
|
|
||||||
*
|
|
||||||
* bytes sent : bytes_sent(Last assertion) + LARGE_OPERATIONS * (bytes
|
|
||||||
* wrote each time).
|
|
||||||
*
|
*
|
||||||
|
* B. If config "fs.azure.enable.conditional.create.overwrite" is enabled,
|
||||||
|
* create overwrite=false (will fail in this case as file is indeed present)
|
||||||
|
* + getFileStatus to fetch the file ETag
|
||||||
|
* + create overwrite=true
|
||||||
|
* = 3 connections and 2 send requests
|
||||||
*/
|
*/
|
||||||
|
if (this.getConfiguration().isConditionalCreateOverwriteEnabled()) {
|
||||||
connectionsMade += createRequestCalls + createTriggeredGFSForETag;
|
expectedConnectionsMade += 3;
|
||||||
requestsSent += createRequestCalls;
|
expectedRequestsSent += 2;
|
||||||
if (fs.getAbfsStore().isAppendBlobKey(fs.makeQualified(sendRequestPath).toString())) {
|
|
||||||
// no network calls are made for hflush in case of appendblob
|
|
||||||
assertAbfsStatistics(CONNECTIONS_MADE,
|
|
||||||
connectionsMade + LARGE_OPERATIONS, metricMap);
|
|
||||||
assertAbfsStatistics(SEND_REQUESTS,
|
|
||||||
requestsSent + LARGE_OPERATIONS, metricMap);
|
|
||||||
} else {
|
} else {
|
||||||
assertAbfsStatistics(CONNECTIONS_MADE,
|
expectedConnectionsMade += 1;
|
||||||
connectionsMade + LARGE_OPERATIONS * 2, metricMap);
|
expectedRequestsSent += 1;
|
||||||
assertAbfsStatistics(SEND_REQUESTS,
|
|
||||||
requestsSent + LARGE_OPERATIONS * 2, metricMap);
|
|
||||||
}
|
}
|
||||||
assertAbfsStatistics(AbfsStatistic.BYTES_SENT,
|
// --------------------------------------------------------------------
|
||||||
bytesSent + LARGE_OPERATIONS * (testNetworkStatsString.getBytes().length),
|
|
||||||
metricMap);
|
|
||||||
|
|
||||||
|
// Operation: Multiple small appends + hflush
|
||||||
|
for (int i = 0; i < WRITE_OPERATION_LOOP_COUNT; i++) {
|
||||||
|
out.write(testNetworkStatsString.getBytes());
|
||||||
|
// Network stats calculation: no-op. Small write
|
||||||
|
out.hflush();
|
||||||
|
// Network stats calculation: Hflush
|
||||||
|
// refer to previous comments for hFlush network stats calcualtion
|
||||||
|
// possibilities
|
||||||
|
if (fs.getAbfsStore().isAppendBlobKey(fs.makeQualified(sendRequestPath).toString())
|
||||||
|
|| (this.getConfiguration().isSmallWriteOptimizationEnabled())) {
|
||||||
|
expectedConnectionsMade++;
|
||||||
|
expectedRequestsSent++;
|
||||||
|
} else {
|
||||||
|
expectedConnectionsMade += 2;
|
||||||
|
expectedRequestsSent += 2;
|
||||||
|
}
|
||||||
|
expectedBytesSent += testNetworkStatsString.getBytes().length;
|
||||||
|
}
|
||||||
|
// --------------------------------------------------------------------
|
||||||
|
|
||||||
|
// Assertions
|
||||||
|
metricMap = fs.getInstrumentationMap();
|
||||||
|
assertAbfsStatistics(CONNECTIONS_MADE, expectedConnectionsMade, metricMap);
|
||||||
|
assertAbfsStatistics(SEND_REQUESTS, expectedRequestsSent, metricMap);
|
||||||
|
assertAbfsStatistics(AbfsStatistic.BYTES_SENT, expectedBytesSent, metricMap);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -185,130 +196,100 @@ public class ITestAbfsNetworkStatistics extends AbstractAbfsIntegrationTest {
|
||||||
Path getResponsePath = path(getMethodName());
|
Path getResponsePath = path(getMethodName());
|
||||||
Map<String, Long> metricMap;
|
Map<String, Long> metricMap;
|
||||||
String testResponseString = "some response";
|
String testResponseString = "some response";
|
||||||
long getResponses, bytesReceived;
|
|
||||||
|
|
||||||
FSDataOutputStream out = null;
|
FSDataOutputStream out = null;
|
||||||
FSDataInputStream in = null;
|
FSDataInputStream in = null;
|
||||||
try {
|
long expectedConnectionsMade;
|
||||||
|
long expectedGetResponses;
|
||||||
|
long expectedBytesReceived;
|
||||||
|
|
||||||
/*
|
try {
|
||||||
* Creating a File and writing some bytes in it.
|
// Creating a File and writing some bytes in it.
|
||||||
*
|
|
||||||
* get_response : 3(getFileSystem) + 1(OutputStream creation) + 2
|
|
||||||
* (Writing data in Data store).
|
|
||||||
*
|
|
||||||
*/
|
|
||||||
out = fs.create(getResponsePath);
|
out = fs.create(getResponsePath);
|
||||||
out.write(testResponseString.getBytes());
|
out.write(testResponseString.getBytes());
|
||||||
out.hflush();
|
out.hflush();
|
||||||
|
|
||||||
|
// Set metric baseline
|
||||||
metricMap = fs.getInstrumentationMap();
|
metricMap = fs.getInstrumentationMap();
|
||||||
long getResponsesBeforeTest = metricMap
|
long bytesWrittenToFile = testResponseString.getBytes().length;
|
||||||
.get(CONNECTIONS_MADE.getStatName());
|
expectedConnectionsMade = metricMap.get(CONNECTIONS_MADE.getStatName());
|
||||||
|
expectedGetResponses = metricMap.get(CONNECTIONS_MADE.getStatName());
|
||||||
|
expectedBytesReceived = metricMap.get(BYTES_RECEIVED.getStatName());
|
||||||
|
|
||||||
// open would require 1 get response.
|
// --------------------------------------------------------------------
|
||||||
|
// Operation: Create AbfsInputStream
|
||||||
in = fs.open(getResponsePath);
|
in = fs.open(getResponsePath);
|
||||||
// read would require 1 get response and also get the bytes received.
|
// Network stats calculation: For Creating AbfsInputStream:
|
||||||
|
// 1 GetFileStatus request to fetch file size = 1 connection and 1 get response
|
||||||
|
expectedConnectionsMade++;
|
||||||
|
expectedGetResponses++;
|
||||||
|
// --------------------------------------------------------------------
|
||||||
|
|
||||||
|
// Operation: Read
|
||||||
int result = in.read();
|
int result = in.read();
|
||||||
|
// Network stats calculation: For read:
|
||||||
|
// 1 read request = 1 connection and 1 get response
|
||||||
|
expectedConnectionsMade++;
|
||||||
|
expectedGetResponses++;
|
||||||
|
expectedBytesReceived += bytesWrittenToFile;
|
||||||
|
// --------------------------------------------------------------------
|
||||||
|
|
||||||
// Confirming read isn't -1.
|
// Assertions
|
||||||
LOG.info("Result of read operation : {}", result);
|
|
||||||
|
|
||||||
metricMap = fs.getInstrumentationMap();
|
metricMap = fs.getInstrumentationMap();
|
||||||
|
assertAbfsStatistics(CONNECTIONS_MADE, expectedConnectionsMade, metricMap);
|
||||||
/*
|
assertAbfsStatistics(GET_RESPONSES, expectedGetResponses, metricMap);
|
||||||
* Testing values of statistics after writing and reading a buffer.
|
assertAbfsStatistics(AbfsStatistic.BYTES_RECEIVED, expectedBytesReceived, metricMap);
|
||||||
*
|
|
||||||
* get_responses - (above operations) + 1(open()) + 1 (read()).;
|
|
||||||
*
|
|
||||||
* bytes_received - This should be equal to bytes sent earlier.
|
|
||||||
*/
|
|
||||||
long extraCalls = 0;
|
|
||||||
if (!fs.getAbfsStore()
|
|
||||||
.isAppendBlobKey(fs.makeQualified(getResponsePath).toString())) {
|
|
||||||
// no network calls are made for hflush in case of appendblob
|
|
||||||
extraCalls++;
|
|
||||||
}
|
|
||||||
long expectedGetResponses = getResponsesBeforeTest + extraCalls + 1;
|
|
||||||
getResponses = assertAbfsStatistics(AbfsStatistic.GET_RESPONSES,
|
|
||||||
expectedGetResponses, metricMap);
|
|
||||||
|
|
||||||
// Testing that bytes received is equal to bytes sent.
|
|
||||||
long bytesSend = metricMap.get(AbfsStatistic.BYTES_SENT.getStatName());
|
|
||||||
bytesReceived = assertAbfsStatistics(AbfsStatistic.BYTES_RECEIVED,
|
|
||||||
bytesSend,
|
|
||||||
metricMap);
|
|
||||||
|
|
||||||
} finally {
|
} finally {
|
||||||
IOUtils.cleanupWithLogger(LOG, out, in);
|
IOUtils.cleanupWithLogger(LOG, out, in);
|
||||||
}
|
}
|
||||||
|
|
||||||
// To close the streams 1 response is received.
|
// --------------------------------------------------------------------
|
||||||
getResponses++;
|
// Operation: AbfsOutputStream close.
|
||||||
|
// Network Stats calculation: no op.
|
||||||
|
// --------------------------------------------------------------------
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
|
||||||
/*
|
// Recreate file with different file size
|
||||||
* Creating a file and writing buffer into it.
|
// [Create and append related network stats checks are done in
|
||||||
* This is a file recreate, so it will trigger
|
// test method testAbfsHttpSendStatistics]
|
||||||
* 2 extra calls if create overwrite is off by default.
|
|
||||||
* Also recording the buffer for future read() call.
|
|
||||||
* This creating outputStream and writing requires 2 *
|
|
||||||
* (LARGE_OPERATIONS) get requests.
|
|
||||||
*/
|
|
||||||
StringBuilder largeBuffer = new StringBuilder();
|
StringBuilder largeBuffer = new StringBuilder();
|
||||||
out = fs.create(getResponsePath);
|
out = fs.create(getResponsePath);
|
||||||
|
|
||||||
long createRequestCalls = 1;
|
for (int i = 0; i < WRITE_OPERATION_LOOP_COUNT; i++) {
|
||||||
if (this.getConfiguration().isConditionalCreateOverwriteEnabled()) {
|
|
||||||
createRequestCalls += 2;
|
|
||||||
}
|
|
||||||
|
|
||||||
for (int i = 0; i < LARGE_OPERATIONS; i++) {
|
|
||||||
out.write(testResponseString.getBytes());
|
out.write(testResponseString.getBytes());
|
||||||
out.hflush();
|
out.hflush();
|
||||||
largeBuffer.append(testResponseString);
|
largeBuffer.append(testResponseString);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Open requires 1 get_response.
|
// sync back to metric baseline
|
||||||
in = fs.open(getResponsePath);
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Reading the file which was written above. This read() call would
|
|
||||||
* read bytes equal to the bytes that was written above.
|
|
||||||
* Get response would be 1 only.
|
|
||||||
*/
|
|
||||||
in.read(0, largeBuffer.toString().getBytes(), 0,
|
|
||||||
largeBuffer.toString().getBytes().length);
|
|
||||||
|
|
||||||
metricMap = fs.getInstrumentationMap();
|
metricMap = fs.getInstrumentationMap();
|
||||||
|
expectedConnectionsMade = metricMap.get(CONNECTIONS_MADE.getStatName());
|
||||||
|
expectedGetResponses = metricMap.get(GET_RESPONSES.getStatName());
|
||||||
|
// --------------------------------------------------------------------
|
||||||
|
// Operation: Create AbfsInputStream
|
||||||
|
in = fs.open(getResponsePath);
|
||||||
|
// Network stats calculation: For Creating AbfsInputStream:
|
||||||
|
// 1 GetFileStatus for file size = 1 connection and 1 get response
|
||||||
|
expectedConnectionsMade++;
|
||||||
|
expectedGetResponses++;
|
||||||
|
// --------------------------------------------------------------------
|
||||||
|
|
||||||
/*
|
// Operation: Read
|
||||||
* Testing the statistics values after writing and reading a large buffer.
|
in.read(0, largeBuffer.toString().getBytes(), 0, largeBuffer.toString().getBytes().length);
|
||||||
*
|
// Network stats calculation: Total data written is still lesser than
|
||||||
* get_response : get_responses(Last assertion) + 1
|
// a buffer size. Hence will trigger only one read to store. So result is:
|
||||||
* (OutputStream) + 2 * LARGE_OPERATIONS(Writing and flushing
|
// 1 read request = 1 connection and 1 get response
|
||||||
* LARGE_OPERATIONS times) + 1(open()) + 1(read()) +
|
expectedConnectionsMade++;
|
||||||
* 1 (createOverwriteTriggeredGetForeTag).
|
expectedGetResponses++;
|
||||||
*
|
expectedBytesReceived += (WRITE_OPERATION_LOOP_COUNT * testResponseString.getBytes().length);
|
||||||
* bytes_received : bytes_received(Last assertion) + LARGE_OPERATIONS *
|
// --------------------------------------------------------------------
|
||||||
* bytes wrote each time (bytes_received is equal to bytes wrote in the
|
|
||||||
* File).
|
|
||||||
*
|
|
||||||
*/
|
|
||||||
assertAbfsStatistics(AbfsStatistic.BYTES_RECEIVED,
|
|
||||||
bytesReceived + LARGE_OPERATIONS * (testResponseString.getBytes().length),
|
|
||||||
metricMap);
|
|
||||||
if (fs.getAbfsStore().isAppendBlobKey(fs.makeQualified(getResponsePath).toString())) {
|
|
||||||
// no network calls are made for hflush in case of appendblob
|
|
||||||
assertAbfsStatistics(AbfsStatistic.GET_RESPONSES,
|
|
||||||
getResponses + 3 + LARGE_OPERATIONS, metricMap);
|
|
||||||
} else {
|
|
||||||
assertAbfsStatistics(AbfsStatistic.GET_RESPONSES,
|
|
||||||
getResponses + 2 + createRequestCalls + 2 * LARGE_OPERATIONS,
|
|
||||||
metricMap);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
// Assertions
|
||||||
|
metricMap = fs.getInstrumentationMap();
|
||||||
|
assertAbfsStatistics(CONNECTIONS_MADE, expectedConnectionsMade, metricMap);
|
||||||
|
assertAbfsStatistics(GET_RESPONSES, expectedGetResponses, metricMap);
|
||||||
|
assertAbfsStatistics(AbfsStatistic.BYTES_RECEIVED, expectedBytesReceived, metricMap);
|
||||||
} finally {
|
} finally {
|
||||||
IOUtils.cleanupWithLogger(LOG, out, in);
|
IOUtils.cleanupWithLogger(LOG, out, in);
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,523 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.fs.azurebfs;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Random;
|
||||||
|
import java.util.UUID;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.assertj.core.api.Assertions;
|
||||||
|
import org.junit.Assume;
|
||||||
|
import org.junit.runners.Parameterized;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.BYTES_SENT;
|
||||||
|
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CONNECTIONS_MADE;
|
||||||
|
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.SEND_REQUESTS;
|
||||||
|
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_AZURE_ENABLE_SMALL_WRITE_OPTIMIZATION;
|
||||||
|
import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_TEST_APPENDBLOB_ENABLED;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test combination for small writes with flush and close operations.
|
||||||
|
* This test class formulates an append test flow to assert on various scenarios.
|
||||||
|
* Test stages:
|
||||||
|
* 1. Pre-create test file of required size. This is determined by
|
||||||
|
* startingFileSize parameter. If it is 0, then pre-creation is skipped.
|
||||||
|
*
|
||||||
|
* 2. Formulate an append loop or iteration. An iteration, will do N writes
|
||||||
|
* (determined by numOfClientWrites parameter) with each writing X bytes
|
||||||
|
* (determined by recurringClientWriteSize parameter).
|
||||||
|
*
|
||||||
|
* 3. Determine total number of append iterations needed by a test.
|
||||||
|
* If intention is to close the outputStream right after append, setting
|
||||||
|
* directCloseTest parameter will determine 1 append test iteration with an
|
||||||
|
* ending close.
|
||||||
|
* Else, it will execute TEST_FLUSH_ITERATION number of test iterations, with
|
||||||
|
* each doing appends, hflush/hsync and then close.
|
||||||
|
*
|
||||||
|
* 4. Execute test iterations with asserts on number of store requests made and
|
||||||
|
* validating file content.
|
||||||
|
*/
|
||||||
|
@RunWith(Parameterized.class)
|
||||||
|
public class ITestSmallWriteOptimization extends AbstractAbfsScaleTest {
|
||||||
|
private static final int ONE_MB = 1024 * 1024;
|
||||||
|
private static final int TWO_MB = 2 * ONE_MB;
|
||||||
|
private static final int TEST_BUFFER_SIZE = TWO_MB;
|
||||||
|
private static final int HALF_TEST_BUFFER_SIZE = TWO_MB / 2;
|
||||||
|
private static final int QUARTER_TEST_BUFFER_SIZE = TWO_MB / 4;
|
||||||
|
private static final int TEST_FLUSH_ITERATION = 2;
|
||||||
|
|
||||||
|
@Parameterized.Parameter
|
||||||
|
public String testScenario;
|
||||||
|
|
||||||
|
@Parameterized.Parameter(1)
|
||||||
|
public boolean enableSmallWriteOptimization;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* If true, will initiate close after appends. (That is, no explicit hflush or
|
||||||
|
* hsync calls will be made from client app.)
|
||||||
|
*/
|
||||||
|
@Parameterized.Parameter(2)
|
||||||
|
public boolean directCloseTest;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* If non-zero, test file should be created as pre-requisite with this size.
|
||||||
|
*/
|
||||||
|
@Parameterized.Parameter(3)
|
||||||
|
public Integer startingFileSize;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Determines the write sizes to be issued by client app.
|
||||||
|
*/
|
||||||
|
@Parameterized.Parameter(4)
|
||||||
|
public Integer recurringClientWriteSize;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Determines the number of Client writes to make.
|
||||||
|
*/
|
||||||
|
@Parameterized.Parameter(5)
|
||||||
|
public Integer numOfClientWrites;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* True, if the small write optimization is supposed to be effective in
|
||||||
|
* the scenario.
|
||||||
|
*/
|
||||||
|
@Parameterized.Parameter(6)
|
||||||
|
public boolean flushExpectedToBeMergedWithAppend;
|
||||||
|
|
||||||
|
@Parameterized.Parameters(name = "{0}")
|
||||||
|
public static Iterable<Object[]> params() {
|
||||||
|
return Arrays.asList(
|
||||||
|
// Parameter Order :
|
||||||
|
// testScenario,
|
||||||
|
// enableSmallWriteOptimization, directCloseTest, startingFileSize,
|
||||||
|
// recurringClientWriteSize, numOfClientWrites, flushExpectedToBeMergedWithAppend
|
||||||
|
new Object[][]{
|
||||||
|
// Buffer Size Write tests
|
||||||
|
{ "OptmON_FlushCloseTest_EmptyFile_BufferSizeWrite",
|
||||||
|
true, false, 0, TEST_BUFFER_SIZE, 1, false
|
||||||
|
},
|
||||||
|
{ "OptmON_FlushCloseTest_NonEmptyFile_BufferSizeWrite",
|
||||||
|
true, false, 2 * TEST_BUFFER_SIZE, TEST_BUFFER_SIZE, 1, false
|
||||||
|
},
|
||||||
|
{ "OptmON_CloseTest_EmptyFile_BufferSizeWrite",
|
||||||
|
true, true, 0, TEST_BUFFER_SIZE, 1, false
|
||||||
|
},
|
||||||
|
{ "OptmON_CloseTest_NonEmptyFile_BufferSizeWrite",
|
||||||
|
true, true, 2 * TEST_BUFFER_SIZE, TEST_BUFFER_SIZE, 1, false
|
||||||
|
},
|
||||||
|
{ "OptmOFF_FlushCloseTest_EmptyFile_BufferSizeWrite",
|
||||||
|
false, false, 0, TEST_BUFFER_SIZE, 1, false
|
||||||
|
},
|
||||||
|
{ "OptmOFF_FlushCloseTest_NonEmptyFile_BufferSizeWrite",
|
||||||
|
false, false, 2 * TEST_BUFFER_SIZE, TEST_BUFFER_SIZE, 1, false
|
||||||
|
},
|
||||||
|
{ "OptmOFF_CloseTest_EmptyFile_BufferSizeWrite",
|
||||||
|
false, true, 0, TEST_BUFFER_SIZE, 1, false
|
||||||
|
},
|
||||||
|
{ "OptmOFF_CloseTest_NonEmptyFile_BufferSizeWrite",
|
||||||
|
false, true, 2 * TEST_BUFFER_SIZE, TEST_BUFFER_SIZE, 1, false
|
||||||
|
},
|
||||||
|
// Less than buffer size write tests
|
||||||
|
{ "OptmON_FlushCloseTest_EmptyFile_LessThanBufferSizeWrite",
|
||||||
|
true, false, 0, Math.abs(HALF_TEST_BUFFER_SIZE), 1, true
|
||||||
|
},
|
||||||
|
{ "OptmON_FlushCloseTest_NonEmptyFile_LessThanBufferSizeWrite",
|
||||||
|
true, false, 2 * TEST_BUFFER_SIZE,
|
||||||
|
Math.abs(HALF_TEST_BUFFER_SIZE), 1, true
|
||||||
|
},
|
||||||
|
{ "OptmON_CloseTest_EmptyFile_LessThanBufferSizeWrite",
|
||||||
|
true, true, 0, Math.abs(HALF_TEST_BUFFER_SIZE), 1, true
|
||||||
|
},
|
||||||
|
{ "OptmON_CloseTest_NonEmptyFile_LessThanBufferSizeWrite",
|
||||||
|
true, true, 2 * TEST_BUFFER_SIZE,
|
||||||
|
Math.abs(HALF_TEST_BUFFER_SIZE), 1, true
|
||||||
|
},
|
||||||
|
{ "OptmOFF_FlushCloseTest_EmptyFile_LessThanBufferSizeWrite",
|
||||||
|
false, false, 0, Math.abs(HALF_TEST_BUFFER_SIZE), 1, false
|
||||||
|
},
|
||||||
|
{ "OptmOFF_FlushCloseTest_NonEmptyFile_LessThanBufferSizeWrite",
|
||||||
|
false, false, 2 * TEST_BUFFER_SIZE,
|
||||||
|
Math.abs(HALF_TEST_BUFFER_SIZE), 1, false
|
||||||
|
},
|
||||||
|
{ "OptmOFF_CloseTest_EmptyFile_LessThanBufferSizeWrite",
|
||||||
|
false, true, 0, Math.abs(HALF_TEST_BUFFER_SIZE), 1, false
|
||||||
|
},
|
||||||
|
{ "OptmOFF_CloseTest_NonEmptyFile_LessThanBufferSizeWrite",
|
||||||
|
false, true, 2 * TEST_BUFFER_SIZE,
|
||||||
|
Math.abs(HALF_TEST_BUFFER_SIZE), 1, false
|
||||||
|
},
|
||||||
|
// Multiple small writes still less than buffer size
|
||||||
|
{ "OptmON_FlushCloseTest_EmptyFile_MultiSmallWritesStillLessThanBufferSize",
|
||||||
|
true, false, 0, Math.abs(QUARTER_TEST_BUFFER_SIZE), 3, true
|
||||||
|
},
|
||||||
|
{ "OptmON_FlushCloseTest_NonEmptyFile_MultiSmallWritesStillLessThanBufferSize",
|
||||||
|
true, false, 2 * TEST_BUFFER_SIZE,
|
||||||
|
Math.abs(QUARTER_TEST_BUFFER_SIZE), 3, true
|
||||||
|
},
|
||||||
|
{ "OptmON_CloseTest_EmptyFile_MultiSmallWritesStillLessThanBufferSize",
|
||||||
|
true, true, 0, Math.abs(QUARTER_TEST_BUFFER_SIZE), 3, true
|
||||||
|
},
|
||||||
|
{ "OptmON_CloseTest_NonEmptyFile_MultiSmallWritesStillLessThanBufferSize",
|
||||||
|
true, true, 2 * TEST_BUFFER_SIZE,
|
||||||
|
Math.abs(QUARTER_TEST_BUFFER_SIZE), 3, true
|
||||||
|
},
|
||||||
|
{ "OptmOFF_FlushCloseTest_EmptyFile_MultiSmallWritesStillLessThanBufferSize",
|
||||||
|
false, false, 0, Math.abs(QUARTER_TEST_BUFFER_SIZE), 3, false
|
||||||
|
},
|
||||||
|
{ "OptmOFF_FlushCloseTest_NonEmptyFile_MultiSmallWritesStillLessThanBufferSize",
|
||||||
|
false, false, 2 * TEST_BUFFER_SIZE,
|
||||||
|
Math.abs(QUARTER_TEST_BUFFER_SIZE), 3, false
|
||||||
|
},
|
||||||
|
{ "OptmOFF_CloseTest_EmptyFile_MultiSmallWritesStillLessThanBufferSize",
|
||||||
|
false, true, 0, Math.abs(QUARTER_TEST_BUFFER_SIZE), 3, false
|
||||||
|
},
|
||||||
|
{ "OptmOFF_CloseTest_NonEmptyFile_MultiSmallWritesStillLessThanBufferSize",
|
||||||
|
false, true, 2 * TEST_BUFFER_SIZE,
|
||||||
|
Math.abs(QUARTER_TEST_BUFFER_SIZE), 3, false
|
||||||
|
},
|
||||||
|
// Multiple full buffer writes
|
||||||
|
{ "OptmON_FlushCloseTest_EmptyFile_MultiBufferSizeWrite",
|
||||||
|
true, false, 0, TEST_BUFFER_SIZE, 3, false
|
||||||
|
},
|
||||||
|
{ "OptmON_FlushCloseTest_NonEmptyFile_MultiBufferSizeWrite",
|
||||||
|
true, false, 2 * TEST_BUFFER_SIZE, TEST_BUFFER_SIZE, 3, false
|
||||||
|
},
|
||||||
|
{ "OptmON_CloseTest_EmptyFile_MultiBufferSizeWrite",
|
||||||
|
true, true, 0, TEST_BUFFER_SIZE, 3, false
|
||||||
|
},
|
||||||
|
{ "OptmON_CloseTest_NonEmptyFile_MultiBufferSizeWrite",
|
||||||
|
true, true, 2 * TEST_BUFFER_SIZE, TEST_BUFFER_SIZE, 3, false
|
||||||
|
},
|
||||||
|
{ "OptmOFF_FlushCloseTest_EmptyFile_MultiBufferSizeWrite",
|
||||||
|
false, false, 0, TEST_BUFFER_SIZE, 3, false
|
||||||
|
},
|
||||||
|
{ "OptmOFF_FlushCloseTest_NonEmptyFile_MultiBufferSizeWrite",
|
||||||
|
false, false, 2 * TEST_BUFFER_SIZE, TEST_BUFFER_SIZE, 3, false
|
||||||
|
},
|
||||||
|
{ "OptmOFF_CloseTest_EmptyFile_MultiBufferSizeWrite",
|
||||||
|
false, true, 0, TEST_BUFFER_SIZE, 3, false
|
||||||
|
},
|
||||||
|
{ "OptmOFF_CloseTest_NonEmptyFile_MultiBufferSizeWrite",
|
||||||
|
false, true, 2 * TEST_BUFFER_SIZE, TEST_BUFFER_SIZE, 3, false
|
||||||
|
},
|
||||||
|
// Multiple full buffers triggered and data less than buffer size pending
|
||||||
|
{ "OptmON_FlushCloseTest_EmptyFile_BufferAndExtraWrite",
|
||||||
|
true, false, 0,
|
||||||
|
TEST_BUFFER_SIZE + Math.abs(QUARTER_TEST_BUFFER_SIZE),
|
||||||
|
3, false
|
||||||
|
},
|
||||||
|
{ "OptmON_FlushCloseTest_NonEmptyFile_BufferAndExtraWrite",
|
||||||
|
true, false, 2 * TEST_BUFFER_SIZE,
|
||||||
|
TEST_BUFFER_SIZE + Math.abs(QUARTER_TEST_BUFFER_SIZE),
|
||||||
|
3, false
|
||||||
|
},
|
||||||
|
{ "OptmON_CloseTest_EmptyFile__BufferAndExtraWrite",
|
||||||
|
true, true, 0,
|
||||||
|
TEST_BUFFER_SIZE + Math.abs(QUARTER_TEST_BUFFER_SIZE),
|
||||||
|
3, false
|
||||||
|
},
|
||||||
|
{ "OptmON_CloseTest_NonEmptyFile_BufferAndExtraWrite",
|
||||||
|
true, true, 2 * TEST_BUFFER_SIZE,
|
||||||
|
TEST_BUFFER_SIZE + Math.abs(QUARTER_TEST_BUFFER_SIZE),
|
||||||
|
3, false
|
||||||
|
},
|
||||||
|
{ "OptmOFF_FlushCloseTest_EmptyFile_BufferAndExtraWrite",
|
||||||
|
false, false, 0,
|
||||||
|
TEST_BUFFER_SIZE + Math.abs(QUARTER_TEST_BUFFER_SIZE),
|
||||||
|
3, false
|
||||||
|
},
|
||||||
|
{ "OptmOFF_FlushCloseTest_NonEmptyFile_BufferAndExtraWrite",
|
||||||
|
false, false, 2 * TEST_BUFFER_SIZE,
|
||||||
|
TEST_BUFFER_SIZE + Math.abs(QUARTER_TEST_BUFFER_SIZE),
|
||||||
|
3, false
|
||||||
|
},
|
||||||
|
{ "OptmOFF_CloseTest_EmptyFile_BufferAndExtraWrite",
|
||||||
|
false, true, 0,
|
||||||
|
TEST_BUFFER_SIZE + Math.abs(QUARTER_TEST_BUFFER_SIZE),
|
||||||
|
3, false
|
||||||
|
},
|
||||||
|
{ "OptmOFF_CloseTest_NonEmptyFile_BufferAndExtraWrite",
|
||||||
|
false, true, 2 * TEST_BUFFER_SIZE,
|
||||||
|
TEST_BUFFER_SIZE + Math.abs(QUARTER_TEST_BUFFER_SIZE),
|
||||||
|
3, false
|
||||||
|
},
|
||||||
|
// 0 byte tests
|
||||||
|
{ "OptmON_FlushCloseTest_EmptyFile_0ByteWrite",
|
||||||
|
true, false, 0, 0, 1, false
|
||||||
|
},
|
||||||
|
{ "OptmON_FlushCloseTest_NonEmptyFile_0ByteWrite",
|
||||||
|
true, false, 2 * TEST_BUFFER_SIZE, 0, 1, false
|
||||||
|
},
|
||||||
|
{ "OptmON_CloseTest_EmptyFile_0ByteWrite",
|
||||||
|
true, true, 0, 0, 1, false
|
||||||
|
},
|
||||||
|
{ "OptmON_CloseTest_NonEmptyFile_0ByteWrite",
|
||||||
|
true, true, 2 * TEST_BUFFER_SIZE, 0, 1, false
|
||||||
|
},
|
||||||
|
{ "OptmOFF_FlushCloseTest_EmptyFile_0ByteWrite",
|
||||||
|
false, false, 0, 0, 1, false
|
||||||
|
},
|
||||||
|
{ "OptmOFF_FlushCloseTest_NonEmptyFile_0ByteWrite",
|
||||||
|
false, false, 2 * TEST_BUFFER_SIZE, 0, 1, false
|
||||||
|
},
|
||||||
|
{ "OptmOFF_CloseTest_EmptyFile_0ByteWrite",
|
||||||
|
false, true, 0, 0, 1, false
|
||||||
|
},
|
||||||
|
{ "OptmOFF_CloseTest_NonEmptyFile_0ByteWrite",
|
||||||
|
false, true, 2 * TEST_BUFFER_SIZE, 0, 1, false
|
||||||
|
},
|
||||||
|
});
|
||||||
|
}
|
||||||
|
public ITestSmallWriteOptimization() throws Exception {
|
||||||
|
super();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSmallWriteOptimization()
|
||||||
|
throws IOException {
|
||||||
|
boolean serviceDefaultOptmSettings = DEFAULT_AZURE_ENABLE_SMALL_WRITE_OPTIMIZATION;
|
||||||
|
// Tests with Optimization should only run if service has the feature on by
|
||||||
|
// default. Default settings will be turned on when server support is
|
||||||
|
// available on all store prod regions.
|
||||||
|
if (enableSmallWriteOptimization) {
|
||||||
|
Assume.assumeTrue(serviceDefaultOptmSettings);
|
||||||
|
}
|
||||||
|
|
||||||
|
final AzureBlobFileSystem currentfs = this.getFileSystem();
|
||||||
|
Configuration config = currentfs.getConf();
|
||||||
|
boolean isAppendBlobTestSettingEnabled = (config.get(FS_AZURE_TEST_APPENDBLOB_ENABLED) == "true");
|
||||||
|
|
||||||
|
// This optimization doesnt take effect when append blob is on.
|
||||||
|
Assume.assumeFalse(isAppendBlobTestSettingEnabled);
|
||||||
|
|
||||||
|
config.set(ConfigurationKeys.AZURE_WRITE_BUFFER_SIZE, Integer.toString(TEST_BUFFER_SIZE));
|
||||||
|
config.set(ConfigurationKeys.AZURE_ENABLE_SMALL_WRITE_OPTIMIZATION, Boolean.toString(enableSmallWriteOptimization));
|
||||||
|
final AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.get(
|
||||||
|
currentfs.getUri(), config);
|
||||||
|
|
||||||
|
formulateSmallWriteTestAppendPattern(fs, startingFileSize,
|
||||||
|
recurringClientWriteSize, numOfClientWrites,
|
||||||
|
directCloseTest, flushExpectedToBeMergedWithAppend);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* if isDirectCloseTest == true, append + close is triggered
|
||||||
|
* if isDirectCloseTest == false, append + flush runs are repeated over
|
||||||
|
* iterations followed by close
|
||||||
|
* @param fs
|
||||||
|
* @param startingFileSize
|
||||||
|
* @param recurringWriteSize
|
||||||
|
* @param numOfWrites
|
||||||
|
* @param isDirectCloseTest
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
private void formulateSmallWriteTestAppendPattern(final AzureBlobFileSystem fs,
|
||||||
|
int startingFileSize,
|
||||||
|
int recurringWriteSize,
|
||||||
|
int numOfWrites,
|
||||||
|
boolean isDirectCloseTest,
|
||||||
|
boolean flushExpectedToBeMergedWithAppend) throws IOException {
|
||||||
|
|
||||||
|
int totalDataToBeAppended = 0;
|
||||||
|
int testIteration = 0;
|
||||||
|
int dataWrittenPerIteration = (numOfWrites * recurringWriteSize);
|
||||||
|
|
||||||
|
if (isDirectCloseTest) {
|
||||||
|
totalDataToBeAppended = dataWrittenPerIteration;
|
||||||
|
testIteration = 1;
|
||||||
|
} else {
|
||||||
|
testIteration = TEST_FLUSH_ITERATION;
|
||||||
|
totalDataToBeAppended = testIteration * dataWrittenPerIteration;
|
||||||
|
}
|
||||||
|
|
||||||
|
int totalFileSize = totalDataToBeAppended + startingFileSize;
|
||||||
|
// write buffer of file size created. This will be used as write
|
||||||
|
// source and for file content validation
|
||||||
|
final byte[] writeBuffer = new byte[totalFileSize];
|
||||||
|
new Random().nextBytes(writeBuffer);
|
||||||
|
int writeBufferCursor = 0;
|
||||||
|
|
||||||
|
Path testPath = new Path(getMethodName() + UUID.randomUUID().toString());
|
||||||
|
FSDataOutputStream opStream;
|
||||||
|
|
||||||
|
if (startingFileSize > 0) {
|
||||||
|
writeBufferCursor += createFileWithStartingTestSize(fs, writeBuffer, writeBufferCursor, testPath,
|
||||||
|
startingFileSize);
|
||||||
|
opStream = fs.append(testPath);
|
||||||
|
} else {
|
||||||
|
opStream = fs.create(testPath);
|
||||||
|
}
|
||||||
|
|
||||||
|
final int writeBufferSize = fs.getAbfsStore()
|
||||||
|
.getAbfsConfiguration()
|
||||||
|
.getWriteBufferSize();
|
||||||
|
long expectedTotalRequestsMade = fs.getInstrumentationMap()
|
||||||
|
.get(CONNECTIONS_MADE.getStatName());
|
||||||
|
long expectedRequestsMadeWithData = fs.getInstrumentationMap()
|
||||||
|
.get(SEND_REQUESTS.getStatName());
|
||||||
|
long expectedBytesSent = fs.getInstrumentationMap()
|
||||||
|
.get(BYTES_SENT.getStatName());
|
||||||
|
|
||||||
|
while (testIteration > 0) {
|
||||||
|
// trigger recurringWriteSize appends over numOfWrites
|
||||||
|
writeBufferCursor += executeWritePattern(opStream, writeBuffer,
|
||||||
|
writeBufferCursor, numOfWrites, recurringWriteSize);
|
||||||
|
|
||||||
|
int numOfBuffersWrittenToStore = (int) Math.floor(
|
||||||
|
dataWrittenPerIteration / writeBufferSize);
|
||||||
|
int dataSizeWrittenToStore = numOfBuffersWrittenToStore * writeBufferSize;
|
||||||
|
int pendingDataToStore = dataWrittenPerIteration - dataSizeWrittenToStore;
|
||||||
|
|
||||||
|
expectedTotalRequestsMade += numOfBuffersWrittenToStore;
|
||||||
|
expectedRequestsMadeWithData += numOfBuffersWrittenToStore;
|
||||||
|
expectedBytesSent += dataSizeWrittenToStore;
|
||||||
|
|
||||||
|
if (isDirectCloseTest) {
|
||||||
|
opStream.close();
|
||||||
|
} else {
|
||||||
|
opStream.hflush();
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean wasDataPendingToBeWrittenToServer = (pendingDataToStore > 0);
|
||||||
|
// Small write optimization will only work if
|
||||||
|
// a. config for small write optimization is on
|
||||||
|
// b. no buffer writes have been triggered since last flush
|
||||||
|
// c. there is some pending data in buffer to write to store
|
||||||
|
final boolean smallWriteOptimizationEnabled = fs.getAbfsStore()
|
||||||
|
.getAbfsConfiguration()
|
||||||
|
.isSmallWriteOptimizationEnabled();
|
||||||
|
boolean flushWillBeMergedWithAppend = smallWriteOptimizationEnabled
|
||||||
|
&& (numOfBuffersWrittenToStore == 0)
|
||||||
|
&& (wasDataPendingToBeWrittenToServer);
|
||||||
|
|
||||||
|
Assertions.assertThat(flushWillBeMergedWithAppend)
|
||||||
|
.describedAs(flushExpectedToBeMergedWithAppend
|
||||||
|
? "Flush was to be merged with Append"
|
||||||
|
: "Flush should not have been merged with Append")
|
||||||
|
.isEqualTo(flushExpectedToBeMergedWithAppend);
|
||||||
|
|
||||||
|
int totalAppendFlushCalls = (flushWillBeMergedWithAppend
|
||||||
|
? 1 // 1 append (with flush and close param)
|
||||||
|
: (wasDataPendingToBeWrittenToServer)
|
||||||
|
? 2 // 1 append + 1 flush (with close)
|
||||||
|
: 1); // 1 flush (with close)
|
||||||
|
|
||||||
|
expectedTotalRequestsMade += totalAppendFlushCalls;
|
||||||
|
expectedRequestsMadeWithData += totalAppendFlushCalls;
|
||||||
|
expectedBytesSent += wasDataPendingToBeWrittenToServer
|
||||||
|
? pendingDataToStore
|
||||||
|
: 0;
|
||||||
|
|
||||||
|
assertOpStats(fs.getInstrumentationMap(), expectedTotalRequestsMade,
|
||||||
|
expectedRequestsMadeWithData, expectedBytesSent);
|
||||||
|
|
||||||
|
if (isDirectCloseTest) {
|
||||||
|
// stream already closed
|
||||||
|
validateStoreAppends(fs, testPath, totalFileSize, writeBuffer);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
testIteration--;
|
||||||
|
}
|
||||||
|
|
||||||
|
opStream.close();
|
||||||
|
expectedTotalRequestsMade += 1;
|
||||||
|
expectedRequestsMadeWithData += 1;
|
||||||
|
// no change in expectedBytesSent
|
||||||
|
assertOpStats(fs.getInstrumentationMap(), expectedTotalRequestsMade, expectedRequestsMadeWithData, expectedBytesSent);
|
||||||
|
|
||||||
|
validateStoreAppends(fs, testPath, totalFileSize, writeBuffer);
|
||||||
|
}
|
||||||
|
|
||||||
|
private int createFileWithStartingTestSize(AzureBlobFileSystem fs, byte[] writeBuffer,
|
||||||
|
int writeBufferCursor, Path testPath, int startingFileSize)
|
||||||
|
throws IOException {
|
||||||
|
FSDataOutputStream opStream = fs.create(testPath);
|
||||||
|
writeBufferCursor += executeWritePattern(opStream,
|
||||||
|
writeBuffer,
|
||||||
|
writeBufferCursor,
|
||||||
|
1,
|
||||||
|
startingFileSize);
|
||||||
|
|
||||||
|
opStream.close();
|
||||||
|
Assertions.assertThat(fs.getFileStatus(testPath).getLen())
|
||||||
|
.describedAs("File should be of size %d at the start of test.",
|
||||||
|
startingFileSize)
|
||||||
|
.isEqualTo(startingFileSize);
|
||||||
|
|
||||||
|
return writeBufferCursor;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void validateStoreAppends(AzureBlobFileSystem fs,
|
||||||
|
Path testPath,
|
||||||
|
int totalFileSize,
|
||||||
|
byte[] bufferWritten)
|
||||||
|
throws IOException {
|
||||||
|
// Final validation
|
||||||
|
Assertions.assertThat(fs.getFileStatus(testPath).getLen())
|
||||||
|
.describedAs("File should be of size %d at the end of test.",
|
||||||
|
totalFileSize)
|
||||||
|
.isEqualTo(totalFileSize);
|
||||||
|
|
||||||
|
byte[] fileReadFromStore = new byte[totalFileSize];
|
||||||
|
fs.open(testPath).read(fileReadFromStore, 0, totalFileSize);
|
||||||
|
|
||||||
|
assertArrayEquals("Test file content incorrect", bufferWritten,
|
||||||
|
fileReadFromStore);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void assertOpStats(Map<String, Long> metricMap,
|
||||||
|
long expectedTotalRequestsMade,
|
||||||
|
long expectedRequestsMadeWithData,
|
||||||
|
long expectedBytesSent) {
|
||||||
|
assertAbfsStatistics(CONNECTIONS_MADE, expectedTotalRequestsMade,
|
||||||
|
metricMap);
|
||||||
|
assertAbfsStatistics(SEND_REQUESTS, expectedRequestsMadeWithData,
|
||||||
|
metricMap);
|
||||||
|
assertAbfsStatistics(BYTES_SENT, expectedBytesSent, metricMap);
|
||||||
|
}
|
||||||
|
|
||||||
|
private int executeWritePattern(FSDataOutputStream opStream,
|
||||||
|
byte[] buffer,
|
||||||
|
int startOffset,
|
||||||
|
int writeLoopCount,
|
||||||
|
int writeSize)
|
||||||
|
throws IOException {
|
||||||
|
int dataSizeWritten = startOffset;
|
||||||
|
|
||||||
|
while (writeLoopCount > 0) {
|
||||||
|
opStream.write(buffer, startOffset, writeSize);
|
||||||
|
startOffset += writeSize;
|
||||||
|
writeLoopCount--;
|
||||||
|
}
|
||||||
|
|
||||||
|
dataSizeWritten = startOffset - dataSizeWritten;
|
||||||
|
return dataSizeWritten;
|
||||||
|
}
|
||||||
|
}
|
|
@ -44,10 +44,16 @@ public class ITestAbfsOutputStream extends AbstractAbfsIntegrationTest {
|
||||||
final AzureBlobFileSystem fs = getFileSystem(conf);
|
final AzureBlobFileSystem fs = getFileSystem(conf);
|
||||||
try (FSDataOutputStream out = fs.create(TEST_FILE_PATH)) {
|
try (FSDataOutputStream out = fs.create(TEST_FILE_PATH)) {
|
||||||
AbfsOutputStream stream = (AbfsOutputStream) out.getWrappedStream();
|
AbfsOutputStream stream = (AbfsOutputStream) out.getWrappedStream();
|
||||||
|
|
||||||
|
int maxConcurrentRequests
|
||||||
|
= getConfiguration().getWriteMaxConcurrentRequestCount();
|
||||||
|
if (stream.isAppendBlobStream()) {
|
||||||
|
maxConcurrentRequests = 1;
|
||||||
|
}
|
||||||
|
|
||||||
Assertions.assertThat(stream.getMaxConcurrentRequestCount()).describedAs(
|
Assertions.assertThat(stream.getMaxConcurrentRequestCount()).describedAs(
|
||||||
"maxConcurrentRequests should be " + getConfiguration()
|
"maxConcurrentRequests should be " + maxConcurrentRequests)
|
||||||
.getWriteMaxConcurrentRequestCount())
|
.isEqualTo(maxConcurrentRequests);
|
||||||
.isEqualTo(getConfiguration().getWriteMaxConcurrentRequestCount());
|
|
||||||
Assertions.assertThat(stream.getMaxRequestsThatCanBeQueued()).describedAs(
|
Assertions.assertThat(stream.getMaxRequestsThatCanBeQueued()).describedAs(
|
||||||
"maxRequestsToQueue should be " + getConfiguration()
|
"maxRequestsToQueue should be " + getConfiguration()
|
||||||
.getMaxWriteRequestsToQueue())
|
.getMaxWriteRequestsToQueue())
|
||||||
|
@ -67,6 +73,11 @@ public class ITestAbfsOutputStream extends AbstractAbfsIntegrationTest {
|
||||||
final AzureBlobFileSystem fs = getFileSystem(conf);
|
final AzureBlobFileSystem fs = getFileSystem(conf);
|
||||||
FSDataOutputStream out = fs.create(TEST_FILE_PATH);
|
FSDataOutputStream out = fs.create(TEST_FILE_PATH);
|
||||||
AbfsOutputStream stream = (AbfsOutputStream) out.getWrappedStream();
|
AbfsOutputStream stream = (AbfsOutputStream) out.getWrappedStream();
|
||||||
|
|
||||||
|
if (stream.isAppendBlobStream()) {
|
||||||
|
maxConcurrentRequests = 1;
|
||||||
|
}
|
||||||
|
|
||||||
Assertions.assertThat(stream.getMaxConcurrentRequestCount())
|
Assertions.assertThat(stream.getMaxConcurrentRequestCount())
|
||||||
.describedAs("maxConcurrentRequests should be " + maxConcurrentRequests)
|
.describedAs("maxConcurrentRequests should be " + maxConcurrentRequests)
|
||||||
.isEqualTo(maxConcurrentRequests);
|
.isEqualTo(maxConcurrentRequests);
|
||||||
|
|
|
@ -20,7 +20,6 @@ package org.apache.hadoop.fs.azurebfs.services;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.HashSet;
|
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
|
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
@ -28,19 +27,22 @@ import org.junit.Test;
|
||||||
import org.mockito.ArgumentCaptor;
|
import org.mockito.ArgumentCaptor;
|
||||||
|
|
||||||
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
|
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
|
||||||
|
import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
|
||||||
|
import static org.mockito.ArgumentMatchers.eq;
|
||||||
|
import static org.mockito.ArgumentMatchers.refEq;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
import static org.mockito.Mockito.times;
|
import static org.mockito.Mockito.times;
|
||||||
import static org.mockito.Mockito.verify;
|
import static org.mockito.Mockito.verify;
|
||||||
import static org.mockito.Mockito.anyInt;
|
|
||||||
import static org.mockito.Mockito.anyBoolean;
|
import static org.mockito.Mockito.anyBoolean;
|
||||||
import static org.mockito.Mockito.any;
|
import static org.mockito.Mockito.any;
|
||||||
import static org.mockito.Mockito.anyString;
|
import static org.mockito.Mockito.anyString;
|
||||||
import static org.mockito.Mockito.anyLong;
|
import static org.mockito.Mockito.anyLong;
|
||||||
|
|
||||||
import static org.assertj.core.api.Assertions.assertThat;
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
|
import static org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters.Mode.APPEND_MODE;
|
||||||
|
|
||||||
public final class TestAbfsOutputStream {
|
public final class TestAbfsOutputStream {
|
||||||
|
|
||||||
|
@ -83,22 +85,15 @@ public final class TestAbfsOutputStream {
|
||||||
abfsConf = new AbfsConfiguration(conf, accountName1);
|
abfsConf = new AbfsConfiguration(conf, accountName1);
|
||||||
AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf);
|
AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf);
|
||||||
when(client.getAbfsPerfTracker()).thenReturn(tracker);
|
when(client.getAbfsPerfTracker()).thenReturn(tracker);
|
||||||
when(client.append(anyString(), anyLong(), any(byte[].class), anyInt(), anyInt(), any(), anyBoolean())).thenReturn(op);
|
when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any())).thenReturn(op);
|
||||||
when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any())).thenReturn(op);
|
when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any())).thenReturn(op);
|
||||||
|
|
||||||
AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0, populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false));
|
AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0,
|
||||||
|
populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false));
|
||||||
final byte[] b = new byte[WRITE_SIZE];
|
final byte[] b = new byte[WRITE_SIZE];
|
||||||
new Random().nextBytes(b);
|
new Random().nextBytes(b);
|
||||||
out.write(b);
|
out.write(b);
|
||||||
out.hsync();
|
out.hsync();
|
||||||
ArgumentCaptor<String> acString = ArgumentCaptor.forClass(String.class);
|
|
||||||
ArgumentCaptor<Long> acLong = ArgumentCaptor.forClass(Long.class);
|
|
||||||
ArgumentCaptor<Integer> acBufferOffset = ArgumentCaptor.forClass(Integer.class);
|
|
||||||
ArgumentCaptor<Integer> acBufferLength = ArgumentCaptor.forClass(Integer.class);
|
|
||||||
ArgumentCaptor<byte[]> acByteArray = ArgumentCaptor.forClass(byte[].class);
|
|
||||||
ArgumentCaptor<Boolean> acAppendBlobAppend = ArgumentCaptor.forClass(Boolean.class);
|
|
||||||
ArgumentCaptor<String> acSASToken = ArgumentCaptor.forClass(String.class);
|
|
||||||
|
|
||||||
|
|
||||||
final byte[] b1 = new byte[2*WRITE_SIZE];
|
final byte[] b1 = new byte[2*WRITE_SIZE];
|
||||||
new Random().nextBytes(b1);
|
new Random().nextBytes(b1);
|
||||||
|
@ -108,13 +103,18 @@ public final class TestAbfsOutputStream {
|
||||||
|
|
||||||
out.hsync();
|
out.hsync();
|
||||||
|
|
||||||
verify(client, times(2)).append(acString.capture(), acLong.capture(), acByteArray.capture(), acBufferOffset.capture(), acBufferLength.capture(),
|
AppendRequestParameters firstReqParameters = new AppendRequestParameters(
|
||||||
acSASToken.capture(), acAppendBlobAppend.capture());
|
0, 0, WRITE_SIZE, APPEND_MODE, false);
|
||||||
assertThat(Arrays.asList(PATH, PATH)).describedAs("Path of the requests").isEqualTo(acString.getAllValues());
|
AppendRequestParameters secondReqParameters = new AppendRequestParameters(
|
||||||
assertThat(Arrays.asList(Long.valueOf(0), Long.valueOf(WRITE_SIZE))).describedAs("Write Position").isEqualTo(acLong.getAllValues());
|
WRITE_SIZE, 0, 2 * WRITE_SIZE, APPEND_MODE, false);
|
||||||
assertThat(Arrays.asList(0, 0)).describedAs("Buffer Offset").isEqualTo(acBufferOffset.getAllValues());
|
|
||||||
assertThat(Arrays.asList(WRITE_SIZE, 2*WRITE_SIZE)).describedAs("Buffer length").isEqualTo(acBufferLength.getAllValues());
|
|
||||||
|
|
||||||
|
verify(client, times(1)).append(
|
||||||
|
eq(PATH), any(byte[].class), refEq(firstReqParameters), any());
|
||||||
|
verify(client, times(1)).append(
|
||||||
|
eq(PATH), any(byte[].class), refEq(secondReqParameters), any());
|
||||||
|
// confirm there were only 2 invocations in all
|
||||||
|
verify(client, times(2)).append(
|
||||||
|
eq(PATH), any(byte[].class), any(), any());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -132,10 +132,11 @@ public final class TestAbfsOutputStream {
|
||||||
AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf);
|
AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf);
|
||||||
|
|
||||||
when(client.getAbfsPerfTracker()).thenReturn(tracker);
|
when(client.getAbfsPerfTracker()).thenReturn(tracker);
|
||||||
when(client.append(anyString(), anyLong(), any(byte[].class), anyInt(), anyInt(), any(), anyBoolean())).thenReturn(op);
|
when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any())).thenReturn(op);
|
||||||
when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any())).thenReturn(op);
|
when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any())).thenReturn(op);
|
||||||
|
|
||||||
AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0, populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false));
|
AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0,
|
||||||
|
populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false));
|
||||||
final byte[] b = new byte[WRITE_SIZE];
|
final byte[] b = new byte[WRITE_SIZE];
|
||||||
new Random().nextBytes(b);
|
new Random().nextBytes(b);
|
||||||
|
|
||||||
|
@ -144,33 +145,29 @@ public final class TestAbfsOutputStream {
|
||||||
}
|
}
|
||||||
out.close();
|
out.close();
|
||||||
|
|
||||||
ArgumentCaptor<String> acString = ArgumentCaptor.forClass(String.class);
|
AppendRequestParameters firstReqParameters = new AppendRequestParameters(
|
||||||
ArgumentCaptor<Long> acLong = ArgumentCaptor.forClass(Long.class);
|
0, 0, BUFFER_SIZE, APPEND_MODE, false);
|
||||||
ArgumentCaptor<Integer> acBufferOffset = ArgumentCaptor.forClass(Integer.class);
|
AppendRequestParameters secondReqParameters = new AppendRequestParameters(
|
||||||
ArgumentCaptor<Integer> acBufferLength = ArgumentCaptor.forClass(Integer.class);
|
BUFFER_SIZE, 0, 5*WRITE_SIZE-BUFFER_SIZE, APPEND_MODE, false);
|
||||||
ArgumentCaptor<byte[]> acByteArray = ArgumentCaptor.forClass(byte[].class);
|
|
||||||
ArgumentCaptor<Boolean> acAppendBlobAppend = ArgumentCaptor.forClass(Boolean.class);
|
|
||||||
ArgumentCaptor<String> acSASToken = ArgumentCaptor.forClass(String.class);
|
|
||||||
|
|
||||||
verify(client, times(2)).append(acString.capture(), acLong.capture(), acByteArray.capture(), acBufferOffset.capture(), acBufferLength.capture(),
|
verify(client, times(1)).append(
|
||||||
acSASToken.capture(), acAppendBlobAppend.capture());
|
eq(PATH), any(byte[].class), refEq(firstReqParameters), any());
|
||||||
assertThat(Arrays.asList(PATH, PATH)).describedAs("Path").isEqualTo(acString.getAllValues());
|
verify(client, times(1)).append(
|
||||||
assertThat(new HashSet<Long>(Arrays.asList(Long.valueOf(0), Long.valueOf(BUFFER_SIZE)))).describedAs("Position").isEqualTo(new HashSet<Long>(
|
eq(PATH), any(byte[].class), refEq(secondReqParameters), any());
|
||||||
acLong.getAllValues()));
|
// confirm there were only 2 invocations in all
|
||||||
assertThat(Arrays.asList(0, 0)).describedAs("Buffer Offset").isEqualTo(acBufferOffset.getAllValues());
|
verify(client, times(2)).append(
|
||||||
assertThat(new HashSet<Integer>(Arrays.asList(BUFFER_SIZE, 5*WRITE_SIZE-BUFFER_SIZE))).describedAs("Buffer Length").isEqualTo(new HashSet<Integer>(
|
eq(PATH), any(byte[].class), any(), any());
|
||||||
acBufferLength.getAllValues()));
|
|
||||||
|
|
||||||
ArgumentCaptor<String> acFlushString = ArgumentCaptor.forClass(String.class);
|
ArgumentCaptor<String> acFlushPath = ArgumentCaptor.forClass(String.class);
|
||||||
ArgumentCaptor<Long> acFlushLong = ArgumentCaptor.forClass(Long.class);
|
ArgumentCaptor<Long> acFlushPosition = ArgumentCaptor.forClass(Long.class);
|
||||||
ArgumentCaptor<Boolean> acFlushRetainUnCommittedData = ArgumentCaptor.forClass(Boolean.class);
|
ArgumentCaptor<Boolean> acFlushRetainUnCommittedData = ArgumentCaptor.forClass(Boolean.class);
|
||||||
ArgumentCaptor<Boolean> acFlushClose = ArgumentCaptor.forClass(Boolean.class);
|
ArgumentCaptor<Boolean> acFlushClose = ArgumentCaptor.forClass(Boolean.class);
|
||||||
ArgumentCaptor<String> acFlushSASToken = ArgumentCaptor.forClass(String.class);
|
ArgumentCaptor<String> acFlushSASToken = ArgumentCaptor.forClass(String.class);
|
||||||
|
|
||||||
verify(client, times(1)).flush(acFlushString.capture(), acFlushLong.capture(), acFlushRetainUnCommittedData.capture(), acFlushClose.capture(),
|
verify(client, times(1)).flush(acFlushPath.capture(), acFlushPosition.capture(), acFlushRetainUnCommittedData.capture(), acFlushClose.capture(),
|
||||||
acFlushSASToken.capture());
|
acFlushSASToken.capture());
|
||||||
assertThat(Arrays.asList(PATH)).describedAs("path").isEqualTo(acFlushString.getAllValues());
|
assertThat(Arrays.asList(PATH)).describedAs("path").isEqualTo(acFlushPath.getAllValues());
|
||||||
assertThat(Arrays.asList(Long.valueOf(5*WRITE_SIZE))).describedAs("position").isEqualTo(acFlushLong.getAllValues());
|
assertThat(Arrays.asList(Long.valueOf(5*WRITE_SIZE))).describedAs("position").isEqualTo(acFlushPosition.getAllValues());
|
||||||
assertThat(Arrays.asList(false)).describedAs("RetainUnCommittedData flag").isEqualTo(acFlushRetainUnCommittedData.getAllValues());
|
assertThat(Arrays.asList(false)).describedAs("RetainUnCommittedData flag").isEqualTo(acFlushRetainUnCommittedData.getAllValues());
|
||||||
assertThat(Arrays.asList(true)).describedAs("Close flag").isEqualTo(acFlushClose.getAllValues());
|
assertThat(Arrays.asList(true)).describedAs("Close flag").isEqualTo(acFlushClose.getAllValues());
|
||||||
}
|
}
|
||||||
|
@ -191,12 +188,13 @@ public final class TestAbfsOutputStream {
|
||||||
AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf);
|
AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf);
|
||||||
|
|
||||||
when(client.getAbfsPerfTracker()).thenReturn(tracker);
|
when(client.getAbfsPerfTracker()).thenReturn(tracker);
|
||||||
when(client.append(anyString(), anyLong(), any(byte[].class), anyInt(), anyInt(), any(), anyBoolean())).thenReturn(op);
|
when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any())).thenReturn(op);
|
||||||
when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any())).thenReturn(op);
|
when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any())).thenReturn(op);
|
||||||
when(op.getSasToken()).thenReturn("testToken");
|
when(op.getSasToken()).thenReturn("testToken");
|
||||||
when(op.getResult()).thenReturn(httpOp);
|
when(op.getResult()).thenReturn(httpOp);
|
||||||
|
|
||||||
AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0, populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false));
|
AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0,
|
||||||
|
populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false));
|
||||||
final byte[] b = new byte[BUFFER_SIZE];
|
final byte[] b = new byte[BUFFER_SIZE];
|
||||||
new Random().nextBytes(b);
|
new Random().nextBytes(b);
|
||||||
|
|
||||||
|
@ -205,35 +203,31 @@ public final class TestAbfsOutputStream {
|
||||||
}
|
}
|
||||||
out.close();
|
out.close();
|
||||||
|
|
||||||
ArgumentCaptor<String> acString = ArgumentCaptor.forClass(String.class);
|
AppendRequestParameters firstReqParameters = new AppendRequestParameters(
|
||||||
ArgumentCaptor<Long> acLong = ArgumentCaptor.forClass(Long.class);
|
0, 0, BUFFER_SIZE, APPEND_MODE, false);
|
||||||
ArgumentCaptor<Integer> acBufferOffset = ArgumentCaptor.forClass(Integer.class);
|
AppendRequestParameters secondReqParameters = new AppendRequestParameters(
|
||||||
ArgumentCaptor<Integer> acBufferLength = ArgumentCaptor.forClass(Integer.class);
|
BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false);
|
||||||
ArgumentCaptor<byte[]> acByteArray = ArgumentCaptor.forClass(byte[].class);
|
|
||||||
ArgumentCaptor<Boolean> acAppendBlobAppend = ArgumentCaptor.forClass(Boolean.class);
|
|
||||||
ArgumentCaptor<String> acSASToken = ArgumentCaptor.forClass(String.class);
|
|
||||||
|
|
||||||
verify(client, times(2)).append(acString.capture(), acLong.capture(), acByteArray.capture(), acBufferOffset.capture(), acBufferLength.capture(),
|
verify(client, times(1)).append(
|
||||||
acSASToken.capture(), acAppendBlobAppend.capture());
|
eq(PATH), any(byte[].class), refEq(firstReqParameters), any());
|
||||||
assertThat(Arrays.asList(PATH, PATH)).describedAs("path").isEqualTo(acString.getAllValues());
|
verify(client, times(1)).append(
|
||||||
assertThat(new HashSet<Long>(Arrays.asList(Long.valueOf(0), Long.valueOf(BUFFER_SIZE)))).describedAs("Position").isEqualTo(new HashSet<Long>(
|
eq(PATH), any(byte[].class), refEq(secondReqParameters), any());
|
||||||
acLong.getAllValues()));
|
// confirm there were only 2 invocations in all
|
||||||
assertThat(Arrays.asList(0, 0)).describedAs("Buffer Offset").isEqualTo(acBufferOffset.getAllValues());
|
verify(client, times(2)).append(
|
||||||
assertThat(Arrays.asList(BUFFER_SIZE, BUFFER_SIZE)).describedAs("Buffer Length").isEqualTo(acBufferLength.getAllValues());
|
eq(PATH), any(byte[].class), any(), any());
|
||||||
|
|
||||||
ArgumentCaptor<String> acFlushString = ArgumentCaptor.forClass(String.class);
|
ArgumentCaptor<String> acFlushPath = ArgumentCaptor.forClass(String.class);
|
||||||
ArgumentCaptor<Long> acFlushLong = ArgumentCaptor.forClass(Long.class);
|
ArgumentCaptor<Long> acFlushPosition = ArgumentCaptor.forClass(Long.class);
|
||||||
ArgumentCaptor<Boolean> acFlushRetainUnCommittedData = ArgumentCaptor.forClass(Boolean.class);
|
ArgumentCaptor<Boolean> acFlushRetainUnCommittedData = ArgumentCaptor.forClass(Boolean.class);
|
||||||
ArgumentCaptor<Boolean> acFlushClose = ArgumentCaptor.forClass(Boolean.class);
|
ArgumentCaptor<Boolean> acFlushClose = ArgumentCaptor.forClass(Boolean.class);
|
||||||
ArgumentCaptor<String> acFlushSASToken = ArgumentCaptor.forClass(String.class);
|
ArgumentCaptor<String> acFlushSASToken = ArgumentCaptor.forClass(String.class);
|
||||||
|
|
||||||
verify(client, times(1)).flush(acFlushString.capture(), acFlushLong.capture(), acFlushRetainUnCommittedData.capture(), acFlushClose.capture(),
|
verify(client, times(1)).flush(acFlushPath.capture(), acFlushPosition.capture(), acFlushRetainUnCommittedData.capture(), acFlushClose.capture(),
|
||||||
acFlushSASToken.capture());
|
acFlushSASToken.capture());
|
||||||
assertThat(Arrays.asList(PATH)).describedAs("path").isEqualTo(acFlushString.getAllValues());
|
assertThat(Arrays.asList(PATH)).describedAs("path").isEqualTo(acFlushPath.getAllValues());
|
||||||
assertThat(Arrays.asList(Long.valueOf(2*BUFFER_SIZE))).describedAs("position").isEqualTo(acFlushLong.getAllValues());
|
assertThat(Arrays.asList(Long.valueOf(2*BUFFER_SIZE))).describedAs("position").isEqualTo(acFlushPosition.getAllValues());
|
||||||
assertThat(Arrays.asList(false)).describedAs("RetainUnCommittedData flag").isEqualTo(acFlushRetainUnCommittedData.getAllValues());
|
assertThat(Arrays.asList(false)).describedAs("RetainUnCommittedData flag").isEqualTo(acFlushRetainUnCommittedData.getAllValues());
|
||||||
assertThat(Arrays.asList(true)).describedAs("Close flag").isEqualTo(acFlushClose.getAllValues());
|
assertThat(Arrays.asList(true)).describedAs("Close flag").isEqualTo(acFlushClose.getAllValues());
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -252,12 +246,13 @@ public final class TestAbfsOutputStream {
|
||||||
AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf);
|
AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf);
|
||||||
|
|
||||||
when(client.getAbfsPerfTracker()).thenReturn(tracker);
|
when(client.getAbfsPerfTracker()).thenReturn(tracker);
|
||||||
when(client.append(anyString(), anyLong(), any(byte[].class), anyInt(), anyInt(), any(), anyBoolean())).thenReturn(op);
|
when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any())).thenReturn(op);
|
||||||
when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any())).thenReturn(op);
|
when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any())).thenReturn(op);
|
||||||
when(op.getSasToken()).thenReturn("testToken");
|
when(op.getSasToken()).thenReturn("testToken");
|
||||||
when(op.getResult()).thenReturn(httpOp);
|
when(op.getResult()).thenReturn(httpOp);
|
||||||
|
|
||||||
AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0, populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false));
|
AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0,
|
||||||
|
populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false));
|
||||||
final byte[] b = new byte[BUFFER_SIZE];
|
final byte[] b = new byte[BUFFER_SIZE];
|
||||||
new Random().nextBytes(b);
|
new Random().nextBytes(b);
|
||||||
|
|
||||||
|
@ -266,22 +261,18 @@ public final class TestAbfsOutputStream {
|
||||||
}
|
}
|
||||||
Thread.sleep(1000);
|
Thread.sleep(1000);
|
||||||
|
|
||||||
ArgumentCaptor<String> acString = ArgumentCaptor.forClass(String.class);
|
AppendRequestParameters firstReqParameters = new AppendRequestParameters(
|
||||||
ArgumentCaptor<Long> acLong = ArgumentCaptor.forClass(Long.class);
|
0, 0, BUFFER_SIZE, APPEND_MODE, false);
|
||||||
ArgumentCaptor<Integer> acBufferOffset = ArgumentCaptor.forClass(Integer.class);
|
AppendRequestParameters secondReqParameters = new AppendRequestParameters(
|
||||||
ArgumentCaptor<Integer> acBufferLength = ArgumentCaptor.forClass(Integer.class);
|
BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false);
|
||||||
ArgumentCaptor<byte[]> acByteArray = ArgumentCaptor.forClass(byte[].class);
|
|
||||||
ArgumentCaptor<Boolean> acAppendBlobAppend = ArgumentCaptor.forClass(Boolean.class);
|
|
||||||
ArgumentCaptor<String> acSASToken = ArgumentCaptor.forClass(String.class);
|
|
||||||
|
|
||||||
verify(client, times(2)).append(acString.capture(), acLong.capture(), acByteArray.capture(), acBufferOffset.capture(), acBufferLength.capture(),
|
|
||||||
acSASToken.capture(), acAppendBlobAppend.capture());
|
|
||||||
assertThat(Arrays.asList(PATH, PATH)).describedAs("File Path").isEqualTo(acString.getAllValues());
|
|
||||||
assertThat(new HashSet<Long>(Arrays.asList(Long.valueOf(0), Long.valueOf(BUFFER_SIZE)))).describedAs("Position in file").isEqualTo(
|
|
||||||
new HashSet<Long>(acLong.getAllValues()));
|
|
||||||
assertThat(Arrays.asList(0, 0)).describedAs("buffer offset").isEqualTo(acBufferOffset.getAllValues());
|
|
||||||
assertThat(Arrays.asList(BUFFER_SIZE, BUFFER_SIZE)).describedAs("buffer length").isEqualTo(acBufferLength.getAllValues());
|
|
||||||
|
|
||||||
|
verify(client, times(1)).append(
|
||||||
|
eq(PATH), any(byte[].class), refEq(firstReqParameters), any());
|
||||||
|
verify(client, times(1)).append(
|
||||||
|
eq(PATH), any(byte[].class), refEq(secondReqParameters), any());
|
||||||
|
// confirm there were only 2 invocations in all
|
||||||
|
verify(client, times(2)).append(
|
||||||
|
eq(PATH), any(byte[].class), any(), any());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -299,10 +290,11 @@ public final class TestAbfsOutputStream {
|
||||||
AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf);
|
AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf);
|
||||||
|
|
||||||
when(client.getAbfsPerfTracker()).thenReturn(tracker);
|
when(client.getAbfsPerfTracker()).thenReturn(tracker);
|
||||||
when(client.append(anyString(), anyLong(), any(byte[].class), anyInt(), anyInt(), any(), anyBoolean())).thenReturn(op);
|
when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any())).thenReturn(op);
|
||||||
when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any())).thenReturn(op);
|
when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any())).thenReturn(op);
|
||||||
|
|
||||||
AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0, populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, true));
|
AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0,
|
||||||
|
populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, true));
|
||||||
final byte[] b = new byte[BUFFER_SIZE];
|
final byte[] b = new byte[BUFFER_SIZE];
|
||||||
new Random().nextBytes(b);
|
new Random().nextBytes(b);
|
||||||
|
|
||||||
|
@ -311,22 +303,18 @@ public final class TestAbfsOutputStream {
|
||||||
}
|
}
|
||||||
Thread.sleep(1000);
|
Thread.sleep(1000);
|
||||||
|
|
||||||
ArgumentCaptor<String> acString = ArgumentCaptor.forClass(String.class);
|
AppendRequestParameters firstReqParameters = new AppendRequestParameters(
|
||||||
ArgumentCaptor<Long> acLong = ArgumentCaptor.forClass(Long.class);
|
0, 0, BUFFER_SIZE, APPEND_MODE, true);
|
||||||
ArgumentCaptor<Integer> acBufferOffset = ArgumentCaptor.forClass(Integer.class);
|
AppendRequestParameters secondReqParameters = new AppendRequestParameters(
|
||||||
ArgumentCaptor<Integer> acBufferLength = ArgumentCaptor.forClass(Integer.class);
|
BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, true);
|
||||||
ArgumentCaptor<byte[]> acByteArray = ArgumentCaptor.forClass(byte[].class);
|
|
||||||
ArgumentCaptor<Boolean> acAppendBlobAppend = ArgumentCaptor.forClass(Boolean.class);
|
|
||||||
ArgumentCaptor<String> acSASToken = ArgumentCaptor.forClass(String.class);
|
|
||||||
|
|
||||||
verify(client, times(2)).append(acString.capture(), acLong.capture(), acByteArray.capture(), acBufferOffset.capture(), acBufferLength.capture(),
|
|
||||||
acSASToken.capture(), acAppendBlobAppend.capture());
|
|
||||||
assertThat(Arrays.asList(PATH, PATH)).describedAs("File Path").isEqualTo(acString.getAllValues());
|
|
||||||
assertThat(Arrays.asList(Long.valueOf(0), Long.valueOf(BUFFER_SIZE))).describedAs("File Position").isEqualTo(acLong.getAllValues());
|
|
||||||
assertThat(Arrays.asList(0, 0)).describedAs("Buffer Offset").isEqualTo(acBufferOffset.getAllValues());
|
|
||||||
assertThat(Arrays.asList(BUFFER_SIZE, BUFFER_SIZE)).describedAs("Buffer Length").isEqualTo(acBufferLength.getAllValues());
|
|
||||||
assertThat(Arrays.asList(true, true)).describedAs("is AppendBlob Append").isEqualTo(acAppendBlobAppend.getAllValues());
|
|
||||||
|
|
||||||
|
verify(client, times(1)).append(
|
||||||
|
eq(PATH), any(byte[].class), refEq(firstReqParameters), any());
|
||||||
|
verify(client, times(1)).append(
|
||||||
|
eq(PATH), any(byte[].class), refEq(secondReqParameters), any());
|
||||||
|
// confirm there were only 2 invocations in all
|
||||||
|
verify(client, times(2)).append(
|
||||||
|
eq(PATH), any(byte[].class), any(), any());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -337,6 +325,7 @@ public final class TestAbfsOutputStream {
|
||||||
|
|
||||||
AbfsClient client = mock(AbfsClient.class);
|
AbfsClient client = mock(AbfsClient.class);
|
||||||
AbfsRestOperation op = mock(AbfsRestOperation.class);
|
AbfsRestOperation op = mock(AbfsRestOperation.class);
|
||||||
|
when(op.getSasToken()).thenReturn("");
|
||||||
AbfsConfiguration abfsConf;
|
AbfsConfiguration abfsConf;
|
||||||
final Configuration conf = new Configuration();
|
final Configuration conf = new Configuration();
|
||||||
conf.set(accountKey1, accountValue1);
|
conf.set(accountKey1, accountValue1);
|
||||||
|
@ -344,10 +333,11 @@ public final class TestAbfsOutputStream {
|
||||||
AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf);
|
AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf);
|
||||||
|
|
||||||
when(client.getAbfsPerfTracker()).thenReturn(tracker);
|
when(client.getAbfsPerfTracker()).thenReturn(tracker);
|
||||||
when(client.append(anyString(), anyLong(), any(byte[].class), anyInt(), anyInt(), any(), anyBoolean())).thenReturn(op);
|
when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any())).thenReturn(op);
|
||||||
when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any())).thenReturn(op);
|
when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any())).thenReturn(op);
|
||||||
|
|
||||||
AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0, populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false));
|
AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0,
|
||||||
|
populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false));
|
||||||
final byte[] b = new byte[BUFFER_SIZE];
|
final byte[] b = new byte[BUFFER_SIZE];
|
||||||
new Random().nextBytes(b);
|
new Random().nextBytes(b);
|
||||||
|
|
||||||
|
@ -356,35 +346,31 @@ public final class TestAbfsOutputStream {
|
||||||
}
|
}
|
||||||
out.hflush();
|
out.hflush();
|
||||||
|
|
||||||
ArgumentCaptor<String> acString = ArgumentCaptor.forClass(String.class);
|
AppendRequestParameters firstReqParameters = new AppendRequestParameters(
|
||||||
ArgumentCaptor<Long> acLong = ArgumentCaptor.forClass(Long.class);
|
0, 0, BUFFER_SIZE, APPEND_MODE, false);
|
||||||
ArgumentCaptor<Integer> acBufferOffset = ArgumentCaptor.forClass(Integer.class);
|
AppendRequestParameters secondReqParameters = new AppendRequestParameters(
|
||||||
ArgumentCaptor<Integer> acBufferLength = ArgumentCaptor.forClass(Integer.class);
|
BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false);
|
||||||
ArgumentCaptor<byte[]> acByteArray = ArgumentCaptor.forClass(byte[].class);
|
|
||||||
ArgumentCaptor<Boolean> acAppendBlobAppend = ArgumentCaptor.forClass(Boolean.class);
|
|
||||||
ArgumentCaptor<String> acSASToken = ArgumentCaptor.forClass(String.class);
|
|
||||||
|
|
||||||
verify(client, times(2)).append(acString.capture(), acLong.capture(), acByteArray.capture(), acBufferOffset.capture(), acBufferLength.capture(),
|
verify(client, times(1)).append(
|
||||||
acSASToken.capture(), acAppendBlobAppend.capture());
|
eq(PATH), any(byte[].class), refEq(firstReqParameters), any());
|
||||||
assertThat(Arrays.asList(PATH, PATH)).describedAs("File Path").isEqualTo(acString.getAllValues());
|
verify(client, times(1)).append(
|
||||||
assertThat(new HashSet<Long>(Arrays.asList(Long.valueOf(0), Long.valueOf(BUFFER_SIZE)))).describedAs("File Position").isEqualTo(
|
eq(PATH), any(byte[].class), refEq(secondReqParameters), any());
|
||||||
new HashSet<Long>(acLong.getAllValues()));
|
// confirm there were only 2 invocations in all
|
||||||
assertThat(Arrays.asList(0, 0)).describedAs("Buffer Offset").isEqualTo(acBufferOffset.getAllValues());
|
verify(client, times(2)).append(
|
||||||
assertThat(Arrays.asList(BUFFER_SIZE, BUFFER_SIZE)).describedAs("Buffer Length").isEqualTo(acBufferLength.getAllValues());
|
eq(PATH), any(byte[].class), any(), any());
|
||||||
|
|
||||||
ArgumentCaptor<String> acFlushString = ArgumentCaptor.forClass(String.class);
|
ArgumentCaptor<String> acFlushPath = ArgumentCaptor.forClass(String.class);
|
||||||
ArgumentCaptor<Long> acFlushLong = ArgumentCaptor.forClass(Long.class);
|
ArgumentCaptor<Long> acFlushPosition = ArgumentCaptor.forClass(Long.class);
|
||||||
ArgumentCaptor<Boolean> acFlushRetainUnCommittedData = ArgumentCaptor.forClass(Boolean.class);
|
ArgumentCaptor<Boolean> acFlushRetainUnCommittedData = ArgumentCaptor.forClass(Boolean.class);
|
||||||
ArgumentCaptor<Boolean> acFlushClose = ArgumentCaptor.forClass(Boolean.class);
|
ArgumentCaptor<Boolean> acFlushClose = ArgumentCaptor.forClass(Boolean.class);
|
||||||
ArgumentCaptor<String> acFlushSASToken = ArgumentCaptor.forClass(String.class);
|
ArgumentCaptor<String> acFlushSASToken = ArgumentCaptor.forClass(String.class);
|
||||||
|
|
||||||
verify(client, times(1)).flush(acFlushString.capture(), acFlushLong.capture(), acFlushRetainUnCommittedData.capture(), acFlushClose.capture(),
|
verify(client, times(1)).flush(acFlushPath.capture(), acFlushPosition.capture(), acFlushRetainUnCommittedData.capture(), acFlushClose.capture(),
|
||||||
acFlushSASToken.capture());
|
acFlushSASToken.capture());
|
||||||
assertThat(Arrays.asList(PATH)).describedAs("path").isEqualTo(acFlushString.getAllValues());
|
assertThat(Arrays.asList(PATH)).describedAs("path").isEqualTo(acFlushPath.getAllValues());
|
||||||
assertThat(Arrays.asList(Long.valueOf(2*BUFFER_SIZE))).describedAs("position").isEqualTo(acFlushLong.getAllValues());
|
assertThat(Arrays.asList(Long.valueOf(2*BUFFER_SIZE))).describedAs("position").isEqualTo(acFlushPosition.getAllValues());
|
||||||
assertThat(Arrays.asList(false)).describedAs("RetainUnCommittedData flag").isEqualTo(acFlushRetainUnCommittedData.getAllValues());
|
assertThat(Arrays.asList(false)).describedAs("RetainUnCommittedData flag").isEqualTo(acFlushRetainUnCommittedData.getAllValues());
|
||||||
assertThat(Arrays.asList(false)).describedAs("Close flag").isEqualTo(acFlushClose.getAllValues());
|
assertThat(Arrays.asList(false)).describedAs("Close flag").isEqualTo(acFlushClose.getAllValues());
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -401,10 +387,11 @@ public final class TestAbfsOutputStream {
|
||||||
abfsConf = new AbfsConfiguration(conf, accountName1);
|
abfsConf = new AbfsConfiguration(conf, accountName1);
|
||||||
AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf);
|
AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf);
|
||||||
when(client.getAbfsPerfTracker()).thenReturn(tracker);
|
when(client.getAbfsPerfTracker()).thenReturn(tracker);
|
||||||
when(client.append(anyString(), anyLong(), any(byte[].class), anyInt(), anyInt(), any(), anyBoolean())).thenReturn(op);
|
when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any())).thenReturn(op);
|
||||||
when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any())).thenReturn(op);
|
when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any())).thenReturn(op);
|
||||||
|
|
||||||
AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0, populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false));
|
AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0,
|
||||||
|
populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false));
|
||||||
final byte[] b = new byte[BUFFER_SIZE];
|
final byte[] b = new byte[BUFFER_SIZE];
|
||||||
new Random().nextBytes(b);
|
new Random().nextBytes(b);
|
||||||
|
|
||||||
|
@ -415,21 +402,17 @@ public final class TestAbfsOutputStream {
|
||||||
out.flush();
|
out.flush();
|
||||||
Thread.sleep(1000);
|
Thread.sleep(1000);
|
||||||
|
|
||||||
ArgumentCaptor<String> acString = ArgumentCaptor.forClass(String.class);
|
AppendRequestParameters firstReqParameters = new AppendRequestParameters(
|
||||||
ArgumentCaptor<Long> acLong = ArgumentCaptor.forClass(Long.class);
|
0, 0, BUFFER_SIZE, APPEND_MODE, false);
|
||||||
ArgumentCaptor<Integer> acBufferOffset = ArgumentCaptor.forClass(Integer.class);
|
AppendRequestParameters secondReqParameters = new AppendRequestParameters(
|
||||||
ArgumentCaptor<Integer> acBufferLength = ArgumentCaptor.forClass(Integer.class);
|
BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false);
|
||||||
ArgumentCaptor<byte[]> acByteArray = ArgumentCaptor.forClass(byte[].class);
|
|
||||||
ArgumentCaptor<Boolean> acAppendBlobAppend = ArgumentCaptor.forClass(Boolean.class);
|
|
||||||
ArgumentCaptor<String> acSASToken = ArgumentCaptor.forClass(String.class);
|
|
||||||
|
|
||||||
verify(client, times(2)).append(acString.capture(), acLong.capture(), acByteArray.capture(), acBufferOffset.capture(), acBufferLength.capture(),
|
|
||||||
acSASToken.capture(), acAppendBlobAppend.capture());
|
|
||||||
assertThat(Arrays.asList(PATH, PATH)).describedAs("path").isEqualTo(acString.getAllValues());
|
|
||||||
assertThat(new HashSet<Long>(Arrays.asList(Long.valueOf(0), Long.valueOf(BUFFER_SIZE)))).describedAs("Position").isEqualTo(
|
|
||||||
new HashSet<Long>(acLong.getAllValues()));
|
|
||||||
assertThat(Arrays.asList(0, 0)).describedAs("Buffer Offset").isEqualTo(acBufferOffset.getAllValues());
|
|
||||||
assertThat(Arrays.asList(BUFFER_SIZE, BUFFER_SIZE)).describedAs("Buffer Length").isEqualTo(acBufferLength.getAllValues());
|
|
||||||
|
|
||||||
|
verify(client, times(1)).append(
|
||||||
|
eq(PATH), any(byte[].class), refEq(firstReqParameters), any());
|
||||||
|
verify(client, times(1)).append(
|
||||||
|
eq(PATH), any(byte[].class), refEq(secondReqParameters), any());
|
||||||
|
// confirm there were only 2 invocations in all
|
||||||
|
verify(client, times(2)).append(
|
||||||
|
eq(PATH), any(byte[].class), any(), any());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue