HADOOP-18146: ABFS: Added changes for expect hundred continue header (#4039)
This change lets the client react pre-emptively to server load without getting to 503 and the exponential backoff which follows. This stops performance suffering so much as capacity limits are approached for an account. Contributed by Anmol Asranii
This commit is contained in:
parent
ee01c64c6c
commit
762d3ddb43
|
@ -48,7 +48,11 @@
|
|||
files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]utils[\\/]Base64.java"/>
|
||||
<suppress checks="ParameterNumber|VisibilityModifier"
|
||||
files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]ITestSmallWriteOptimization.java"/>
|
||||
<suppress checks="VisibilityModifier"
|
||||
files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]services[\\/]ITestAbfsRestOperation.java"/>
|
||||
<!-- allow tests to use _ for ordering. -->
|
||||
<suppress checks="MethodName"
|
||||
files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]commit[\\/]ITestAbfsTerasort.java"/>
|
||||
<suppress checks="ParameterNumber"
|
||||
files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]services[\\/]TestAbfsOutputStream.java"/>
|
||||
</suppressions>
|
||||
|
|
|
@ -117,6 +117,11 @@ public class AbfsConfiguration{
|
|||
DefaultValue = DEFAULT_OPTIMIZE_FOOTER_READ)
|
||||
private boolean optimizeFooterRead;
|
||||
|
||||
@BooleanConfigurationValidatorAnnotation(
|
||||
ConfigurationKey = FS_AZURE_ACCOUNT_IS_EXPECT_HEADER_ENABLED,
|
||||
DefaultValue = DEFAULT_FS_AZURE_ACCOUNT_IS_EXPECT_HEADER_ENABLED)
|
||||
private boolean isExpectHeaderEnabled;
|
||||
|
||||
@BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ACCOUNT_LEVEL_THROTTLING_ENABLED,
|
||||
DefaultValue = DEFAULT_FS_AZURE_ACCOUNT_LEVEL_THROTTLING_ENABLED)
|
||||
private boolean accountThrottlingEnabled;
|
||||
|
@ -706,6 +711,10 @@ public class AbfsConfiguration{
|
|||
return this.azureAppendBlobDirs;
|
||||
}
|
||||
|
||||
public boolean isExpectHeaderEnabled() {
|
||||
return this.isExpectHeaderEnabled;
|
||||
}
|
||||
|
||||
public boolean accountThrottlingEnabled() {
|
||||
return accountThrottlingEnabled;
|
||||
}
|
||||
|
|
|
@ -693,6 +693,7 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
|
|||
}
|
||||
return new AbfsOutputStreamContext(abfsConfiguration.getSasTokenRenewPeriodForStreamsInSeconds())
|
||||
.withWriteBufferSize(bufferSize)
|
||||
.enableExpectHeader(abfsConfiguration.isExpectHeaderEnabled())
|
||||
.enableFlush(abfsConfiguration.isFlushEnabled())
|
||||
.enableSmallWriteOptimization(abfsConfiguration.isSmallWriteOptimizationEnabled())
|
||||
.disableOutputStreamFlush(abfsConfiguration.isOutputStreamFlushDisabled())
|
||||
|
|
|
@ -64,6 +64,11 @@ public final class AbfsHttpConstants {
|
|||
public static final String HTTP_METHOD_PATCH = "PATCH";
|
||||
public static final String HTTP_METHOD_POST = "POST";
|
||||
public static final String HTTP_METHOD_PUT = "PUT";
|
||||
/**
|
||||
* All status codes less than http 100 signify error
|
||||
* and should qualify for retry.
|
||||
*/
|
||||
public static final int HTTP_CONTINUE = 100;
|
||||
|
||||
// Abfs generic constants
|
||||
public static final String SINGLE_WHITE_SPACE = " ";
|
||||
|
@ -103,6 +108,9 @@ public final class AbfsHttpConstants {
|
|||
public static final String DEFAULT_SCOPE = "default:";
|
||||
public static final String PERMISSION_FORMAT = "%04d";
|
||||
public static final String SUPER_USER = "$superuser";
|
||||
// The HTTP 100 Continue informational status response code indicates that everything so far
|
||||
// is OK and that the client should continue with the request or ignore it if it is already finished.
|
||||
public static final String HUNDRED_CONTINUE = "100-continue";
|
||||
|
||||
public static final char CHAR_FORWARD_SLASH = '/';
|
||||
public static final char CHAR_EXCLAMATION_POINT = '!';
|
||||
|
|
|
@ -35,6 +35,11 @@ public final class ConfigurationKeys {
|
|||
* path to determine HNS status.
|
||||
*/
|
||||
public static final String FS_AZURE_ACCOUNT_IS_HNS_ENABLED = "fs.azure.account.hns.enabled";
|
||||
/**
|
||||
* Enable or disable expect hundred continue header.
|
||||
* Value: {@value}.
|
||||
*/
|
||||
public static final String FS_AZURE_ACCOUNT_IS_EXPECT_HEADER_ENABLED = "fs.azure.account.expect.header.enabled";
|
||||
public static final String FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME = "fs.azure.account.key";
|
||||
public static final String FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME_REGX = "fs\\.azure\\.account\\.key\\.(.*)";
|
||||
public static final String FS_AZURE_SECURE_MODE = "fs.azure.secure.mode";
|
||||
|
|
|
@ -32,7 +32,7 @@ import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_ST
|
|||
public final class FileSystemConfigurations {
|
||||
|
||||
public static final String DEFAULT_FS_AZURE_ACCOUNT_IS_HNS_ENABLED = "";
|
||||
|
||||
public static final boolean DEFAULT_FS_AZURE_ACCOUNT_IS_EXPECT_HEADER_ENABLED = true;
|
||||
public static final String USER_HOME_DIRECTORY_PREFIX = "/user";
|
||||
|
||||
private static final int SIXTY_SECONDS = 60 * 1000;
|
||||
|
|
|
@ -70,6 +70,7 @@ public final class HttpHeaderConfigurations {
|
|||
public static final String X_MS_LEASE_ID = "x-ms-lease-id";
|
||||
public static final String X_MS_PROPOSED_LEASE_ID = "x-ms-proposed-lease-id";
|
||||
public static final String X_MS_LEASE_BREAK_PERIOD = "x-ms-lease-break-period";
|
||||
public static final String EXPECT = "Expect";
|
||||
|
||||
private HttpHeaderConfigurations() {}
|
||||
}
|
||||
|
|
|
@ -30,6 +30,9 @@ import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
|
|||
@InterfaceAudience.Public
|
||||
@InterfaceStability.Evolving
|
||||
public class InvalidAbfsRestOperationException extends AbfsRestOperationException {
|
||||
|
||||
private static final String ERROR_MESSAGE = "InvalidAbfsRestOperationException";
|
||||
|
||||
public InvalidAbfsRestOperationException(
|
||||
final Exception innerException) {
|
||||
super(
|
||||
|
@ -37,7 +40,23 @@ public class InvalidAbfsRestOperationException extends AbfsRestOperationExceptio
|
|||
AzureServiceErrorCode.UNKNOWN.getErrorCode(),
|
||||
innerException != null
|
||||
? innerException.toString()
|
||||
: "InvalidAbfsRestOperationException",
|
||||
: ERROR_MESSAGE,
|
||||
innerException);
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds the retry count along with the exception.
|
||||
* @param innerException The inner exception which is originally caught.
|
||||
* @param retryCount The retry count when the exception was thrown.
|
||||
*/
|
||||
public InvalidAbfsRestOperationException(
|
||||
final Exception innerException, int retryCount) {
|
||||
super(
|
||||
AzureServiceErrorCode.UNKNOWN.getStatusCode(),
|
||||
AzureServiceErrorCode.UNKNOWN.getErrorCode(),
|
||||
innerException != null
|
||||
? innerException.toString()
|
||||
: ERROR_MESSAGE + " RetryCount: " + retryCount,
|
||||
innerException);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -34,19 +34,22 @@ public class AppendRequestParameters {
|
|||
private final Mode mode;
|
||||
private final boolean isAppendBlob;
|
||||
private final String leaseId;
|
||||
private boolean isExpectHeaderEnabled;
|
||||
|
||||
public AppendRequestParameters(final long position,
|
||||
final int offset,
|
||||
final int length,
|
||||
final Mode mode,
|
||||
final boolean isAppendBlob,
|
||||
final String leaseId) {
|
||||
final String leaseId,
|
||||
final boolean isExpectHeaderEnabled) {
|
||||
this.position = position;
|
||||
this.offset = offset;
|
||||
this.length = length;
|
||||
this.mode = mode;
|
||||
this.isAppendBlob = isAppendBlob;
|
||||
this.leaseId = leaseId;
|
||||
this.isExpectHeaderEnabled = isExpectHeaderEnabled;
|
||||
}
|
||||
|
||||
public long getPosition() {
|
||||
|
@ -72,4 +75,12 @@ public class AppendRequestParameters {
|
|||
public String getLeaseId() {
|
||||
return this.leaseId;
|
||||
}
|
||||
|
||||
public boolean isExpectHeaderEnabled() {
|
||||
return isExpectHeaderEnabled;
|
||||
}
|
||||
|
||||
public void setExpectHeaderEnabled(boolean expectHeaderEnabled) {
|
||||
isExpectHeaderEnabled = expectHeaderEnabled;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -77,6 +77,7 @@ import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.S
|
|||
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.HTTPS_SCHEME;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.*;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.*;
|
||||
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
|
||||
import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.RENAME_DESTINATION_PARENT_PATH_NOT_FOUND;
|
||||
|
||||
/**
|
||||
|
@ -656,6 +657,9 @@ public class AbfsClient implements Closeable {
|
|||
throws AzureBlobFileSystemException {
|
||||
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
|
||||
addCustomerProvidedKeyHeaders(requestHeaders);
|
||||
if (reqParams.isExpectHeaderEnabled()) {
|
||||
requestHeaders.add(new AbfsHttpHeader(EXPECT, HUNDRED_CONTINUE));
|
||||
}
|
||||
// JDK7 does not support PATCH, so to workaround the issue we will use
|
||||
// PUT and specify the real method in the X-Http-Method-Override header.
|
||||
requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE,
|
||||
|
@ -681,29 +685,7 @@ public class AbfsClient implements Closeable {
|
|||
abfsUriQueryBuilder, cachedSasToken);
|
||||
|
||||
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
|
||||
final AbfsRestOperation op = new AbfsRestOperation(
|
||||
AbfsRestOperationType.Append,
|
||||
this,
|
||||
HTTP_METHOD_PUT,
|
||||
url,
|
||||
requestHeaders,
|
||||
buffer,
|
||||
reqParams.getoffset(),
|
||||
reqParams.getLength(),
|
||||
sasTokenForReuse);
|
||||
try {
|
||||
op.execute(tracingContext);
|
||||
} catch (AzureBlobFileSystemException e) {
|
||||
// If we have no HTTP response, throw the original exception.
|
||||
if (!op.hasResult()) {
|
||||
throw e;
|
||||
}
|
||||
if (reqParams.isAppendBlob()
|
||||
&& appendSuccessCheckOp(op, path,
|
||||
(reqParams.getPosition() + reqParams.getLength()), tracingContext)) {
|
||||
final AbfsRestOperation successOp = new AbfsRestOperation(
|
||||
AbfsRestOperationType.Append,
|
||||
this,
|
||||
final AbfsRestOperation op = getAbfsRestOperationForAppend(AbfsRestOperationType.Append,
|
||||
HTTP_METHOD_PUT,
|
||||
url,
|
||||
requestHeaders,
|
||||
|
@ -711,6 +693,41 @@ public class AbfsClient implements Closeable {
|
|||
reqParams.getoffset(),
|
||||
reqParams.getLength(),
|
||||
sasTokenForReuse);
|
||||
try {
|
||||
op.execute(tracingContext);
|
||||
} catch (AzureBlobFileSystemException e) {
|
||||
/*
|
||||
If the http response code indicates a user error we retry
|
||||
the same append request with expect header being disabled.
|
||||
When "100-continue" header is enabled but a non Http 100 response comes,
|
||||
the response message might not get set correctly by the server.
|
||||
So, this handling is to avoid breaking of backward compatibility
|
||||
if someone has taken dependency on the exception message,
|
||||
which is created using the error string present in the response header.
|
||||
*/
|
||||
int responseStatusCode = ((AbfsRestOperationException) e).getStatusCode();
|
||||
if (checkUserError(responseStatusCode) && reqParams.isExpectHeaderEnabled()) {
|
||||
LOG.debug("User error, retrying without 100 continue enabled for the given path {}", path);
|
||||
reqParams.setExpectHeaderEnabled(false);
|
||||
return this.append(path, buffer, reqParams, cachedSasToken,
|
||||
tracingContext);
|
||||
}
|
||||
// If we have no HTTP response, throw the original exception.
|
||||
if (!op.hasResult()) {
|
||||
throw e;
|
||||
}
|
||||
if (reqParams.isAppendBlob()
|
||||
&& appendSuccessCheckOp(op, path,
|
||||
(reqParams.getPosition() + reqParams.getLength()), tracingContext)) {
|
||||
final AbfsRestOperation successOp = getAbfsRestOperationForAppend(
|
||||
AbfsRestOperationType.Append,
|
||||
HTTP_METHOD_PUT,
|
||||
url,
|
||||
requestHeaders,
|
||||
buffer,
|
||||
reqParams.getoffset(),
|
||||
reqParams.getLength(),
|
||||
sasTokenForReuse);
|
||||
successOp.hardSetResult(HttpURLConnection.HTTP_OK);
|
||||
return successOp;
|
||||
}
|
||||
|
@ -720,6 +737,48 @@ public class AbfsClient implements Closeable {
|
|||
return op;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the rest operation for append.
|
||||
* @param operationType The AbfsRestOperationType.
|
||||
* @param httpMethod specifies the httpMethod.
|
||||
* @param url specifies the url.
|
||||
* @param requestHeaders This includes the list of request headers.
|
||||
* @param buffer The buffer to write into.
|
||||
* @param bufferOffset The buffer offset.
|
||||
* @param bufferLength The buffer Length.
|
||||
* @param sasTokenForReuse The sasToken.
|
||||
* @return AbfsRestOperation op.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
AbfsRestOperation getAbfsRestOperationForAppend(final AbfsRestOperationType operationType,
|
||||
final String httpMethod,
|
||||
final URL url,
|
||||
final List<AbfsHttpHeader> requestHeaders,
|
||||
final byte[] buffer,
|
||||
final int bufferOffset,
|
||||
final int bufferLength,
|
||||
final String sasTokenForReuse) {
|
||||
return new AbfsRestOperation(
|
||||
operationType,
|
||||
this,
|
||||
httpMethod,
|
||||
url,
|
||||
requestHeaders,
|
||||
buffer,
|
||||
bufferOffset,
|
||||
bufferLength, sasTokenForReuse);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns true if the status code lies in the range of user error.
|
||||
* @param responseStatusCode http response status code.
|
||||
* @return True or False.
|
||||
*/
|
||||
private boolean checkUserError(int responseStatusCode) {
|
||||
return (responseStatusCode >= HttpURLConnection.HTTP_BAD_REQUEST
|
||||
&& responseStatusCode < HttpURLConnection.HTTP_INTERNAL_ERROR);
|
||||
}
|
||||
|
||||
// For AppendBlob its possible that the append succeeded in the backend but the request failed.
|
||||
// However a retry would fail with an InvalidQueryParameterValue
|
||||
// (as the current offset would be unacceptable).
|
||||
|
|
|
@ -28,6 +28,8 @@ import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
|
|||
import org.apache.hadoop.fs.azurebfs.AbfsStatistic;
|
||||
import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
|
||||
|
||||
import static java.net.HttpURLConnection.HTTP_UNAVAILABLE;
|
||||
|
||||
/**
|
||||
* Throttles Azure Blob File System read and write operations to achieve maximum
|
||||
* throughput by minimizing errors. The errors occur when the account ingress
|
||||
|
@ -60,7 +62,7 @@ public final class AbfsClientThrottlingIntercept implements AbfsThrottlingInterc
|
|||
|
||||
// Hide default constructor
|
||||
private AbfsClientThrottlingIntercept(AbfsConfiguration abfsConfiguration) {
|
||||
//Account name is kept as empty as same instance is shared across all accounts
|
||||
// Account name is kept as empty as same instance is shared across all accounts.
|
||||
this.accountName = "";
|
||||
this.readThrottler = setAnalyzer("read", abfsConfiguration);
|
||||
this.writeThrottler = setAnalyzer("write", abfsConfiguration);
|
||||
|
@ -114,6 +116,18 @@ public final class AbfsClientThrottlingIntercept implements AbfsThrottlingInterc
|
|||
return singleton;
|
||||
}
|
||||
|
||||
/**
|
||||
* Updates the metrics for the case when response code signifies throttling
|
||||
* but there are some expected bytes to be sent.
|
||||
* @param isThrottledOperation returns true if status code is HTTP_UNAVAILABLE
|
||||
* @param abfsHttpOperation Used for status code and data transferred.
|
||||
* @return true if the operation is throttled and has some bytes to transfer.
|
||||
*/
|
||||
private boolean updateBytesTransferred(boolean isThrottledOperation,
|
||||
AbfsHttpOperation abfsHttpOperation) {
|
||||
return isThrottledOperation && abfsHttpOperation.getExpectedBytesToBeSent() > 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Updates the metrics for successful and failed read and write operations.
|
||||
* @param operationType Only applicable for read and write operations.
|
||||
|
@ -134,9 +148,22 @@ public final class AbfsClientThrottlingIntercept implements AbfsThrottlingInterc
|
|||
boolean isFailedOperation = (status < HttpURLConnection.HTTP_OK
|
||||
|| status >= HttpURLConnection.HTTP_INTERNAL_ERROR);
|
||||
|
||||
// If status code is 503, it is considered as a throttled operation.
|
||||
boolean isThrottledOperation = (status == HTTP_UNAVAILABLE);
|
||||
|
||||
switch (operationType) {
|
||||
case Append:
|
||||
contentLength = abfsHttpOperation.getBytesSent();
|
||||
if (contentLength == 0) {
|
||||
/*
|
||||
Signifies the case where we could not update the bytesSent due to
|
||||
throttling but there were some expectedBytesToBeSent.
|
||||
*/
|
||||
if (updateBytesTransferred(isThrottledOperation, abfsHttpOperation)) {
|
||||
LOG.debug("Updating metrics due to throttling for path {}", abfsHttpOperation.getConnUrl().getPath());
|
||||
contentLength = abfsHttpOperation.getExpectedBytesToBeSent();
|
||||
}
|
||||
}
|
||||
if (contentLength > 0) {
|
||||
writeThrottler.addBytesTransferred(contentLength,
|
||||
isFailedOperation);
|
||||
|
|
|
@ -43,6 +43,9 @@ import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
|
|||
import org.apache.hadoop.fs.azurebfs.contracts.services.AbfsPerfLoggable;
|
||||
import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultSchema;
|
||||
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HUNDRED_CONTINUE;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.EXPECT;
|
||||
|
||||
/**
|
||||
* Represents an HTTP operation.
|
||||
*/
|
||||
|
@ -73,6 +76,7 @@ public class AbfsHttpOperation implements AbfsPerfLoggable {
|
|||
|
||||
// metrics
|
||||
private int bytesSent;
|
||||
private int expectedBytesToBeSent;
|
||||
private long bytesReceived;
|
||||
|
||||
// optional trace enabled metrics
|
||||
|
@ -155,6 +159,10 @@ public class AbfsHttpOperation implements AbfsPerfLoggable {
|
|||
return bytesSent;
|
||||
}
|
||||
|
||||
public int getExpectedBytesToBeSent() {
|
||||
return expectedBytesToBeSent;
|
||||
}
|
||||
|
||||
public long getBytesReceived() {
|
||||
return bytesReceived;
|
||||
}
|
||||
|
@ -282,7 +290,7 @@ public class AbfsHttpOperation implements AbfsPerfLoggable {
|
|||
this.connection.setRequestMethod(method);
|
||||
|
||||
for (AbfsHttpHeader header : requestHeaders) {
|
||||
this.connection.setRequestProperty(header.getName(), header.getValue());
|
||||
setRequestProperty(header.getName(), header.getValue());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -314,13 +322,44 @@ public class AbfsHttpOperation implements AbfsPerfLoggable {
|
|||
if (this.isTraceEnabled) {
|
||||
startTime = System.nanoTime();
|
||||
}
|
||||
try (OutputStream outputStream = this.connection.getOutputStream()) {
|
||||
// update bytes sent before they are sent so we may observe
|
||||
// attempted sends as well as successful sends via the
|
||||
// accompanying statusCode
|
||||
OutputStream outputStream = null;
|
||||
// Updates the expected bytes to be sent based on length.
|
||||
this.expectedBytesToBeSent = length;
|
||||
try {
|
||||
try {
|
||||
/* Without expect header enabled, if getOutputStream() throws
|
||||
an exception, it gets caught by the restOperation. But with
|
||||
expect header enabled we return back without throwing an exception
|
||||
for the correct response code processing.
|
||||
*/
|
||||
outputStream = getConnOutputStream();
|
||||
} catch (IOException e) {
|
||||
/* If getOutputStream fails with an exception and expect header
|
||||
is enabled, we return back without throwing an exception to
|
||||
the caller. The caller is responsible for setting the correct status code.
|
||||
If expect header is not enabled, we throw back the exception.
|
||||
*/
|
||||
String expectHeader = getConnProperty(EXPECT);
|
||||
if (expectHeader != null && expectHeader.equals(HUNDRED_CONTINUE)) {
|
||||
LOG.debug("Getting output stream failed with expect header enabled, returning back ", e);
|
||||
return;
|
||||
} else {
|
||||
LOG.debug("Getting output stream failed without expect header enabled, throwing exception ", e);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
// update bytes sent for successful as well as failed attempts via the
|
||||
// accompanying statusCode.
|
||||
this.bytesSent = length;
|
||||
|
||||
// If this fails with or without expect header enabled,
|
||||
// it throws an IOException.
|
||||
outputStream.write(buffer, offset, length);
|
||||
} finally {
|
||||
// Closing the opened output stream
|
||||
if (outputStream != null) {
|
||||
outputStream.close();
|
||||
}
|
||||
if (this.isTraceEnabled) {
|
||||
this.sendRequestTimeMs = elapsedTimeMs(startTime);
|
||||
}
|
||||
|
@ -344,13 +383,13 @@ public class AbfsHttpOperation implements AbfsPerfLoggable {
|
|||
startTime = System.nanoTime();
|
||||
}
|
||||
|
||||
this.statusCode = this.connection.getResponseCode();
|
||||
this.statusCode = getConnResponseCode();
|
||||
|
||||
if (this.isTraceEnabled) {
|
||||
this.recvResponseTimeMs = elapsedTimeMs(startTime);
|
||||
}
|
||||
|
||||
this.statusDescription = this.connection.getResponseMessage();
|
||||
this.statusDescription = getConnResponseMessage();
|
||||
|
||||
this.requestId = this.connection.getHeaderField(HttpHeaderConfigurations.X_MS_REQUEST_ID);
|
||||
if (this.requestId == null) {
|
||||
|
@ -543,6 +582,58 @@ public class AbfsHttpOperation implements AbfsPerfLoggable {
|
|||
return stream == null ? true : false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the connection request property for a key.
|
||||
* @param key The request property key.
|
||||
* @return request peoperty value.
|
||||
*/
|
||||
String getConnProperty(String key) {
|
||||
return connection.getRequestProperty(key);
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the connection url.
|
||||
* @return url.
|
||||
*/
|
||||
URL getConnUrl() {
|
||||
return connection.getURL();
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the connection request method.
|
||||
* @return request method.
|
||||
*/
|
||||
String getConnRequestMethod() {
|
||||
return connection.getRequestMethod();
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the connection response code.
|
||||
* @return response code.
|
||||
* @throws IOException
|
||||
*/
|
||||
Integer getConnResponseCode() throws IOException {
|
||||
return connection.getResponseCode();
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the connection output stream.
|
||||
* @return output stream.
|
||||
* @throws IOException
|
||||
*/
|
||||
OutputStream getConnOutputStream() throws IOException {
|
||||
return connection.getOutputStream();
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the connection response message.
|
||||
* @return response message.
|
||||
* @throws IOException
|
||||
*/
|
||||
String getConnResponseMessage() throws IOException {
|
||||
return connection.getResponseMessage();
|
||||
}
|
||||
|
||||
public static class AbfsHttpOperationWithFixedResult extends AbfsHttpOperation {
|
||||
/**
|
||||
* Creates an instance to represent fixed results.
|
||||
|
|
|
@ -80,6 +80,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
|
|||
private boolean disableOutputStreamFlush;
|
||||
private boolean enableSmallWriteOptimization;
|
||||
private boolean isAppendBlob;
|
||||
private boolean isExpectHeaderEnabled;
|
||||
private volatile IOException lastError;
|
||||
|
||||
private long lastFlushOffset;
|
||||
|
@ -133,6 +134,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
|
|||
this.position = abfsOutputStreamContext.getPosition();
|
||||
this.closed = false;
|
||||
this.supportFlush = abfsOutputStreamContext.isEnableFlush();
|
||||
this.isExpectHeaderEnabled = abfsOutputStreamContext.isExpectHeaderEnabled();
|
||||
this.disableOutputStreamFlush = abfsOutputStreamContext
|
||||
.isDisableOutputStreamFlush();
|
||||
this.enableSmallWriteOptimization
|
||||
|
@ -327,7 +329,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
|
|||
* leaseId - The AbfsLeaseId for this request.
|
||||
*/
|
||||
AppendRequestParameters reqParams = new AppendRequestParameters(
|
||||
offset, 0, bytesLength, mode, false, leaseId);
|
||||
offset, 0, bytesLength, mode, false, leaseId, isExpectHeaderEnabled);
|
||||
AbfsRestOperation op =
|
||||
client.append(path, blockUploadData.toByteArray(), reqParams,
|
||||
cachedSasToken.get(), new TracingContext(tracingContext));
|
||||
|
@ -573,7 +575,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
|
|||
try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker,
|
||||
"writeCurrentBufferToService", "append")) {
|
||||
AppendRequestParameters reqParams = new AppendRequestParameters(offset, 0,
|
||||
bytesLength, APPEND_MODE, true, leaseId);
|
||||
bytesLength, APPEND_MODE, true, leaseId, isExpectHeaderEnabled);
|
||||
AbfsRestOperation op = client.append(path, uploadData.toByteArray(), reqParams,
|
||||
cachedSasToken.get(), new TracingContext(tracingContext));
|
||||
cachedSasToken.update(op.getSasToken());
|
||||
|
|
|
@ -33,6 +33,8 @@ public class AbfsOutputStreamContext extends AbfsStreamContext {
|
|||
|
||||
private boolean enableFlush;
|
||||
|
||||
private boolean enableExpectHeader;
|
||||
|
||||
private boolean enableSmallWriteOptimization;
|
||||
|
||||
private boolean disableOutputStreamFlush;
|
||||
|
@ -78,6 +80,11 @@ public class AbfsOutputStreamContext extends AbfsStreamContext {
|
|||
return this;
|
||||
}
|
||||
|
||||
public AbfsOutputStreamContext enableExpectHeader(final boolean enableExpectHeader) {
|
||||
this.enableExpectHeader = enableExpectHeader;
|
||||
return this;
|
||||
}
|
||||
|
||||
public AbfsOutputStreamContext enableSmallWriteOptimization(final boolean enableSmallWriteOptimization) {
|
||||
this.enableSmallWriteOptimization = enableSmallWriteOptimization;
|
||||
return this;
|
||||
|
@ -184,6 +191,10 @@ public class AbfsOutputStreamContext extends AbfsStreamContext {
|
|||
return enableFlush;
|
||||
}
|
||||
|
||||
public boolean isExpectHeaderEnabled() {
|
||||
return enableExpectHeader;
|
||||
}
|
||||
|
||||
public boolean isDisableOutputStreamFlush() {
|
||||
return disableOutputStreamFlush;
|
||||
}
|
||||
|
|
|
@ -38,6 +38,8 @@ import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
|
|||
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
|
||||
import org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding;
|
||||
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_CONTINUE;
|
||||
|
||||
/**
|
||||
* The AbfsRestOperation for Rest AbfsClient.
|
||||
*/
|
||||
|
@ -236,11 +238,21 @@ public class AbfsRestOperation {
|
|||
}
|
||||
}
|
||||
|
||||
if (result.getStatusCode() >= HttpURLConnection.HTTP_BAD_REQUEST) {
|
||||
int status = result.getStatusCode();
|
||||
/*
|
||||
If even after exhausting all retries, the http status code has an
|
||||
invalid value it qualifies for InvalidAbfsRestOperationException.
|
||||
All http status code less than 1xx range are considered as invalid
|
||||
status codes.
|
||||
*/
|
||||
if (status < HTTP_CONTINUE) {
|
||||
throw new InvalidAbfsRestOperationException(null, retryCount);
|
||||
}
|
||||
|
||||
if (status >= HttpURLConnection.HTTP_BAD_REQUEST) {
|
||||
throw new AbfsRestOperationException(result.getStatusCode(), result.getStorageErrorCode(),
|
||||
result.getStorageErrorMessage(), null, result);
|
||||
}
|
||||
|
||||
LOG.trace("{} REST operation complete", operationType);
|
||||
}
|
||||
|
||||
|
@ -268,7 +280,7 @@ public class AbfsRestOperation {
|
|||
case Custom:
|
||||
case OAuth:
|
||||
LOG.debug("Authenticating request with OAuth2 access token");
|
||||
httpOperation.getConnection().setRequestProperty(HttpHeaderConfigurations.AUTHORIZATION,
|
||||
httpOperation.setRequestProperty(HttpHeaderConfigurations.AUTHORIZATION,
|
||||
client.getAccessToken());
|
||||
break;
|
||||
case SAS:
|
||||
|
@ -319,7 +331,7 @@ public class AbfsRestOperation {
|
|||
LOG.warn("Unknown host name: {}. Retrying to resolve the host name...",
|
||||
hostname);
|
||||
if (!client.getRetryPolicy().shouldRetry(retryCount, -1)) {
|
||||
throw new InvalidAbfsRestOperationException(ex);
|
||||
throw new InvalidAbfsRestOperationException(ex, retryCount);
|
||||
}
|
||||
return false;
|
||||
} catch (IOException ex) {
|
||||
|
@ -330,12 +342,25 @@ public class AbfsRestOperation {
|
|||
failureReason = RetryReason.getAbbreviation(ex, -1, "");
|
||||
|
||||
if (!client.getRetryPolicy().shouldRetry(retryCount, -1)) {
|
||||
throw new InvalidAbfsRestOperationException(ex);
|
||||
throw new InvalidAbfsRestOperationException(ex, retryCount);
|
||||
}
|
||||
|
||||
return false;
|
||||
} finally {
|
||||
intercept.updateMetrics(operationType, httpOperation);
|
||||
int status = httpOperation.getStatusCode();
|
||||
/*
|
||||
A status less than 300 (2xx range) or greater than or equal
|
||||
to 500 (5xx range) should contribute to throttling metrics being updated.
|
||||
Less than 200 or greater than or equal to 500 show failed operations. 2xx
|
||||
range contributes to successful operations. 3xx range is for redirects
|
||||
and 4xx range is for user errors. These should not be a part of
|
||||
throttling backoff computation.
|
||||
*/
|
||||
boolean updateMetricsResponseCode = (status < HttpURLConnection.HTTP_MULT_CHOICE
|
||||
|| status >= HttpURLConnection.HTTP_INTERNAL_ERROR);
|
||||
if (updateMetricsResponseCode) {
|
||||
intercept.updateMetrics(operationType, httpOperation);
|
||||
}
|
||||
}
|
||||
|
||||
LOG.debug("HttpRequest: {}: {}", operationType, httpOperation);
|
||||
|
|
|
@ -24,6 +24,8 @@ import java.net.HttpURLConnection;
|
|||
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
|
||||
import org.apache.hadoop.classification.VisibleForTesting;
|
||||
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_CONTINUE;
|
||||
|
||||
/**
|
||||
* Retry policy used by AbfsClient.
|
||||
* */
|
||||
|
@ -118,7 +120,9 @@ public class ExponentialRetryPolicy {
|
|||
|
||||
/**
|
||||
* Returns if a request should be retried based on the retry count, current response,
|
||||
* and the current strategy.
|
||||
* and the current strategy. The valid http status code lies in the range of 1xx-5xx.
|
||||
* But an invalid status code might be set due to network or timeout kind of issues.
|
||||
* Such invalid status code also qualify for retry.
|
||||
*
|
||||
* @param retryCount The current retry attempt count.
|
||||
* @param statusCode The status code of the response, or -1 for socket error.
|
||||
|
@ -126,7 +130,7 @@ public class ExponentialRetryPolicy {
|
|||
*/
|
||||
public boolean shouldRetry(final int retryCount, final int statusCode) {
|
||||
return retryCount < this.retryCount
|
||||
&& (statusCode == -1
|
||||
&& (statusCode < HTTP_CONTINUE
|
||||
|| statusCode == HttpURLConnection.HTTP_CLIENT_TIMEOUT
|
||||
|| (statusCode >= HttpURLConnection.HTTP_INTERNAL_ERROR
|
||||
&& statusCode != HttpURLConnection.HTTP_NOT_IMPLEMENTED
|
||||
|
|
|
@ -149,6 +149,10 @@ public class TracingContext {
|
|||
this.opType = operation;
|
||||
}
|
||||
|
||||
public int getRetryCount() {
|
||||
return retryCount;
|
||||
}
|
||||
|
||||
public void setRetryCount(int retryCount) {
|
||||
this.retryCount = retryCount;
|
||||
}
|
||||
|
|
|
@ -767,6 +767,17 @@ Hflush() being the only documented API that can provide persistent data
|
|||
transfer, Flush() also attempting to persist buffered data will lead to
|
||||
performance issues.
|
||||
|
||||
### <a name="100continueconfigoptions"></a> Hundred Continue Options
|
||||
|
||||
`fs.azure.account.expect.header.enabled`: This configuration parameter is used
|
||||
to specify whether you wish to send a expect 100 continue header with each
|
||||
append request or not. It is configured to true by default. This flag configures
|
||||
the client to check with the Azure store before uploading a block of data from
|
||||
an output stream. This allows the client to throttle back gracefully -before
|
||||
actually attempting to upload the block. In experiments this provides
|
||||
significant throughput improvements under heavy load. For more information :
|
||||
- https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Expect
|
||||
|
||||
|
||||
### <a name="accountlevelthrottlingoptions"></a> Account level throttling Options
|
||||
|
||||
|
|
|
@ -42,7 +42,7 @@ import org.apache.hadoop.fs.azurebfs.security.AbfsDelegationTokenManager;
|
|||
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
|
||||
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
|
||||
import org.apache.hadoop.fs.azurebfs.services.AuthType;
|
||||
import org.apache.hadoop.fs.azurebfs.services.TestAbfsClient;
|
||||
import org.apache.hadoop.fs.azurebfs.services.ITestAbfsClient;
|
||||
import org.apache.hadoop.fs.azure.AzureNativeFileSystemStore;
|
||||
import org.apache.hadoop.fs.azure.NativeAzureFileSystem;
|
||||
import org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation;
|
||||
|
@ -254,7 +254,7 @@ public abstract class AbstractAbfsIntegrationTest extends
|
|||
}
|
||||
|
||||
public AccessTokenProvider getAccessTokenProvider(final AzureBlobFileSystem fs) {
|
||||
return TestAbfsClient.getAccessTokenProvider(fs.getAbfsStore().getClient());
|
||||
return ITestAbfsClient.getAccessTokenProvider(fs.getAbfsStore().getClient());
|
||||
}
|
||||
|
||||
public void loadConfiguredFileSystem() throws Exception {
|
||||
|
|
|
@ -43,7 +43,7 @@ import org.apache.hadoop.fs.azurebfs.contracts.exceptions.ConcurrentWriteOperati
|
|||
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
|
||||
import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
|
||||
import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
|
||||
import org.apache.hadoop.fs.azurebfs.services.TestAbfsClient;
|
||||
import org.apache.hadoop.fs.azurebfs.services.ITestAbfsClient;
|
||||
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
|
||||
import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderValidator;
|
||||
|
||||
|
@ -362,7 +362,7 @@ public class ITestAzureBlobFileSystemCreate extends
|
|||
// Get mock AbfsClient with current config
|
||||
AbfsClient
|
||||
mockClient
|
||||
= TestAbfsClient.getMockAbfsClient(
|
||||
= ITestAbfsClient.getMockAbfsClient(
|
||||
fs.getAbfsStore().getClient(),
|
||||
fs.getAbfsStore().getAbfsConfiguration());
|
||||
|
||||
|
|
|
@ -35,7 +35,7 @@ import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationExcep
|
|||
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
|
||||
import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
|
||||
import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
|
||||
import org.apache.hadoop.fs.azurebfs.services.TestAbfsClient;
|
||||
import org.apache.hadoop.fs.azurebfs.services.ITestAbfsClient;
|
||||
import org.apache.hadoop.fs.azurebfs.services.TestAbfsPerfTracker;
|
||||
import org.apache.hadoop.fs.azurebfs.utils.TestMockHelpers;
|
||||
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
|
||||
|
@ -176,7 +176,7 @@ public class ITestAzureBlobFileSystemDelete extends
|
|||
|
||||
final AzureBlobFileSystem fs = getFileSystem();
|
||||
AbfsClient abfsClient = fs.getAbfsStore().getClient();
|
||||
AbfsClient testClient = TestAbfsClient.createTestClientFromCurrentContext(
|
||||
AbfsClient testClient = ITestAbfsClient.createTestClientFromCurrentContext(
|
||||
abfsClient,
|
||||
abfsConfig);
|
||||
|
||||
|
@ -223,7 +223,7 @@ public class ITestAzureBlobFileSystemDelete extends
|
|||
public void testDeleteIdempotencyTriggerHttp404() throws Exception {
|
||||
|
||||
final AzureBlobFileSystem fs = getFileSystem();
|
||||
AbfsClient client = TestAbfsClient.createTestClientFromCurrentContext(
|
||||
AbfsClient client = ITestAbfsClient.createTestClientFromCurrentContext(
|
||||
fs.getAbfsStore().getClient(),
|
||||
this.getConfiguration());
|
||||
|
||||
|
@ -242,7 +242,7 @@ public class ITestAzureBlobFileSystemDelete extends
|
|||
getTestTracingContext(fs, true)));
|
||||
|
||||
// mock idempotency check to mimic retried case
|
||||
AbfsClient mockClient = TestAbfsClient.getMockAbfsClient(
|
||||
AbfsClient mockClient = ITestAbfsClient.getMockAbfsClient(
|
||||
fs.getAbfsStore().getClient(),
|
||||
this.getConfiguration());
|
||||
AzureBlobFileSystemStore mockStore = mock(AzureBlobFileSystemStore.class);
|
||||
|
@ -257,10 +257,10 @@ public class ITestAzureBlobFileSystemDelete extends
|
|||
|
||||
// Case 2: Mimic retried case
|
||||
// Idempotency check on Delete always returns success
|
||||
AbfsRestOperation idempotencyRetOp = TestAbfsClient.getRestOp(
|
||||
AbfsRestOperation idempotencyRetOp = ITestAbfsClient.getRestOp(
|
||||
DeletePath, mockClient, HTTP_METHOD_DELETE,
|
||||
TestAbfsClient.getTestUrl(mockClient, "/NonExistingPath"),
|
||||
TestAbfsClient.getTestRequestHeaders(mockClient));
|
||||
ITestAbfsClient.getTestUrl(mockClient, "/NonExistingPath"),
|
||||
ITestAbfsClient.getTestRequestHeaders(mockClient));
|
||||
idempotencyRetOp.hardSetResult(HTTP_OK);
|
||||
|
||||
doReturn(idempotencyRetOp).when(mockClient).deleteIdempotencyCheckOp(any());
|
||||
|
|
|
@ -203,7 +203,7 @@ public class ITestCustomerProvidedKey extends AbstractAbfsIntegrationTest {
|
|||
// Trying to append with correct CPK headers
|
||||
AppendRequestParameters appendRequestParameters =
|
||||
new AppendRequestParameters(
|
||||
0, 0, 5, Mode.APPEND_MODE, false, null);
|
||||
0, 0, 5, Mode.APPEND_MODE, false, null, true);
|
||||
byte[] buffer = getRandomBytesArray(5);
|
||||
AbfsClient abfsClient = fs.getAbfsClient();
|
||||
AbfsRestOperation abfsRestOperation = abfsClient
|
||||
|
@ -248,7 +248,7 @@ public class ITestCustomerProvidedKey extends AbstractAbfsIntegrationTest {
|
|||
// Trying to append without CPK headers
|
||||
AppendRequestParameters appendRequestParameters =
|
||||
new AppendRequestParameters(
|
||||
0, 0, 5, Mode.APPEND_MODE, false, null);
|
||||
0, 0, 5, Mode.APPEND_MODE, false, null, true);
|
||||
byte[] buffer = getRandomBytesArray(5);
|
||||
AbfsClient abfsClient = fs.getAbfsClient();
|
||||
AbfsRestOperation abfsRestOperation = abfsClient
|
||||
|
|
|
@ -20,20 +20,43 @@ package org.apache.hadoop.fs.azurebfs.services;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.Field;
|
||||
import java.net.HttpURLConnection;
|
||||
import java.net.ProtocolException;
|
||||
import java.net.URL;
|
||||
import java.util.List;
|
||||
import java.util.Random;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import org.assertj.core.api.Assertions;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
|
||||
import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
|
||||
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
|
||||
import org.apache.hadoop.fs.azurebfs.TestAbfsConfigurationFieldsValidation;
|
||||
import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
|
||||
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
|
||||
import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
|
||||
import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;
|
||||
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
|
||||
import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderFormat;
|
||||
import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APPEND_ACTION;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PATCH;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HUNDRED_CONTINUE;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.EXPECT;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_HTTP_METHOD_OVERRIDE;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_ACTION;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_POSITION;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_ABFS_ACCOUNT_NAME;
|
||||
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
@ -59,14 +82,19 @@ import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.TEST
|
|||
* Test useragent of abfs client.
|
||||
*
|
||||
*/
|
||||
public final class TestAbfsClient {
|
||||
public final class ITestAbfsClient extends AbstractAbfsIntegrationTest {
|
||||
|
||||
private static final String ACCOUNT_NAME = "bogusAccountName.dfs.core.windows.net";
|
||||
private static final String FS_AZURE_USER_AGENT_PREFIX = "Partner Service";
|
||||
private static final String TEST_PATH = "/testfile";
|
||||
public static final int REDUCED_RETRY_COUNT = 2;
|
||||
public static final int REDUCED_BACKOFF_INTERVAL = 100;
|
||||
public static final int BUFFER_LENGTH = 5;
|
||||
public static final int BUFFER_OFFSET = 0;
|
||||
|
||||
private final Pattern userAgentStringPattern;
|
||||
|
||||
public TestAbfsClient(){
|
||||
public ITestAbfsClient() throws Exception {
|
||||
StringBuilder regEx = new StringBuilder();
|
||||
regEx.append("^");
|
||||
regEx.append(APN_VERSION);
|
||||
|
@ -124,7 +152,7 @@ public final class TestAbfsClient {
|
|||
}
|
||||
|
||||
private void verifybBasicInfo(String userAgentStr) {
|
||||
assertThat(userAgentStr)
|
||||
Assertions.assertThat(userAgentStr)
|
||||
.describedAs("User-Agent string [" + userAgentStr
|
||||
+ "] should be of the pattern: " + this.userAgentStringPattern.pattern())
|
||||
.matches(this.userAgentStringPattern)
|
||||
|
@ -153,7 +181,7 @@ public final class TestAbfsClient {
|
|||
String userAgentStr = getUserAgentString(abfsConfiguration, false);
|
||||
|
||||
verifybBasicInfo(userAgentStr);
|
||||
assertThat(userAgentStr)
|
||||
Assertions.assertThat(userAgentStr)
|
||||
.describedAs("User-Agent string should contain " + FS_AZURE_USER_AGENT_PREFIX)
|
||||
.contains(FS_AZURE_USER_AGENT_PREFIX);
|
||||
|
||||
|
@ -163,7 +191,7 @@ public final class TestAbfsClient {
|
|||
userAgentStr = getUserAgentString(abfsConfiguration, false);
|
||||
|
||||
verifybBasicInfo(userAgentStr);
|
||||
assertThat(userAgentStr)
|
||||
Assertions.assertThat(userAgentStr)
|
||||
.describedAs("User-Agent string should not contain " + FS_AZURE_USER_AGENT_PREFIX)
|
||||
.doesNotContain(FS_AZURE_USER_AGENT_PREFIX);
|
||||
}
|
||||
|
@ -179,14 +207,14 @@ public final class TestAbfsClient {
|
|||
String userAgentStr = getUserAgentString(abfsConfiguration, true);
|
||||
|
||||
verifybBasicInfo(userAgentStr);
|
||||
assertThat(userAgentStr)
|
||||
Assertions.assertThat(userAgentStr)
|
||||
.describedAs("User-Agent string should contain sslProvider")
|
||||
.contains(DelegatingSSLSocketFactory.getDefaultFactory().getProviderName());
|
||||
|
||||
userAgentStr = getUserAgentString(abfsConfiguration, false);
|
||||
|
||||
verifybBasicInfo(userAgentStr);
|
||||
assertThat(userAgentStr)
|
||||
Assertions.assertThat(userAgentStr)
|
||||
.describedAs("User-Agent string should not contain sslProvider")
|
||||
.doesNotContain(DelegatingSSLSocketFactory.getDefaultFactory().getProviderName());
|
||||
}
|
||||
|
@ -202,7 +230,7 @@ public final class TestAbfsClient {
|
|||
String userAgentStr = getUserAgentString(abfsConfiguration, false);
|
||||
|
||||
verifybBasicInfo(userAgentStr);
|
||||
assertThat(userAgentStr)
|
||||
Assertions.assertThat(userAgentStr)
|
||||
.describedAs("User-Agent string should contain cluster name")
|
||||
.contains(clusterName);
|
||||
|
||||
|
@ -212,7 +240,7 @@ public final class TestAbfsClient {
|
|||
userAgentStr = getUserAgentString(abfsConfiguration, false);
|
||||
|
||||
verifybBasicInfo(userAgentStr);
|
||||
assertThat(userAgentStr)
|
||||
Assertions.assertThat(userAgentStr)
|
||||
.describedAs("User-Agent string should not contain cluster name")
|
||||
.doesNotContain(clusterName)
|
||||
.describedAs("User-Agent string should contain UNKNOWN as cluster name config is absent")
|
||||
|
@ -230,7 +258,7 @@ public final class TestAbfsClient {
|
|||
String userAgentStr = getUserAgentString(abfsConfiguration, false);
|
||||
|
||||
verifybBasicInfo(userAgentStr);
|
||||
assertThat(userAgentStr)
|
||||
Assertions.assertThat(userAgentStr)
|
||||
.describedAs("User-Agent string should contain cluster type")
|
||||
.contains(clusterType);
|
||||
|
||||
|
@ -240,7 +268,7 @@ public final class TestAbfsClient {
|
|||
userAgentStr = getUserAgentString(abfsConfiguration, false);
|
||||
|
||||
verifybBasicInfo(userAgentStr);
|
||||
assertThat(userAgentStr)
|
||||
Assertions.assertThat(userAgentStr)
|
||||
.describedAs("User-Agent string should not contain cluster type")
|
||||
.doesNotContain(clusterType)
|
||||
.describedAs("User-Agent string should contain UNKNOWN as cluster type config is absent")
|
||||
|
@ -311,24 +339,23 @@ public final class TestAbfsClient {
|
|||
AbfsThrottlingInterceptFactory.getInstance(
|
||||
abfsConfig.getAccountName().substring(0,
|
||||
abfsConfig.getAccountName().indexOf(DOT)), abfsConfig));
|
||||
|
||||
// override baseurl
|
||||
client = TestAbfsClient.setAbfsClientField(client, "abfsConfiguration",
|
||||
client = ITestAbfsClient.setAbfsClientField(client, "abfsConfiguration",
|
||||
abfsConfig);
|
||||
|
||||
// override baseurl
|
||||
client = TestAbfsClient.setAbfsClientField(client, "baseUrl",
|
||||
client = ITestAbfsClient.setAbfsClientField(client, "baseUrl",
|
||||
baseAbfsClientInstance.getBaseUrl());
|
||||
|
||||
// override auth provider
|
||||
if (currentAuthType == AuthType.SharedKey) {
|
||||
client = TestAbfsClient.setAbfsClientField(client, "sharedKeyCredentials",
|
||||
client = ITestAbfsClient.setAbfsClientField(client, "sharedKeyCredentials",
|
||||
new SharedKeyCredentials(
|
||||
abfsConfig.getAccountName().substring(0,
|
||||
abfsConfig.getAccountName().indexOf(DOT)),
|
||||
abfsConfig.getStorageAccountKey()));
|
||||
} else {
|
||||
client = TestAbfsClient.setAbfsClientField(client, "tokenProvider",
|
||||
client = ITestAbfsClient.setAbfsClientField(client, "tokenProvider",
|
||||
abfsConfig.getTokenProvider());
|
||||
}
|
||||
|
||||
|
@ -336,7 +363,7 @@ public final class TestAbfsClient {
|
|||
String userAgent = "APN/1.0 Azure Blob FS/3.4.0-SNAPSHOT (PrivateBuild "
|
||||
+ "JavaJRE 1.8.0_252; Linux 5.3.0-59-generic/amd64; openssl-1.0; "
|
||||
+ "UNKNOWN/UNKNOWN) MSFT";
|
||||
client = TestAbfsClient.setAbfsClientField(client, "userAgent", userAgent);
|
||||
client = ITestAbfsClient.setAbfsClientField(client, "userAgent", userAgent);
|
||||
|
||||
return client;
|
||||
}
|
||||
|
@ -404,4 +431,156 @@ public final class TestAbfsClient {
|
|||
public static AccessTokenProvider getAccessTokenProvider(AbfsClient client) {
|
||||
return client.getTokenProvider();
|
||||
}
|
||||
|
||||
/**
|
||||
* Test helper method to get random bytes array.
|
||||
* @param length The length of byte buffer.
|
||||
* @return byte buffer.
|
||||
*/
|
||||
private byte[] getRandomBytesArray(int length) {
|
||||
final byte[] b = new byte[length];
|
||||
new Random().nextBytes(b);
|
||||
return b;
|
||||
}
|
||||
|
||||
/**
|
||||
* Test to verify that client retries append request without
|
||||
* expect header enabled if append with expect header enabled fails
|
||||
* with 4xx kind of error.
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testExpectHundredContinue() throws Exception {
|
||||
// Get the filesystem.
|
||||
final AzureBlobFileSystem fs = getFileSystem();
|
||||
|
||||
final Configuration configuration = new Configuration();
|
||||
configuration.addResource(TEST_CONFIGURATION_FILE_NAME);
|
||||
AbfsClient abfsClient = fs.getAbfsStore().getClient();
|
||||
|
||||
AbfsConfiguration abfsConfiguration = new AbfsConfiguration(configuration,
|
||||
configuration.get(FS_AZURE_ABFS_ACCOUNT_NAME));
|
||||
|
||||
// Update the configuration with reduced retry count and reduced backoff interval.
|
||||
AbfsConfiguration abfsConfig
|
||||
= TestAbfsConfigurationFieldsValidation.updateRetryConfigs(
|
||||
abfsConfiguration,
|
||||
REDUCED_RETRY_COUNT, REDUCED_BACKOFF_INTERVAL);
|
||||
|
||||
// Gets the client.
|
||||
AbfsClient testClient = Mockito.spy(
|
||||
ITestAbfsClient.createTestClientFromCurrentContext(
|
||||
abfsClient,
|
||||
abfsConfig));
|
||||
|
||||
// Create the append request params with expect header enabled initially.
|
||||
AppendRequestParameters appendRequestParameters
|
||||
= new AppendRequestParameters(
|
||||
BUFFER_OFFSET, BUFFER_OFFSET, BUFFER_LENGTH,
|
||||
AppendRequestParameters.Mode.APPEND_MODE, false, null, true);
|
||||
|
||||
byte[] buffer = getRandomBytesArray(BUFFER_LENGTH);
|
||||
|
||||
// Create a test container to upload the data.
|
||||
Path testPath = path(TEST_PATH);
|
||||
fs.create(testPath);
|
||||
String finalTestPath = testPath.toString()
|
||||
.substring(testPath.toString().lastIndexOf("/"));
|
||||
|
||||
// Creates a list of request headers.
|
||||
final List<AbfsHttpHeader> requestHeaders
|
||||
= ITestAbfsClient.getTestRequestHeaders(testClient);
|
||||
requestHeaders.add(
|
||||
new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE, HTTP_METHOD_PATCH));
|
||||
if (appendRequestParameters.isExpectHeaderEnabled()) {
|
||||
requestHeaders.add(new AbfsHttpHeader(EXPECT, HUNDRED_CONTINUE));
|
||||
}
|
||||
|
||||
// Updates the query parameters.
|
||||
final AbfsUriQueryBuilder abfsUriQueryBuilder
|
||||
= testClient.createDefaultUriQueryBuilder();
|
||||
abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, APPEND_ACTION);
|
||||
abfsUriQueryBuilder.addQuery(QUERY_PARAM_POSITION,
|
||||
Long.toString(appendRequestParameters.getPosition()));
|
||||
|
||||
// Creates the url for the specified path.
|
||||
URL url = testClient.createRequestUrl(finalTestPath, abfsUriQueryBuilder.toString());
|
||||
|
||||
// Create a mock of the AbfsRestOperation to set the urlConnection in the corresponding httpOperation.
|
||||
AbfsRestOperation op = Mockito.spy(new AbfsRestOperation(
|
||||
AbfsRestOperationType.Append,
|
||||
testClient,
|
||||
HTTP_METHOD_PUT,
|
||||
url,
|
||||
requestHeaders, buffer,
|
||||
appendRequestParameters.getoffset(),
|
||||
appendRequestParameters.getLength(), null));
|
||||
|
||||
AbfsHttpOperation abfsHttpOperation = Mockito.spy(new AbfsHttpOperation(url,
|
||||
HTTP_METHOD_PUT, requestHeaders));
|
||||
|
||||
// Sets the expect request property if expect header is enabled.
|
||||
if (appendRequestParameters.isExpectHeaderEnabled()) {
|
||||
Mockito.doReturn(HUNDRED_CONTINUE).when(abfsHttpOperation)
|
||||
.getConnProperty(EXPECT);
|
||||
}
|
||||
|
||||
HttpURLConnection urlConnection = mock(HttpURLConnection.class);
|
||||
Mockito.doNothing().when(urlConnection).setRequestProperty(Mockito
|
||||
.any(), Mockito.any());
|
||||
Mockito.doReturn(HTTP_METHOD_PUT).when(urlConnection).getRequestMethod();
|
||||
Mockito.doReturn(url).when(urlConnection).getURL();
|
||||
Mockito.doReturn(urlConnection).when(abfsHttpOperation).getConnection();
|
||||
|
||||
Mockito.doNothing().when(abfsHttpOperation).setRequestProperty(Mockito
|
||||
.any(), Mockito.any());
|
||||
Mockito.doReturn(url).when(abfsHttpOperation).getConnUrl();
|
||||
|
||||
// Give user error code 404 when processResponse is called.
|
||||
Mockito.doReturn(HTTP_METHOD_PUT).when(abfsHttpOperation).getConnRequestMethod();
|
||||
Mockito.doReturn(HTTP_NOT_FOUND).when(abfsHttpOperation).getConnResponseCode();
|
||||
Mockito.doReturn("Resource Not Found")
|
||||
.when(abfsHttpOperation)
|
||||
.getConnResponseMessage();
|
||||
|
||||
// Make the getOutputStream throw IOException to see it returns from the sendRequest correctly.
|
||||
Mockito.doThrow(new ProtocolException("Server rejected Operation"))
|
||||
.when(abfsHttpOperation)
|
||||
.getConnOutputStream();
|
||||
|
||||
// Sets the httpOperation for the rest operation.
|
||||
Mockito.doReturn(abfsHttpOperation)
|
||||
.when(op)
|
||||
.createHttpOperation();
|
||||
|
||||
// Mock the restOperation for the client.
|
||||
Mockito.doReturn(op)
|
||||
.when(testClient)
|
||||
.getAbfsRestOperationForAppend(Mockito.any(),
|
||||
Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(),
|
||||
Mockito.nullable(int.class), Mockito.nullable(int.class),
|
||||
Mockito.any());
|
||||
|
||||
TracingContext tracingContext = Mockito.spy(new TracingContext("abcd",
|
||||
"abcde", FSOperationType.APPEND,
|
||||
TracingHeaderFormat.ALL_ID_FORMAT, null));
|
||||
|
||||
// Check that expect header is enabled before the append call.
|
||||
Assertions.assertThat(appendRequestParameters.isExpectHeaderEnabled())
|
||||
.describedAs("The expect header is not true before the append call")
|
||||
.isTrue();
|
||||
|
||||
intercept(AzureBlobFileSystemException.class,
|
||||
() -> testClient.append(finalTestPath, buffer, appendRequestParameters, null, tracingContext));
|
||||
|
||||
// Verify that the request was not exponentially retried because of user error.
|
||||
Assertions.assertThat(tracingContext.getRetryCount())
|
||||
.describedAs("The retry count is incorrect")
|
||||
.isEqualTo(0);
|
||||
|
||||
// Verify that the same request was retried with expect header disabled.
|
||||
Assertions.assertThat(appendRequestParameters.isExpectHeaderEnabled())
|
||||
.describedAs("The expect header is not false")
|
||||
.isFalse();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,358 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.azurebfs.services;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.net.HttpURLConnection;
|
||||
import java.net.ProtocolException;
|
||||
import java.net.URL;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Random;
|
||||
|
||||
import org.assertj.core.api.Assertions;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
|
||||
import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
|
||||
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
|
||||
import org.apache.hadoop.fs.azurebfs.TestAbfsConfigurationFieldsValidation;
|
||||
import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
|
||||
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
|
||||
import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
|
||||
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
|
||||
import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderFormat;
|
||||
|
||||
import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
|
||||
import static java.net.HttpURLConnection.HTTP_OK;
|
||||
import static java.net.HttpURLConnection.HTTP_UNAVAILABLE;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APPEND_ACTION;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PATCH;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HUNDRED_CONTINUE;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.EXPECT;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_HTTP_METHOD_OVERRIDE;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_ACTION;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_POSITION;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_ABFS_ACCOUNT_NAME;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.TEST_CONFIGURATION_FILE_NAME;
|
||||
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.never;
|
||||
import static org.mockito.Mockito.times;
|
||||
|
||||
@RunWith(Parameterized.class)
|
||||
public class ITestAbfsRestOperation extends AbstractAbfsIntegrationTest {
|
||||
|
||||
// Specifies whether getOutputStream() or write() throws IOException.
|
||||
public enum ErrorType {OUTPUTSTREAM, WRITE};
|
||||
|
||||
private static final int HTTP_EXPECTATION_FAILED = 417;
|
||||
private static final int HTTP_ERROR = 0;
|
||||
private static final int ZERO = 0;
|
||||
private static final int REDUCED_RETRY_COUNT = 2;
|
||||
private static final int REDUCED_BACKOFF_INTERVAL = 100;
|
||||
private static final int BUFFER_LENGTH = 5;
|
||||
private static final int BUFFER_OFFSET = 0;
|
||||
private static final String TEST_PATH = "/testfile";
|
||||
|
||||
// Specifies whether the expect header is enabled or not.
|
||||
@Parameterized.Parameter
|
||||
public boolean expectHeaderEnabled;
|
||||
|
||||
// Gives the http response code.
|
||||
@Parameterized.Parameter(1)
|
||||
public int responseCode;
|
||||
|
||||
// Gives the http response message.
|
||||
@Parameterized.Parameter(2)
|
||||
public String responseMessage;
|
||||
|
||||
// Gives the errorType based on the enum.
|
||||
@Parameterized.Parameter(3)
|
||||
public ErrorType errorType;
|
||||
|
||||
// The intercept.
|
||||
private AbfsThrottlingIntercept intercept;
|
||||
|
||||
/*
|
||||
HTTP_OK = 200,
|
||||
HTTP_UNAVAILABLE = 503,
|
||||
HTTP_NOT_FOUND = 404,
|
||||
HTTP_EXPECTATION_FAILED = 417,
|
||||
HTTP_ERROR = 0.
|
||||
*/
|
||||
@Parameterized.Parameters(name = "expect={0}-code={1}-ErrorType={3}")
|
||||
public static Iterable<Object[]> params() {
|
||||
return Arrays.asList(new Object[][]{
|
||||
{true, HTTP_OK, "OK", ErrorType.WRITE},
|
||||
{false, HTTP_OK, "OK", ErrorType.WRITE},
|
||||
{true, HTTP_UNAVAILABLE, "ServerBusy", ErrorType.OUTPUTSTREAM},
|
||||
{true, HTTP_NOT_FOUND, "Resource Not Found", ErrorType.OUTPUTSTREAM},
|
||||
{true, HTTP_EXPECTATION_FAILED, "Expectation Failed", ErrorType.OUTPUTSTREAM},
|
||||
{true, HTTP_ERROR, "Error", ErrorType.OUTPUTSTREAM}
|
||||
});
|
||||
}
|
||||
|
||||
public ITestAbfsRestOperation() throws Exception {
|
||||
super();
|
||||
}
|
||||
|
||||
/**
|
||||
* Test helper method to get random bytes array.
|
||||
* @param length The length of byte buffer
|
||||
* @return byte buffer
|
||||
*/
|
||||
private byte[] getRandomBytesArray(int length) {
|
||||
final byte[] b = new byte[length];
|
||||
new Random().nextBytes(b);
|
||||
return b;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gives the AbfsRestOperation.
|
||||
* @return abfsRestOperation.
|
||||
*/
|
||||
private AbfsRestOperation getRestOperation() throws Exception {
|
||||
// Get the filesystem.
|
||||
final AzureBlobFileSystem fs = getFileSystem();
|
||||
|
||||
final Configuration configuration = new Configuration();
|
||||
configuration.addResource(TEST_CONFIGURATION_FILE_NAME);
|
||||
AbfsClient abfsClient = fs.getAbfsStore().getClient();
|
||||
|
||||
AbfsConfiguration abfsConfiguration = new AbfsConfiguration(configuration,
|
||||
configuration.get(FS_AZURE_ABFS_ACCOUNT_NAME));
|
||||
|
||||
// Update the configuration with reduced retry count and reduced backoff interval.
|
||||
AbfsConfiguration abfsConfig
|
||||
= TestAbfsConfigurationFieldsValidation.updateRetryConfigs(
|
||||
abfsConfiguration,
|
||||
REDUCED_RETRY_COUNT, REDUCED_BACKOFF_INTERVAL);
|
||||
|
||||
intercept = Mockito.mock(AbfsThrottlingIntercept.class);
|
||||
Mockito.doNothing().when(intercept).updateMetrics(Mockito.any(), Mockito.any());
|
||||
|
||||
// Gets the client.
|
||||
AbfsClient testClient = Mockito.spy(ITestAbfsClient.createTestClientFromCurrentContext(
|
||||
abfsClient,
|
||||
abfsConfig));
|
||||
|
||||
Mockito.doReturn(intercept).when(testClient).getIntercept();
|
||||
|
||||
// Expect header is enabled or not based on the parameter.
|
||||
AppendRequestParameters appendRequestParameters
|
||||
= new AppendRequestParameters(
|
||||
BUFFER_OFFSET, BUFFER_OFFSET, BUFFER_LENGTH,
|
||||
AppendRequestParameters.Mode.APPEND_MODE, false, null,
|
||||
expectHeaderEnabled);
|
||||
|
||||
byte[] buffer = getRandomBytesArray(5);
|
||||
|
||||
// Create a test container to upload the data.
|
||||
Path testPath = path(TEST_PATH);
|
||||
fs.create(testPath);
|
||||
String finalTestPath = testPath.toString().substring(testPath.toString().lastIndexOf("/"));
|
||||
|
||||
// Creates a list of request headers.
|
||||
final List<AbfsHttpHeader> requestHeaders = ITestAbfsClient.getTestRequestHeaders(testClient);
|
||||
requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE, HTTP_METHOD_PATCH));
|
||||
if (appendRequestParameters.isExpectHeaderEnabled()) {
|
||||
requestHeaders.add(new AbfsHttpHeader(EXPECT, HUNDRED_CONTINUE));
|
||||
}
|
||||
|
||||
// Updates the query parameters.
|
||||
final AbfsUriQueryBuilder abfsUriQueryBuilder = testClient.createDefaultUriQueryBuilder();
|
||||
abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, APPEND_ACTION);
|
||||
abfsUriQueryBuilder.addQuery(QUERY_PARAM_POSITION, Long.toString(appendRequestParameters.getPosition()));
|
||||
|
||||
// Creates the url for the specified path.
|
||||
URL url = testClient.createRequestUrl(finalTestPath, abfsUriQueryBuilder.toString());
|
||||
|
||||
// Create a mock of the AbfsRestOperation to set the urlConnection in the corresponding httpOperation.
|
||||
AbfsRestOperation op = Mockito.spy(new AbfsRestOperation(
|
||||
AbfsRestOperationType.Append,
|
||||
testClient,
|
||||
HTTP_METHOD_PUT,
|
||||
url,
|
||||
requestHeaders, buffer,
|
||||
appendRequestParameters.getoffset(),
|
||||
appendRequestParameters.getLength(), null));
|
||||
|
||||
AbfsHttpOperation abfsHttpOperation = Mockito.spy(new AbfsHttpOperation(url, HTTP_METHOD_PUT, requestHeaders));
|
||||
|
||||
// Sets the expect request property if expect header is enabled.
|
||||
if (expectHeaderEnabled) {
|
||||
Mockito.doReturn(HUNDRED_CONTINUE)
|
||||
.when(abfsHttpOperation)
|
||||
.getConnProperty(EXPECT);
|
||||
}
|
||||
|
||||
HttpURLConnection urlConnection = mock(HttpURLConnection.class);
|
||||
Mockito.doNothing().when(urlConnection).setRequestProperty(Mockito
|
||||
.any(), Mockito.any());
|
||||
Mockito.doReturn(HTTP_METHOD_PUT).when(urlConnection).getRequestMethod();
|
||||
Mockito.doReturn(url).when(urlConnection).getURL();
|
||||
Mockito.doReturn(urlConnection).when(abfsHttpOperation).getConnection();
|
||||
|
||||
Mockito.doNothing().when(abfsHttpOperation).setRequestProperty(Mockito
|
||||
.any(), Mockito.any());
|
||||
Mockito.doReturn(url).when(abfsHttpOperation).getConnUrl();
|
||||
Mockito.doReturn(HTTP_METHOD_PUT).when(abfsHttpOperation).getConnRequestMethod();
|
||||
|
||||
switch (errorType) {
|
||||
case OUTPUTSTREAM:
|
||||
// If the getOutputStream() throws IOException and Expect Header is
|
||||
// enabled, it returns back to processResponse and hence we have
|
||||
// mocked the response code and the response message to check different
|
||||
// behaviour based on response code.
|
||||
Mockito.doReturn(responseCode).when(abfsHttpOperation).getConnResponseCode();
|
||||
Mockito.doReturn(responseMessage)
|
||||
.when(abfsHttpOperation)
|
||||
.getConnResponseMessage();
|
||||
Mockito.doThrow(new ProtocolException("Server rejected Operation"))
|
||||
.when(abfsHttpOperation)
|
||||
.getConnOutputStream();
|
||||
break;
|
||||
case WRITE:
|
||||
// If write() throws IOException and Expect Header is
|
||||
// enabled or not, it should throw back the exception.
|
||||
OutputStream outputStream = Mockito.spy(new OutputStream() {
|
||||
@Override
|
||||
public void write(final int i) throws IOException {
|
||||
}
|
||||
});
|
||||
Mockito.doReturn(outputStream).when(abfsHttpOperation).getConnOutputStream();
|
||||
Mockito.doThrow(new IOException())
|
||||
.when(outputStream)
|
||||
.write(buffer, appendRequestParameters.getoffset(),
|
||||
appendRequestParameters.getLength());
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
||||
// Sets the httpOperation for the rest operation.
|
||||
Mockito.doReturn(abfsHttpOperation)
|
||||
.when(op)
|
||||
.createHttpOperation();
|
||||
return op;
|
||||
}
|
||||
|
||||
void assertTraceContextState(int retryCount, int assertRetryCount, int bytesSent, int assertBytesSent,
|
||||
int expectedBytesSent, int assertExpectedBytesSent) {
|
||||
// Assert that the request is retried or not.
|
||||
Assertions.assertThat(retryCount)
|
||||
.describedAs("The retry count is incorrect")
|
||||
.isEqualTo(assertRetryCount);
|
||||
|
||||
// Assert that metrics will be updated correctly.
|
||||
Assertions.assertThat(bytesSent)
|
||||
.describedAs("The bytes sent is incorrect")
|
||||
.isEqualTo(assertBytesSent);
|
||||
Assertions.assertThat(expectedBytesSent)
|
||||
.describedAs("The expected bytes sent is incorrect")
|
||||
.isEqualTo(assertExpectedBytesSent);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test the functionalities based on whether getOutputStream() or write()
|
||||
* throws exception and what is the corresponding response code.
|
||||
*/
|
||||
@Test
|
||||
public void testExpectHundredContinue() throws Exception {
|
||||
// Gets the AbfsRestOperation.
|
||||
AbfsRestOperation op = getRestOperation();
|
||||
AbfsHttpOperation httpOperation = op.createHttpOperation();
|
||||
|
||||
TracingContext tracingContext = Mockito.spy(new TracingContext("abcd",
|
||||
"abcde", FSOperationType.APPEND,
|
||||
TracingHeaderFormat.ALL_ID_FORMAT, null));
|
||||
|
||||
switch (errorType) {
|
||||
case WRITE:
|
||||
// If write() throws IOException and Expect Header is
|
||||
// enabled or not, it should throw back the exception
|
||||
// which is caught and exponential retry logic comes into place.
|
||||
intercept(IOException.class,
|
||||
() -> op.execute(tracingContext));
|
||||
|
||||
// Asserting update of metrics and retries.
|
||||
assertTraceContextState(tracingContext.getRetryCount(), REDUCED_RETRY_COUNT, httpOperation.getBytesSent(), BUFFER_LENGTH,
|
||||
0, 0);
|
||||
break;
|
||||
case OUTPUTSTREAM:
|
||||
switch (responseCode) {
|
||||
case HTTP_UNAVAILABLE:
|
||||
// In the case of 503 i.e. throttled case, we should retry.
|
||||
intercept(IOException.class,
|
||||
() -> op.execute(tracingContext));
|
||||
|
||||
// Asserting update of metrics and retries.
|
||||
assertTraceContextState(tracingContext.getRetryCount(), REDUCED_RETRY_COUNT, httpOperation.getBytesSent(), ZERO,
|
||||
httpOperation.getExpectedBytesToBeSent(), BUFFER_LENGTH);
|
||||
|
||||
// Verifies that update Metrics call is made for throttle case and for the first without retry +
|
||||
// for the retried cases as well.
|
||||
Mockito.verify(intercept, times(REDUCED_RETRY_COUNT + 1))
|
||||
.updateMetrics(Mockito.any(), Mockito.any());
|
||||
break;
|
||||
case HTTP_ERROR:
|
||||
// In the case of http status code 0 i.e. ErrorType case, we should retry.
|
||||
intercept(IOException.class,
|
||||
() -> op.execute(tracingContext));
|
||||
|
||||
// Asserting update of metrics and retries.
|
||||
assertTraceContextState(tracingContext.getRetryCount(), REDUCED_RETRY_COUNT, httpOperation.getBytesSent(),
|
||||
ZERO, 0, 0);
|
||||
|
||||
// Verifies that update Metrics call is made for ErrorType case and for the first without retry +
|
||||
// for the retried cases as well.
|
||||
Mockito.verify(intercept, times(REDUCED_RETRY_COUNT + 1))
|
||||
.updateMetrics(Mockito.any(), Mockito.any());
|
||||
break;
|
||||
case HTTP_NOT_FOUND:
|
||||
case HTTP_EXPECTATION_FAILED:
|
||||
// In the case of 4xx ErrorType. i.e. user ErrorType, retry should not happen.
|
||||
intercept(AzureBlobFileSystemException.class,
|
||||
() -> op.execute(tracingContext));
|
||||
|
||||
// Asserting update of metrics and retries.
|
||||
assertTraceContextState(tracingContext.getRetryCount(), ZERO, 0,
|
||||
0, 0, 0);
|
||||
|
||||
// Verifies that update Metrics call is not made for user ErrorType case.
|
||||
Mockito.verify(intercept, never())
|
||||
.updateMetrics(Mockito.any(), Mockito.any());
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -72,6 +72,7 @@ public final class TestAbfsOutputStream {
|
|||
boolean isFlushEnabled,
|
||||
boolean disableOutputStreamFlush,
|
||||
boolean isAppendBlob,
|
||||
boolean isExpectHeaderEnabled,
|
||||
AbfsClient client,
|
||||
String path,
|
||||
TracingContext tracingContext,
|
||||
|
@ -89,6 +90,7 @@ public final class TestAbfsOutputStream {
|
|||
|
||||
return new AbfsOutputStreamContext(2)
|
||||
.withWriteBufferSize(writeBufferSize)
|
||||
.enableExpectHeader(isExpectHeaderEnabled)
|
||||
.enableFlush(isFlushEnabled)
|
||||
.disableOutputStreamFlush(disableOutputStreamFlush)
|
||||
.withStreamStatistics(new AbfsOutputStreamStatisticsImpl())
|
||||
|
@ -129,6 +131,7 @@ public final class TestAbfsOutputStream {
|
|||
true,
|
||||
false,
|
||||
false,
|
||||
true,
|
||||
client,
|
||||
PATH,
|
||||
new TracingContext(abfsConf.getClientCorrelationId(), "test-fs-id",
|
||||
|
@ -149,9 +152,9 @@ public final class TestAbfsOutputStream {
|
|||
out.hsync();
|
||||
|
||||
AppendRequestParameters firstReqParameters = new AppendRequestParameters(
|
||||
0, 0, WRITE_SIZE, APPEND_MODE, false, null);
|
||||
0, 0, WRITE_SIZE, APPEND_MODE, false, null, true);
|
||||
AppendRequestParameters secondReqParameters = new AppendRequestParameters(
|
||||
WRITE_SIZE, 0, 2 * WRITE_SIZE, APPEND_MODE, false, null);
|
||||
WRITE_SIZE, 0, 2 * WRITE_SIZE, APPEND_MODE, false, null, true);
|
||||
|
||||
verify(client, times(1)).append(
|
||||
eq(PATH), any(byte[].class), refEq(firstReqParameters), any(),
|
||||
|
@ -190,6 +193,7 @@ public final class TestAbfsOutputStream {
|
|||
true,
|
||||
false,
|
||||
false,
|
||||
true,
|
||||
client,
|
||||
PATH,
|
||||
tracingContext,
|
||||
|
@ -203,9 +207,9 @@ public final class TestAbfsOutputStream {
|
|||
out.close();
|
||||
|
||||
AppendRequestParameters firstReqParameters = new AppendRequestParameters(
|
||||
0, 0, BUFFER_SIZE, APPEND_MODE, false, null);
|
||||
0, 0, BUFFER_SIZE, APPEND_MODE, false, null, true);
|
||||
AppendRequestParameters secondReqParameters = new AppendRequestParameters(
|
||||
BUFFER_SIZE, 0, 5*WRITE_SIZE-BUFFER_SIZE, APPEND_MODE, false, null);
|
||||
BUFFER_SIZE, 0, 5*WRITE_SIZE-BUFFER_SIZE, APPEND_MODE, false, null, true);
|
||||
|
||||
verify(client, times(1)).append(
|
||||
eq(PATH), any(byte[].class), refEq(firstReqParameters), any(),
|
||||
|
@ -264,6 +268,7 @@ public final class TestAbfsOutputStream {
|
|||
true,
|
||||
false,
|
||||
false,
|
||||
true,
|
||||
client,
|
||||
PATH,
|
||||
tracingContext,
|
||||
|
@ -277,9 +282,9 @@ public final class TestAbfsOutputStream {
|
|||
out.close();
|
||||
|
||||
AppendRequestParameters firstReqParameters = new AppendRequestParameters(
|
||||
0, 0, BUFFER_SIZE, APPEND_MODE, false, null);
|
||||
0, 0, BUFFER_SIZE, APPEND_MODE, false, null, true);
|
||||
AppendRequestParameters secondReqParameters = new AppendRequestParameters(
|
||||
BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false, null);
|
||||
BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false, null, true);
|
||||
|
||||
verify(client, times(1)).append(
|
||||
eq(PATH), any(byte[].class), refEq(firstReqParameters), any(), any(TracingContext.class));
|
||||
|
@ -335,6 +340,7 @@ public final class TestAbfsOutputStream {
|
|||
true,
|
||||
false,
|
||||
false,
|
||||
true,
|
||||
client,
|
||||
PATH,
|
||||
new TracingContext(abfsConf.getClientCorrelationId(), "test-fs-id",
|
||||
|
@ -350,9 +356,9 @@ public final class TestAbfsOutputStream {
|
|||
Thread.sleep(1000);
|
||||
|
||||
AppendRequestParameters firstReqParameters = new AppendRequestParameters(
|
||||
0, 0, BUFFER_SIZE, APPEND_MODE, false, null);
|
||||
0, 0, BUFFER_SIZE, APPEND_MODE, false, null, true);
|
||||
AppendRequestParameters secondReqParameters = new AppendRequestParameters(
|
||||
BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false, null);
|
||||
BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false, null, true);
|
||||
|
||||
verify(client, times(1)).append(
|
||||
eq(PATH), any(byte[].class), refEq(firstReqParameters), any(), any(TracingContext.class));
|
||||
|
@ -390,6 +396,7 @@ public final class TestAbfsOutputStream {
|
|||
true,
|
||||
false,
|
||||
true,
|
||||
true,
|
||||
client,
|
||||
PATH,
|
||||
new TracingContext(abfsConf.getClientCorrelationId(), "test-fs-id",
|
||||
|
@ -405,9 +412,9 @@ public final class TestAbfsOutputStream {
|
|||
Thread.sleep(1000);
|
||||
|
||||
AppendRequestParameters firstReqParameters = new AppendRequestParameters(
|
||||
0, 0, BUFFER_SIZE, APPEND_MODE, true, null);
|
||||
0, 0, BUFFER_SIZE, APPEND_MODE, true, null, true);
|
||||
AppendRequestParameters secondReqParameters = new AppendRequestParameters(
|
||||
BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, true, null);
|
||||
BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, true, null, true);
|
||||
|
||||
verify(client, times(1)).append(
|
||||
eq(PATH), any(byte[].class), refEq(firstReqParameters), any(), any(TracingContext.class));
|
||||
|
@ -449,6 +456,7 @@ public final class TestAbfsOutputStream {
|
|||
true,
|
||||
false,
|
||||
false,
|
||||
true,
|
||||
client,
|
||||
PATH,
|
||||
new TracingContext(abfsConf.getClientCorrelationId(), "test-fs-id",
|
||||
|
@ -464,9 +472,9 @@ public final class TestAbfsOutputStream {
|
|||
out.hflush();
|
||||
|
||||
AppendRequestParameters firstReqParameters = new AppendRequestParameters(
|
||||
0, 0, BUFFER_SIZE, APPEND_MODE, false, null);
|
||||
0, 0, BUFFER_SIZE, APPEND_MODE, false, null, true);
|
||||
AppendRequestParameters secondReqParameters = new AppendRequestParameters(
|
||||
BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false, null);
|
||||
BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false, null, true);
|
||||
|
||||
verify(client, times(1)).append(
|
||||
eq(PATH), any(byte[].class), refEq(firstReqParameters), any(), any(TracingContext.class));
|
||||
|
@ -518,6 +526,7 @@ public final class TestAbfsOutputStream {
|
|||
true,
|
||||
false,
|
||||
false,
|
||||
true,
|
||||
client,
|
||||
PATH,
|
||||
new TracingContext(abfsConf.getClientCorrelationId(), "test-fs-id",
|
||||
|
@ -535,9 +544,9 @@ public final class TestAbfsOutputStream {
|
|||
Thread.sleep(1000);
|
||||
|
||||
AppendRequestParameters firstReqParameters = new AppendRequestParameters(
|
||||
0, 0, BUFFER_SIZE, APPEND_MODE, false, null);
|
||||
0, 0, BUFFER_SIZE, APPEND_MODE, false, null, true);
|
||||
AppendRequestParameters secondReqParameters = new AppendRequestParameters(
|
||||
BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false, null);
|
||||
BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false, null, true);
|
||||
|
||||
verify(client, times(1)).append(
|
||||
eq(PATH), any(byte[].class), refEq(firstReqParameters), any(), any(TracingContext.class));
|
||||
|
|
|
@ -58,7 +58,7 @@ public class TestAbfsRenameRetryRecovery extends AbstractAbfsIntegrationTest {
|
|||
String destNoParentPath = "/NoParent/Dest";
|
||||
AzureBlobFileSystem fs = getFileSystem();
|
||||
|
||||
AbfsClient mockClient = TestAbfsClient.getMockAbfsClient(
|
||||
AbfsClient mockClient = ITestAbfsClient.getMockAbfsClient(
|
||||
fs.getAbfsStore().getClient(),
|
||||
fs.getAbfsStore().getAbfsConfiguration());
|
||||
|
||||
|
|
|
@ -103,7 +103,7 @@ public class TestExponentialRetryPolicy extends AbstractAbfsIntegrationTest {
|
|||
AbfsConfiguration abfsConfiguration = new AbfsConfiguration(configuration,
|
||||
"dummy.dfs.core.windows.net");
|
||||
AbfsThrottlingIntercept intercept;
|
||||
AbfsClient abfsClient = TestAbfsClient.createTestClientFromCurrentContext(fs.getAbfsStore().getClient(), abfsConfiguration);
|
||||
AbfsClient abfsClient = ITestAbfsClient.createTestClientFromCurrentContext(fs.getAbfsStore().getClient(), abfsConfiguration);
|
||||
intercept = abfsClient.getIntercept();
|
||||
Assertions.assertThat(intercept)
|
||||
.describedAs("AbfsNoOpThrottlingIntercept instance expected")
|
||||
|
@ -114,7 +114,7 @@ public class TestExponentialRetryPolicy extends AbstractAbfsIntegrationTest {
|
|||
// On disabling throttling AbfsClientThrottlingIntercept object is returned
|
||||
AbfsConfiguration abfsConfiguration1 = new AbfsConfiguration(configuration,
|
||||
"dummy1.dfs.core.windows.net");
|
||||
AbfsClient abfsClient1 = TestAbfsClient.createTestClientFromCurrentContext(fs.getAbfsStore().getClient(), abfsConfiguration1);
|
||||
AbfsClient abfsClient1 = ITestAbfsClient.createTestClientFromCurrentContext(fs.getAbfsStore().getClient(), abfsConfiguration1);
|
||||
intercept = abfsClient1.getIntercept();
|
||||
Assertions.assertThat(intercept)
|
||||
.describedAs("AbfsClientThrottlingIntercept instance expected")
|
||||
|
|
Loading…
Reference in New Issue