diff --git a/hadoop-tools/hadoop-azure/src/config/checkstyle-suppressions.xml b/hadoop-tools/hadoop-azure/src/config/checkstyle-suppressions.xml index fd2a7c210e7..2065746b766 100644 --- a/hadoop-tools/hadoop-azure/src/config/checkstyle-suppressions.xml +++ b/hadoop-tools/hadoop-azure/src/config/checkstyle-suppressions.xml @@ -48,7 +48,11 @@ files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]utils[\\/]Base64.java"/> + + diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java index 5a31b10b514..69298079a57 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java @@ -117,6 +117,11 @@ public class AbfsConfiguration{ DefaultValue = DEFAULT_OPTIMIZE_FOOTER_READ) private boolean optimizeFooterRead; + @BooleanConfigurationValidatorAnnotation( + ConfigurationKey = FS_AZURE_ACCOUNT_IS_EXPECT_HEADER_ENABLED, + DefaultValue = DEFAULT_FS_AZURE_ACCOUNT_IS_EXPECT_HEADER_ENABLED) + private boolean isExpectHeaderEnabled; + @BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ACCOUNT_LEVEL_THROTTLING_ENABLED, DefaultValue = DEFAULT_FS_AZURE_ACCOUNT_LEVEL_THROTTLING_ENABLED) private boolean accountThrottlingEnabled; @@ -706,6 +711,10 @@ public class AbfsConfiguration{ return this.azureAppendBlobDirs; } + public boolean isExpectHeaderEnabled() { + return this.isExpectHeaderEnabled; + } + public boolean accountThrottlingEnabled() { return accountThrottlingEnabled; } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index 3941f6e421a..70ba9cd579a 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -693,6 +693,7 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport { } return new AbfsOutputStreamContext(abfsConfiguration.getSasTokenRenewPeriodForStreamsInSeconds()) .withWriteBufferSize(bufferSize) + .enableExpectHeader(abfsConfiguration.isExpectHeaderEnabled()) .enableFlush(abfsConfiguration.isFlushEnabled()) .enableSmallWriteOptimization(abfsConfiguration.isSmallWriteOptimizationEnabled()) .disableOutputStreamFlush(abfsConfiguration.isOutputStreamFlushDisabled()) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java index e1b791f6ef2..7e4ddfa675a 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java @@ -64,6 +64,11 @@ public final class AbfsHttpConstants { public static final String HTTP_METHOD_PATCH = "PATCH"; public static final String HTTP_METHOD_POST = "POST"; public static final String HTTP_METHOD_PUT = "PUT"; + /** + * All status codes less than http 100 signify error + * and should qualify for retry. 
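+   * For example, the -1 set on a socket error, or a 0 seen when no response
+   * could be parsed at all, both fall below HTTP_CONTINUE and are retried.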
+ */ + public static final int HTTP_CONTINUE = 100; // Abfs generic constants public static final String SINGLE_WHITE_SPACE = " "; @@ -103,6 +108,9 @@ public final class AbfsHttpConstants { public static final String DEFAULT_SCOPE = "default:"; public static final String PERMISSION_FORMAT = "%04d"; public static final String SUPER_USER = "$superuser"; + // The HTTP 100 Continue informational status response code indicates that everything so far + // is OK and that the client should continue with the request or ignore it if it is already finished. + public static final String HUNDRED_CONTINUE = "100-continue"; public static final char CHAR_FORWARD_SLASH = '/'; public static final char CHAR_EXCLAMATION_POINT = '!'; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java index a59f76b6d0f..e3052cd7bbc 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java @@ -35,6 +35,11 @@ public final class ConfigurationKeys { * path to determine HNS status. */ public static final String FS_AZURE_ACCOUNT_IS_HNS_ENABLED = "fs.azure.account.hns.enabled"; + /** + * Enable or disable expect hundred continue header. + * Value: {@value}. + */ + public static final String FS_AZURE_ACCOUNT_IS_EXPECT_HEADER_ENABLED = "fs.azure.account.expect.header.enabled"; public static final String FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME = "fs.azure.account.key"; public static final String FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME_REGX = "fs\\.azure\\.account\\.key\\.(.*)"; public static final String FS_AZURE_SECURE_MODE = "fs.azure.secure.mode"; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java index 9994d9f5207..68b492a5791 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java @@ -32,7 +32,7 @@ import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_ST public final class FileSystemConfigurations { public static final String DEFAULT_FS_AZURE_ACCOUNT_IS_HNS_ENABLED = ""; - + public static final boolean DEFAULT_FS_AZURE_ACCOUNT_IS_EXPECT_HEADER_ENABLED = true; public static final String USER_HOME_DIRECTORY_PREFIX = "/user"; private static final int SIXTY_SECONDS = 60 * 1000; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java index d4065ac2836..b123e90170e 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java @@ -70,6 +70,7 @@ public final class HttpHeaderConfigurations { public static final String X_MS_LEASE_ID = "x-ms-lease-id"; public static final String X_MS_PROPOSED_LEASE_ID = "x-ms-proposed-lease-id"; public static final String X_MS_LEASE_BREAK_PERIOD = 
"x-ms-lease-break-period"; + public static final String EXPECT = "Expect"; private HttpHeaderConfigurations() {} } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/InvalidAbfsRestOperationException.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/InvalidAbfsRestOperationException.java index 19620212134..285297024c7 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/InvalidAbfsRestOperationException.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/InvalidAbfsRestOperationException.java @@ -30,6 +30,9 @@ import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode; @InterfaceAudience.Public @InterfaceStability.Evolving public class InvalidAbfsRestOperationException extends AbfsRestOperationException { + + private static final String ERROR_MESSAGE = "InvalidAbfsRestOperationException"; + public InvalidAbfsRestOperationException( final Exception innerException) { super( @@ -37,7 +40,23 @@ public class InvalidAbfsRestOperationException extends AbfsRestOperationExceptio AzureServiceErrorCode.UNKNOWN.getErrorCode(), innerException != null ? innerException.toString() - : "InvalidAbfsRestOperationException", + : ERROR_MESSAGE, innerException); } + + /** + * Adds the retry count along with the exception. + * @param innerException The inner exception which is originally caught. + * @param retryCount The retry count when the exception was thrown. + */ + public InvalidAbfsRestOperationException( + final Exception innerException, int retryCount) { + super( + AzureServiceErrorCode.UNKNOWN.getStatusCode(), + AzureServiceErrorCode.UNKNOWN.getErrorCode(), + innerException != null + ? 
innerException.toString() + : ERROR_MESSAGE + " RetryCount: " + retryCount, + innerException); + } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AppendRequestParameters.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AppendRequestParameters.java index 7369bfaf564..57e559a60ec 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AppendRequestParameters.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AppendRequestParameters.java @@ -34,19 +34,22 @@ public class AppendRequestParameters { private final Mode mode; private final boolean isAppendBlob; private final String leaseId; + private boolean isExpectHeaderEnabled; public AppendRequestParameters(final long position, final int offset, final int length, final Mode mode, final boolean isAppendBlob, - final String leaseId) { + final String leaseId, + final boolean isExpectHeaderEnabled) { this.position = position; this.offset = offset; this.length = length; this.mode = mode; this.isAppendBlob = isAppendBlob; this.leaseId = leaseId; + this.isExpectHeaderEnabled = isExpectHeaderEnabled; } public long getPosition() { @@ -72,4 +75,12 @@ public class AppendRequestParameters { public String getLeaseId() { return this.leaseId; } + + public boolean isExpectHeaderEnabled() { + return isExpectHeaderEnabled; + } + + public void setExpectHeaderEnabled(boolean expectHeaderEnabled) { + isExpectHeaderEnabled = expectHeaderEnabled; + } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java index 9974979aeba..2e497b54975 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java @@ -77,6 +77,7 @@ import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.S import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.HTTPS_SCHEME; import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.*; import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.*; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.RENAME_DESTINATION_PARENT_PATH_NOT_FOUND; /** @@ -656,6 +657,9 @@ public class AbfsClient implements Closeable { throws AzureBlobFileSystemException { final List requestHeaders = createDefaultHeaders(); addCustomerProvidedKeyHeaders(requestHeaders); + if (reqParams.isExpectHeaderEnabled()) { + requestHeaders.add(new AbfsHttpHeader(EXPECT, HUNDRED_CONTINUE)); + } // JDK7 does not support PATCH, so to workaround the issue we will use // PUT and specify the real method in the X-Http-Method-Override header. 
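    // Note: when the Expect header was added above, it rides on this same
    // PUT request, so the 100-continue handshake also covers the overridden
    // PATCH operation.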
    requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE,
@@ -681,29 +685,7 @@
        abfsUriQueryBuilder, cachedSasToken);
    final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
-    final AbfsRestOperation op = new AbfsRestOperation(
-        AbfsRestOperationType.Append,
-        this,
-        HTTP_METHOD_PUT,
-        url,
-        requestHeaders,
-        buffer,
-        reqParams.getoffset(),
-        reqParams.getLength(),
-        sasTokenForReuse);
-    try {
-      op.execute(tracingContext);
-    } catch (AzureBlobFileSystemException e) {
-      // If we have no HTTP response, throw the original exception.
-      if (!op.hasResult()) {
-        throw e;
-      }
-      if (reqParams.isAppendBlob()
-          && appendSuccessCheckOp(op, path,
-          (reqParams.getPosition() + reqParams.getLength()), tracingContext)) {
-        final AbfsRestOperation successOp = new AbfsRestOperation(
-            AbfsRestOperationType.Append,
-            this,
+    final AbfsRestOperation op = getAbfsRestOperationForAppend(AbfsRestOperationType.Append,
        HTTP_METHOD_PUT,
        url,
        requestHeaders,
@@ -711,6 +693,41 @@
        reqParams.getoffset(),
        reqParams.getLength(),
        sasTokenForReuse);
+    try {
+      op.execute(tracingContext);
+    } catch (AzureBlobFileSystemException e) {
+      /*
+         If the HTTP response code indicates a user error, we retry the same
+         append request with the expect header disabled. When the
+         "100-continue" header is enabled but a non-HTTP-100 response comes
+         back, the response message might not get set correctly by the server.
+         So this handling avoids breaking backward compatibility for anyone
+         who has taken a dependency on the exception message, which is created
+         using the error string present in the response header.
+       */
+      int responseStatusCode = ((AbfsRestOperationException) e).getStatusCode();
+      if (checkUserError(responseStatusCode) && reqParams.isExpectHeaderEnabled()) {
+        LOG.debug("User error, retrying without 100 continue enabled for the given path {}", path);
+        reqParams.setExpectHeaderEnabled(false);
+        return this.append(path, buffer, reqParams, cachedSasToken,
+            tracingContext);
+      }
+      // If we have no HTTP response, throw the original exception.
+      if (!op.hasResult()) {
+        throw e;
+      }
+      if (reqParams.isAppendBlob()
+          && appendSuccessCheckOp(op, path,
+          (reqParams.getPosition() + reqParams.getLength()), tracingContext)) {
+        final AbfsRestOperation successOp = getAbfsRestOperationForAppend(
+            AbfsRestOperationType.Append,
+            HTTP_METHOD_PUT,
+            url,
+            requestHeaders,
+            buffer,
+            reqParams.getoffset(),
+            reqParams.getLength(),
+            sasTokenForReuse);
        successOp.hardSetResult(HttpURLConnection.HTTP_OK);
        return successOp;
      }
@@ -720,6 +737,48 @@
    return op;
  }

+  /**
+   * Returns the rest operation for append.
+   * @param operationType The AbfsRestOperationType.
+   * @param httpMethod specifies the httpMethod.
+   * @param url specifies the url.
+   * @param requestHeaders This includes the list of request headers.
+   * @param buffer The buffer holding the data to be sent.
+   * @param bufferOffset The buffer offset.
+   * @param bufferLength The buffer length.
+   * @param sasTokenForReuse The sasToken.
+   * @return AbfsRestOperation op.
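+   * Note: factored out and annotated {@code VisibleForTesting} mainly so
+   * tests can stub the returned operation, e.g. to simulate expect-header
+   * failures without a live connection.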
+   */
+  @VisibleForTesting
+  AbfsRestOperation getAbfsRestOperationForAppend(final AbfsRestOperationType operationType,
+      final String httpMethod,
+      final URL url,
+      final List<AbfsHttpHeader> requestHeaders,
+      final byte[] buffer,
+      final int bufferOffset,
+      final int bufferLength,
+      final String sasTokenForReuse) {
+    return new AbfsRestOperation(
+        operationType,
+        this,
+        httpMethod,
+        url,
+        requestHeaders,
+        buffer,
+        bufferOffset,
+        bufferLength, sasTokenForReuse);
+  }
+
+  /**
+   * Returns true if the status code lies in the range of user error.
+   * @param responseStatusCode http response status code.
+   * @return true if the status code is in the 4xx range (user error).
+   */
+  private boolean checkUserError(int responseStatusCode) {
+    return (responseStatusCode >= HttpURLConnection.HTTP_BAD_REQUEST
+        && responseStatusCode < HttpURLConnection.HTTP_INTERNAL_ERROR);
+  }
+
  // For AppendBlob it's possible that the append succeeded in the backend but the request failed.
  // However, a retry would fail with an InvalidQueryParameterValue
  // (as the current offset would be unacceptable).
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java
index 52a46bc7469..3bb225d4be8 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java
@@ -28,6 +28,8 @@ import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
 import org.apache.hadoop.fs.azurebfs.AbfsStatistic;
 import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;

+import static java.net.HttpURLConnection.HTTP_UNAVAILABLE;
+
 /**
  * Throttles Azure Blob File System read and write operations to achieve maximum
  * throughput by minimizing errors. The errors occur when the account ingress
@@ -60,7 +62,7 @@ public final class AbfsClientThrottlingIntercept implements AbfsThrottlingInterc

   // Hide default constructor
   private AbfsClientThrottlingIntercept(AbfsConfiguration abfsConfiguration) {
-    //Account name is kept as empty as same instance is shared across all accounts
+    // Account name is kept empty as the same instance is shared across all accounts.
     this.accountName = "";
     this.readThrottler = setAnalyzer("read", abfsConfiguration);
     this.writeThrottler = setAnalyzer("write", abfsConfiguration);
@@ -114,6 +116,18 @@ public final class AbfsClientThrottlingIntercept implements AbfsThrottlingInterc
     return singleton;
   }

+  /**
+   * Updates the metrics for the case when the response code signifies
+   * throttling but there are some expected bytes to be sent.
+   * @param isThrottledOperation true if the status code is HTTP_UNAVAILABLE.
+   * @param abfsHttpOperation Used for status code and data transferred.
+   * @return true if the operation is throttled and has some bytes to transfer.
+   */
+  private boolean updateBytesTransferred(boolean isThrottledOperation,
+      AbfsHttpOperation abfsHttpOperation) {
+    return isThrottledOperation && abfsHttpOperation.getExpectedBytesToBeSent() > 0;
+  }
+
   /**
    * Updates the metrics for successful and failed read and write operations.
    * @param operationType Only applicable for read and write operations.
@@ -134,9 +148,22 @@ public final class AbfsClientThrottlingIntercept implements AbfsThrottlingInterc boolean isFailedOperation = (status < HttpURLConnection.HTTP_OK || status >= HttpURLConnection.HTTP_INTERNAL_ERROR); + // If status code is 503, it is considered as a throttled operation. + boolean isThrottledOperation = (status == HTTP_UNAVAILABLE); + switch (operationType) { case Append: contentLength = abfsHttpOperation.getBytesSent(); + if (contentLength == 0) { + /* + Signifies the case where we could not update the bytesSent due to + throttling but there were some expectedBytesToBeSent. + */ + if (updateBytesTransferred(isThrottledOperation, abfsHttpOperation)) { + LOG.debug("Updating metrics due to throttling for path {}", abfsHttpOperation.getConnUrl().getPath()); + contentLength = abfsHttpOperation.getExpectedBytesToBeSent(); + } + } if (contentLength > 0) { writeThrottler.addBytesTransferred(contentLength, isFailedOperation); diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java index 6f7ee2e1622..a47720ab697 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java @@ -42,6 +42,9 @@ import org.apache.hadoop.fs.azurebfs.contracts.services.AbfsPerfLoggable; import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultSchema; import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HUNDRED_CONTINUE; +import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.EXPECT; + /** * Represents an HTTP operation. */ @@ -72,6 +75,7 @@ public class AbfsHttpOperation implements AbfsPerfLoggable { // metrics private int bytesSent; + private int expectedBytesToBeSent; private long bytesReceived; // optional trace enabled metrics @@ -154,6 +158,10 @@ public class AbfsHttpOperation implements AbfsPerfLoggable { return bytesSent; } + public int getExpectedBytesToBeSent() { + return expectedBytesToBeSent; + } + public long getBytesReceived() { return bytesReceived; } @@ -281,7 +289,7 @@ public class AbfsHttpOperation implements AbfsPerfLoggable { this.connection.setRequestMethod(method); for (AbfsHttpHeader header : requestHeaders) { - this.connection.setRequestProperty(header.getName(), header.getValue()); + setRequestProperty(header.getName(), header.getValue()); } } @@ -313,13 +321,44 @@ public class AbfsHttpOperation implements AbfsPerfLoggable { if (this.isTraceEnabled) { startTime = System.nanoTime(); } - try (OutputStream outputStream = this.connection.getOutputStream()) { - // update bytes sent before they are sent so we may observe - // attempted sends as well as successful sends via the - // accompanying statusCode + OutputStream outputStream = null; + // Updates the expected bytes to be sent based on length. + this.expectedBytesToBeSent = length; + try { + try { + /* Without expect header enabled, if getOutputStream() throws + an exception, it gets caught by the restOperation. But with + expect header enabled we return back without throwing an exception + for the correct response code processing. 
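+           (Typically, when Expect: 100-continue was set and the server
+           answers with a non-100 status, the JDK's HttpURLConnection throws
+           from getOutputStream(), e.g. a ProtocolException; the status code
+           can still be read afterwards, which processResponse() relies on.)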
+         */
+        outputStream = getConnOutputStream();
+      } catch (IOException e) {
+        /* If getOutputStream fails with an exception and the expect header
+           is enabled, we return without throwing an exception to
+           the caller. The caller is responsible for setting the correct status code.
+           If the expect header is not enabled, we throw back the exception.
+         */
+        String expectHeader = getConnProperty(EXPECT);
+        if (expectHeader != null && expectHeader.equals(HUNDRED_CONTINUE)) {
+          LOG.debug("Getting output stream failed with expect header enabled, returning back ", e);
+          return;
+        } else {
+          LOG.debug("Getting output stream failed without expect header enabled, throwing exception ", e);
+          throw e;
+        }
+      }
+      // update bytes sent for successful as well as failed attempts via the
+      // accompanying statusCode.
      this.bytesSent = length;
+
+      // If this fails with or without expect header enabled,
+      // it throws an IOException.
      outputStream.write(buffer, offset, length);
    } finally {
+      // Close the opened output stream.
+      if (outputStream != null) {
+        outputStream.close();
+      }
      if (this.isTraceEnabled) {
        this.sendRequestTimeMs = elapsedTimeMs(startTime);
      }
@@ -343,13 +382,13 @@
      startTime = System.nanoTime();
    }

-    this.statusCode = this.connection.getResponseCode();
+    this.statusCode = getConnResponseCode();

    if (this.isTraceEnabled) {
      this.recvResponseTimeMs = elapsedTimeMs(startTime);
    }

-    this.statusDescription = this.connection.getResponseMessage();
+    this.statusDescription = getConnResponseMessage();

    this.requestId = this.connection.getHeaderField(HttpHeaderConfigurations.X_MS_REQUEST_ID);
    if (this.requestId == null) {
@@ -542,6 +581,58 @@
    return stream == null ? true : false;
  }

+  /**
+   * Gets the connection request property for a key.
+   * @param key The request property key.
+   * @return request property value.
+   */
+  String getConnProperty(String key) {
+    return connection.getRequestProperty(key);
+  }
+
+  /**
+   * Gets the connection url.
+   * @return url.
+   */
+  URL getConnUrl() {
+    return connection.getURL();
+  }
+
+  /**
+   * Gets the connection request method.
+   * @return request method.
+   */
+  String getConnRequestMethod() {
+    return connection.getRequestMethod();
+  }
+
+  /**
+   * Gets the connection response code.
+   * @return response code.
+   * @throws IOException if the response code cannot be read.
+   */
+  Integer getConnResponseCode() throws IOException {
+    return connection.getResponseCode();
+  }
+
+  /**
+   * Gets the connection output stream.
+   * @return output stream.
+   * @throws IOException if the output stream cannot be opened.
+   */
+  OutputStream getConnOutputStream() throws IOException {
+    return connection.getOutputStream();
+  }
+
+  /**
+   * Gets the connection response message.
+   * @return response message.
+   * @throws IOException if the response message cannot be read.
+   */
+  String getConnResponseMessage() throws IOException {
+    return connection.getResponseMessage();
+  }
+
  public static class AbfsHttpOperationWithFixedResult extends AbfsHttpOperation {
    /**
     * Creates an instance to represent fixed results.
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java index 61160f92bdf..d989a94afaa 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java @@ -80,6 +80,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, private boolean disableOutputStreamFlush; private boolean enableSmallWriteOptimization; private boolean isAppendBlob; + private boolean isExpectHeaderEnabled; private volatile IOException lastError; private long lastFlushOffset; @@ -133,6 +134,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, this.position = abfsOutputStreamContext.getPosition(); this.closed = false; this.supportFlush = abfsOutputStreamContext.isEnableFlush(); + this.isExpectHeaderEnabled = abfsOutputStreamContext.isExpectHeaderEnabled(); this.disableOutputStreamFlush = abfsOutputStreamContext .isDisableOutputStreamFlush(); this.enableSmallWriteOptimization @@ -327,7 +329,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, * leaseId - The AbfsLeaseId for this request. */ AppendRequestParameters reqParams = new AppendRequestParameters( - offset, 0, bytesLength, mode, false, leaseId); + offset, 0, bytesLength, mode, false, leaseId, isExpectHeaderEnabled); AbfsRestOperation op = client.append(path, blockUploadData.toByteArray(), reqParams, cachedSasToken.get(), new TracingContext(tracingContext)); @@ -573,7 +575,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker, "writeCurrentBufferToService", "append")) { AppendRequestParameters reqParams = new AppendRequestParameters(offset, 0, - bytesLength, APPEND_MODE, true, leaseId); + bytesLength, APPEND_MODE, true, leaseId, isExpectHeaderEnabled); AbfsRestOperation op = client.append(path, uploadData.toByteArray(), reqParams, cachedSasToken.get(), new TracingContext(tracingContext)); cachedSasToken.update(op.getSasToken()); diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java index ad303823e0c..ed897330367 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java @@ -33,6 +33,8 @@ public class AbfsOutputStreamContext extends AbfsStreamContext { private boolean enableFlush; + private boolean enableExpectHeader; + private boolean enableSmallWriteOptimization; private boolean disableOutputStreamFlush; @@ -78,6 +80,11 @@ public class AbfsOutputStreamContext extends AbfsStreamContext { return this; } + public AbfsOutputStreamContext enableExpectHeader(final boolean enableExpectHeader) { + this.enableExpectHeader = enableExpectHeader; + return this; + } + public AbfsOutputStreamContext enableSmallWriteOptimization(final boolean enableSmallWriteOptimization) { this.enableSmallWriteOptimization = enableSmallWriteOptimization; return this; @@ -184,6 +191,10 @@ public class AbfsOutputStreamContext extends AbfsStreamContext { return enableFlush; } + public boolean 
isExpectHeaderEnabled() { + return enableExpectHeader; + } + public boolean isDisableOutputStreamFlush() { return disableOutputStreamFlush; } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java index ad99020390a..362e0d1213f 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java @@ -25,6 +25,7 @@ import java.net.URL; import java.net.UnknownHostException; import java.util.List; +import org.apache.hadoop.classification.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,6 +39,8 @@ import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations; import org.apache.hadoop.fs.azurebfs.utils.TracingContext; import org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_CONTINUE; + /** * The AbfsRestOperation for Rest AbfsClient. */ @@ -236,11 +239,21 @@ public class AbfsRestOperation { } } - if (result.getStatusCode() >= HttpURLConnection.HTTP_BAD_REQUEST) { + int status = result.getStatusCode(); + /* + If even after exhausting all retries, the http status code has an + invalid value it qualifies for InvalidAbfsRestOperationException. + All http status code less than 1xx range are considered as invalid + status codes. + */ + if (status < HTTP_CONTINUE) { + throw new InvalidAbfsRestOperationException(null, retryCount); + } + + if (status >= HttpURLConnection.HTTP_BAD_REQUEST) { throw new AbfsRestOperationException(result.getStatusCode(), result.getStorageErrorCode(), result.getStorageErrorMessage(), null, result); } - LOG.trace("{} REST operation complete", operationType); } @@ -268,7 +281,7 @@ public class AbfsRestOperation { case Custom: case OAuth: LOG.debug("Authenticating request with OAuth2 access token"); - httpOperation.getConnection().setRequestProperty(HttpHeaderConfigurations.AUTHORIZATION, + httpOperation.setRequestProperty(HttpHeaderConfigurations.AUTHORIZATION, client.getAccessToken()); break; case SAS: @@ -319,7 +332,7 @@ public class AbfsRestOperation { LOG.warn("Unknown host name: {}. Retrying to resolve the host name...", hostname); if (!client.getRetryPolicy().shouldRetry(retryCount, -1)) { - throw new InvalidAbfsRestOperationException(ex); + throw new InvalidAbfsRestOperationException(ex, retryCount); } return false; } catch (IOException ex) { @@ -330,12 +343,25 @@ public class AbfsRestOperation { failureReason = RetryReason.getAbbreviation(ex, -1, ""); if (!client.getRetryPolicy().shouldRetry(retryCount, -1)) { - throw new InvalidAbfsRestOperationException(ex); + throw new InvalidAbfsRestOperationException(ex, retryCount); } return false; } finally { - intercept.updateMetrics(operationType, httpOperation); + int status = httpOperation.getStatusCode(); + /* + A status less than 300 (2xx range) or greater than or equal + to 500 (5xx range) should contribute to throttling metrics being updated. + Less than 200 or greater than or equal to 500 show failed operations. 2xx + range contributes to successful operations. 3xx range is for redirects + and 4xx range is for user errors. These should not be a part of + throttling backoff computation. 
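+         For example: 200 -> update, 304 -> skip, 404 -> skip, 503 -> update
+         (503 being the throttling signal these metrics exist to capture).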
+       */
+      boolean updateMetricsResponseCode = (status < HttpURLConnection.HTTP_MULT_CHOICE
+          || status >= HttpURLConnection.HTTP_INTERNAL_ERROR);
+      if (updateMetricsResponseCode) {
+        intercept.updateMetrics(operationType, httpOperation);
+      }
    }

    LOG.debug("HttpRequest: {}: {}", operationType, httpOperation);
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ExponentialRetryPolicy.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ExponentialRetryPolicy.java
index 89d99471a82..92a08ea8d20 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ExponentialRetryPolicy.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ExponentialRetryPolicy.java
@@ -24,6 +24,8 @@ import java.net.HttpURLConnection;
 import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
 import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;

+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_CONTINUE;
+
 /**
  * Retry policy used by AbfsClient.
  * */
@@ -118,7 +120,9 @@ public class ExponentialRetryPolicy {

   /**
    * Returns if a request should be retried based on the retry count, current response,
-   * and the current strategy.
+   * and the current strategy. A valid http status code lies in the range 1xx-5xx,
+   * but an invalid status code might be set due to network or timeout issues.
+   * Such invalid status codes also qualify for retry.
    *
    * @param retryCount The current retry attempt count.
    * @param statusCode The status code of the response, or -1 for socket error.
@@ -126,7 +130,7 @@ public class ExponentialRetryPolicy {
    */
   public boolean shouldRetry(final int retryCount, final int statusCode) {
     return retryCount < this.retryCount
-        && (statusCode == -1
+        && (statusCode < HTTP_CONTINUE
            || statusCode == HttpURLConnection.HTTP_CLIENT_TIMEOUT
            || (statusCode >= HttpURLConnection.HTTP_INTERNAL_ERROR
                && statusCode != HttpURLConnection.HTTP_NOT_IMPLEMENTED
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingContext.java
index 9a2ccda36fb..cde9ea35e40 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingContext.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingContext.java
@@ -139,6 +139,10 @@ public class TracingContext {
     this.opType = operation;
   }

+  public int getRetryCount() {
+    return retryCount;
+  }
+
   public void setRetryCount(int retryCount) {
     this.retryCount = retryCount;
   }
diff --git a/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md b/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md
index 31498df1790..aff1e32b83f 100644
--- a/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md
+++ b/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md
@@ -767,6 +767,17 @@ Hflush() being the only documented API that can provide persistent data
 transfer, Flush() also attempting to persist buffered data will lead to
 performance issues.

+### Hundred Continue Options
+
+`fs.azure.account.expect.header.enabled`: This configuration parameter
+specifies whether to send an expect 100 continue header with each append
+request. It is enabled (true) by default. The flag makes the client check with
+the Azure store before uploading a block of data from an output stream, which
+lets it throttle back gracefully before actually attempting the upload. In
+experiments this provides significant throughput improvements under heavy
+load. For more information, see:
+- https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Expect
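+
+For example, the header can be switched off for a cluster with a
+`core-site.xml` entry like the following (an illustrative snippet; the flag
+is on by default, so this is only needed to opt out):
+
+```xml
+<property>
+  <name>fs.azure.account.expect.header.enabled</name>
+  <value>false</value>
+</property>
+```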
+
 ### Account level throttling Options

diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java
index c5bf85a4f81..74655fd5736 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java
@@ -42,7 +42,7 @@ import org.apache.hadoop.fs.azurebfs.security.AbfsDelegationTokenManager;
 import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
 import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
 import org.apache.hadoop.fs.azurebfs.services.AuthType;
-import org.apache.hadoop.fs.azurebfs.services.TestAbfsClient;
+import org.apache.hadoop.fs.azurebfs.services.ITestAbfsClient;
 import org.apache.hadoop.fs.azure.AzureNativeFileSystemStore;
 import org.apache.hadoop.fs.azure.NativeAzureFileSystem;
 import org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation;
@@ -254,7 +254,7 @@ public abstract class AbstractAbfsIntegrationTest extends
   }

   public AccessTokenProvider getAccessTokenProvider(final AzureBlobFileSystem fs) {
-    return TestAbfsClient.getAccessTokenProvider(fs.getAbfsStore().getClient());
+    return ITestAbfsClient.getAccessTokenProvider(fs.getAbfsStore().getClient());
   }

   public void loadConfiguredFileSystem() throws Exception {
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java
index 2f23ac5c5c7..d9a3cea089f 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java
@@ -43,7 +43,7 @@ import org.apache.hadoop.fs.azurebfs.contracts.exceptions.ConcurrentWriteOperati
 import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
 import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
 import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
-import org.apache.hadoop.fs.azurebfs.services.TestAbfsClient;
+import org.apache.hadoop.fs.azurebfs.services.ITestAbfsClient;
 import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
 import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderValidator;
@@ -362,7 +362,7 @@ public class ITestAzureBlobFileSystemCreate extends

     // Get mock AbfsClient with current config
     AbfsClient mockClient
-        = TestAbfsClient.getMockAbfsClient(
+        = ITestAbfsClient.getMockAbfsClient(
             fs.getAbfsStore().getClient(),
             fs.getAbfsStore().getAbfsConfiguration());
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java
index db181fb5dd6..1f0ff667522 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java
+++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java @@ -35,7 +35,7 @@ import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationExcep import org.apache.hadoop.fs.azurebfs.services.AbfsClient; import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation; import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation; -import org.apache.hadoop.fs.azurebfs.services.TestAbfsClient; +import org.apache.hadoop.fs.azurebfs.services.ITestAbfsClient; import org.apache.hadoop.fs.azurebfs.services.TestAbfsPerfTracker; import org.apache.hadoop.fs.azurebfs.utils.TestMockHelpers; import org.apache.hadoop.fs.azurebfs.utils.TracingContext; @@ -176,7 +176,7 @@ public class ITestAzureBlobFileSystemDelete extends final AzureBlobFileSystem fs = getFileSystem(); AbfsClient abfsClient = fs.getAbfsStore().getClient(); - AbfsClient testClient = TestAbfsClient.createTestClientFromCurrentContext( + AbfsClient testClient = ITestAbfsClient.createTestClientFromCurrentContext( abfsClient, abfsConfig); @@ -223,7 +223,7 @@ public class ITestAzureBlobFileSystemDelete extends public void testDeleteIdempotencyTriggerHttp404() throws Exception { final AzureBlobFileSystem fs = getFileSystem(); - AbfsClient client = TestAbfsClient.createTestClientFromCurrentContext( + AbfsClient client = ITestAbfsClient.createTestClientFromCurrentContext( fs.getAbfsStore().getClient(), this.getConfiguration()); @@ -242,7 +242,7 @@ public class ITestAzureBlobFileSystemDelete extends getTestTracingContext(fs, true))); // mock idempotency check to mimic retried case - AbfsClient mockClient = TestAbfsClient.getMockAbfsClient( + AbfsClient mockClient = ITestAbfsClient.getMockAbfsClient( fs.getAbfsStore().getClient(), this.getConfiguration()); AzureBlobFileSystemStore mockStore = mock(AzureBlobFileSystemStore.class); @@ -257,10 +257,10 @@ public class ITestAzureBlobFileSystemDelete extends // Case 2: Mimic retried case // Idempotency check on Delete always returns success - AbfsRestOperation idempotencyRetOp = TestAbfsClient.getRestOp( + AbfsRestOperation idempotencyRetOp = ITestAbfsClient.getRestOp( DeletePath, mockClient, HTTP_METHOD_DELETE, - TestAbfsClient.getTestUrl(mockClient, "/NonExistingPath"), - TestAbfsClient.getTestRequestHeaders(mockClient)); + ITestAbfsClient.getTestUrl(mockClient, "/NonExistingPath"), + ITestAbfsClient.getTestRequestHeaders(mockClient)); idempotencyRetOp.hardSetResult(HTTP_OK); doReturn(idempotencyRetOp).when(mockClient).deleteIdempotencyCheckOp(any()); diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestCustomerProvidedKey.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestCustomerProvidedKey.java index cf7696ec1b0..6496c9234f3 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestCustomerProvidedKey.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestCustomerProvidedKey.java @@ -203,7 +203,7 @@ public class ITestCustomerProvidedKey extends AbstractAbfsIntegrationTest { // Trying to append with correct CPK headers AppendRequestParameters appendRequestParameters = new AppendRequestParameters( - 0, 0, 5, Mode.APPEND_MODE, false, null); + 0, 0, 5, Mode.APPEND_MODE, false, null, true); byte[] buffer = getRandomBytesArray(5); AbfsClient abfsClient = fs.getAbfsClient(); AbfsRestOperation abfsRestOperation = abfsClient @@ -248,7 +248,7 @@ public class ITestCustomerProvidedKey extends AbstractAbfsIntegrationTest { // 
Trying to append without CPK headers AppendRequestParameters appendRequestParameters = new AppendRequestParameters( - 0, 0, 5, Mode.APPEND_MODE, false, null); + 0, 0, 5, Mode.APPEND_MODE, false, null, true); byte[] buffer = getRandomBytesArray(5); AbfsClient abfsClient = fs.getAbfsClient(); AbfsRestOperation abfsRestOperation = abfsClient diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java similarity index 64% rename from hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java rename to hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java index 08eb3adc926..c031e5daa6c 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java @@ -20,20 +20,43 @@ package org.apache.hadoop.fs.azurebfs.services; import java.io.IOException; import java.lang.reflect.Field; +import java.net.HttpURLConnection; +import java.net.ProtocolException; import java.net.URL; import java.util.List; +import java.util.Random; import java.util.regex.Pattern; +import org.assertj.core.api.Assertions; import org.junit.Test; +import org.mockito.Mockito; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.azurebfs.AbfsConfiguration; +import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest; +import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem; +import org.apache.hadoop.fs.azurebfs.TestAbfsConfigurationFieldsValidation; +import org.apache.hadoop.fs.azurebfs.constants.FSOperationType; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; +import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters; import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys; +import org.apache.hadoop.fs.azurebfs.utils.TracingContext; +import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderFormat; import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory; -import static org.assertj.core.api.Assertions.assertThat; +import static java.net.HttpURLConnection.HTTP_NOT_FOUND; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APPEND_ACTION; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PATCH; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HUNDRED_CONTINUE; +import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.EXPECT; +import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_HTTP_METHOD_OVERRIDE; +import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_ACTION; +import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_POSITION; +import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_ABFS_ACCOUNT_NAME; +import static org.apache.hadoop.test.LambdaTestUtils.intercept; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -59,14 +82,19 @@ import static 
org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.TEST * Test useragent of abfs client. * */ -public final class TestAbfsClient { +public final class ITestAbfsClient extends AbstractAbfsIntegrationTest { private static final String ACCOUNT_NAME = "bogusAccountName.dfs.core.windows.net"; private static final String FS_AZURE_USER_AGENT_PREFIX = "Partner Service"; + private static final String TEST_PATH = "/testfile"; + public static final int REDUCED_RETRY_COUNT = 2; + public static final int REDUCED_BACKOFF_INTERVAL = 100; + public static final int BUFFER_LENGTH = 5; + public static final int BUFFER_OFFSET = 0; private final Pattern userAgentStringPattern; - public TestAbfsClient(){ + public ITestAbfsClient() throws Exception { StringBuilder regEx = new StringBuilder(); regEx.append("^"); regEx.append(APN_VERSION); @@ -124,7 +152,7 @@ public final class TestAbfsClient { } private void verifybBasicInfo(String userAgentStr) { - assertThat(userAgentStr) + Assertions.assertThat(userAgentStr) .describedAs("User-Agent string [" + userAgentStr + "] should be of the pattern: " + this.userAgentStringPattern.pattern()) .matches(this.userAgentStringPattern) @@ -153,7 +181,7 @@ public final class TestAbfsClient { String userAgentStr = getUserAgentString(abfsConfiguration, false); verifybBasicInfo(userAgentStr); - assertThat(userAgentStr) + Assertions.assertThat(userAgentStr) .describedAs("User-Agent string should contain " + FS_AZURE_USER_AGENT_PREFIX) .contains(FS_AZURE_USER_AGENT_PREFIX); @@ -163,7 +191,7 @@ public final class TestAbfsClient { userAgentStr = getUserAgentString(abfsConfiguration, false); verifybBasicInfo(userAgentStr); - assertThat(userAgentStr) + Assertions.assertThat(userAgentStr) .describedAs("User-Agent string should not contain " + FS_AZURE_USER_AGENT_PREFIX) .doesNotContain(FS_AZURE_USER_AGENT_PREFIX); } @@ -179,14 +207,14 @@ public final class TestAbfsClient { String userAgentStr = getUserAgentString(abfsConfiguration, true); verifybBasicInfo(userAgentStr); - assertThat(userAgentStr) + Assertions.assertThat(userAgentStr) .describedAs("User-Agent string should contain sslProvider") .contains(DelegatingSSLSocketFactory.getDefaultFactory().getProviderName()); userAgentStr = getUserAgentString(abfsConfiguration, false); verifybBasicInfo(userAgentStr); - assertThat(userAgentStr) + Assertions.assertThat(userAgentStr) .describedAs("User-Agent string should not contain sslProvider") .doesNotContain(DelegatingSSLSocketFactory.getDefaultFactory().getProviderName()); } @@ -202,7 +230,7 @@ public final class TestAbfsClient { String userAgentStr = getUserAgentString(abfsConfiguration, false); verifybBasicInfo(userAgentStr); - assertThat(userAgentStr) + Assertions.assertThat(userAgentStr) .describedAs("User-Agent string should contain cluster name") .contains(clusterName); @@ -212,7 +240,7 @@ public final class TestAbfsClient { userAgentStr = getUserAgentString(abfsConfiguration, false); verifybBasicInfo(userAgentStr); - assertThat(userAgentStr) + Assertions.assertThat(userAgentStr) .describedAs("User-Agent string should not contain cluster name") .doesNotContain(clusterName) .describedAs("User-Agent string should contain UNKNOWN as cluster name config is absent") @@ -230,7 +258,7 @@ public final class TestAbfsClient { String userAgentStr = getUserAgentString(abfsConfiguration, false); verifybBasicInfo(userAgentStr); - assertThat(userAgentStr) + Assertions.assertThat(userAgentStr) .describedAs("User-Agent string should contain cluster type") .contains(clusterType); @@ -240,7 
+268,7 @@ public final class TestAbfsClient { userAgentStr = getUserAgentString(abfsConfiguration, false); verifybBasicInfo(userAgentStr); - assertThat(userAgentStr) + Assertions.assertThat(userAgentStr) .describedAs("User-Agent string should not contain cluster type") .doesNotContain(clusterType) .describedAs("User-Agent string should contain UNKNOWN as cluster type config is absent") @@ -311,24 +339,23 @@ public final class TestAbfsClient { AbfsThrottlingInterceptFactory.getInstance( abfsConfig.getAccountName().substring(0, abfsConfig.getAccountName().indexOf(DOT)), abfsConfig)); - // override baseurl - client = TestAbfsClient.setAbfsClientField(client, "abfsConfiguration", + client = ITestAbfsClient.setAbfsClientField(client, "abfsConfiguration", abfsConfig); // override baseurl - client = TestAbfsClient.setAbfsClientField(client, "baseUrl", + client = ITestAbfsClient.setAbfsClientField(client, "baseUrl", baseAbfsClientInstance.getBaseUrl()); // override auth provider if (currentAuthType == AuthType.SharedKey) { - client = TestAbfsClient.setAbfsClientField(client, "sharedKeyCredentials", + client = ITestAbfsClient.setAbfsClientField(client, "sharedKeyCredentials", new SharedKeyCredentials( abfsConfig.getAccountName().substring(0, abfsConfig.getAccountName().indexOf(DOT)), abfsConfig.getStorageAccountKey())); } else { - client = TestAbfsClient.setAbfsClientField(client, "tokenProvider", + client = ITestAbfsClient.setAbfsClientField(client, "tokenProvider", abfsConfig.getTokenProvider()); } @@ -336,7 +363,7 @@ public final class TestAbfsClient { String userAgent = "APN/1.0 Azure Blob FS/3.4.0-SNAPSHOT (PrivateBuild " + "JavaJRE 1.8.0_252; Linux 5.3.0-59-generic/amd64; openssl-1.0; " + "UNKNOWN/UNKNOWN) MSFT"; - client = TestAbfsClient.setAbfsClientField(client, "userAgent", userAgent); + client = ITestAbfsClient.setAbfsClientField(client, "userAgent", userAgent); return client; } @@ -404,4 +431,156 @@ public final class TestAbfsClient { public static AccessTokenProvider getAccessTokenProvider(AbfsClient client) { return client.getTokenProvider(); } + + /** + * Test helper method to get random bytes array. + * @param length The length of byte buffer. + * @return byte buffer. + */ + private byte[] getRandomBytesArray(int length) { + final byte[] b = new byte[length]; + new Random().nextBytes(b); + return b; + } + + /** + * Test to verify that client retries append request without + * expect header enabled if append with expect header enabled fails + * with 4xx kind of error. + * @throws Exception + */ + @Test + public void testExpectHundredContinue() throws Exception { + // Get the filesystem. + final AzureBlobFileSystem fs = getFileSystem(); + + final Configuration configuration = new Configuration(); + configuration.addResource(TEST_CONFIGURATION_FILE_NAME); + AbfsClient abfsClient = fs.getAbfsStore().getClient(); + + AbfsConfiguration abfsConfiguration = new AbfsConfiguration(configuration, + configuration.get(FS_AZURE_ABFS_ACCOUNT_NAME)); + + // Update the configuration with reduced retry count and reduced backoff interval. + AbfsConfiguration abfsConfig + = TestAbfsConfigurationFieldsValidation.updateRetryConfigs( + abfsConfiguration, + REDUCED_RETRY_COUNT, REDUCED_BACKOFF_INTERVAL); + + // Gets the client. + AbfsClient testClient = Mockito.spy( + ITestAbfsClient.createTestClientFromCurrentContext( + abfsClient, + abfsConfig)); + + // Create the append request params with expect header enabled initially. 
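+    // (position 0, offset 0, length 5, APPEND_MODE, not an append blob, no
+    // lease id, expect header on; the retry logic is expected to flip the
+    // flag to false once the mocked request fails with a user error.)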
+ AppendRequestParameters appendRequestParameters + = new AppendRequestParameters( + BUFFER_OFFSET, BUFFER_OFFSET, BUFFER_LENGTH, + AppendRequestParameters.Mode.APPEND_MODE, false, null, true); + + byte[] buffer = getRandomBytesArray(BUFFER_LENGTH); + + // Create a test container to upload the data. + Path testPath = path(TEST_PATH); + fs.create(testPath); + String finalTestPath = testPath.toString() + .substring(testPath.toString().lastIndexOf("/")); + + // Creates a list of request headers. + final List requestHeaders + = ITestAbfsClient.getTestRequestHeaders(testClient); + requestHeaders.add( + new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE, HTTP_METHOD_PATCH)); + if (appendRequestParameters.isExpectHeaderEnabled()) { + requestHeaders.add(new AbfsHttpHeader(EXPECT, HUNDRED_CONTINUE)); + } + + // Updates the query parameters. + final AbfsUriQueryBuilder abfsUriQueryBuilder + = testClient.createDefaultUriQueryBuilder(); + abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, APPEND_ACTION); + abfsUriQueryBuilder.addQuery(QUERY_PARAM_POSITION, + Long.toString(appendRequestParameters.getPosition())); + + // Creates the url for the specified path. + URL url = testClient.createRequestUrl(finalTestPath, abfsUriQueryBuilder.toString()); + + // Create a mock of the AbfsRestOperation to set the urlConnection in the corresponding httpOperation. + AbfsRestOperation op = Mockito.spy(new AbfsRestOperation( + AbfsRestOperationType.Append, + testClient, + HTTP_METHOD_PUT, + url, + requestHeaders, buffer, + appendRequestParameters.getoffset(), + appendRequestParameters.getLength(), null)); + + AbfsHttpOperation abfsHttpOperation = Mockito.spy(new AbfsHttpOperation(url, + HTTP_METHOD_PUT, requestHeaders)); + + // Sets the expect request property if expect header is enabled. + if (appendRequestParameters.isExpectHeaderEnabled()) { + Mockito.doReturn(HUNDRED_CONTINUE).when(abfsHttpOperation) + .getConnProperty(EXPECT); + } + + HttpURLConnection urlConnection = mock(HttpURLConnection.class); + Mockito.doNothing().when(urlConnection).setRequestProperty(Mockito + .any(), Mockito.any()); + Mockito.doReturn(HTTP_METHOD_PUT).when(urlConnection).getRequestMethod(); + Mockito.doReturn(url).when(urlConnection).getURL(); + Mockito.doReturn(urlConnection).when(abfsHttpOperation).getConnection(); + + Mockito.doNothing().when(abfsHttpOperation).setRequestProperty(Mockito + .any(), Mockito.any()); + Mockito.doReturn(url).when(abfsHttpOperation).getConnUrl(); + + // Give user error code 404 when processResponse is called. + Mockito.doReturn(HTTP_METHOD_PUT).when(abfsHttpOperation).getConnRequestMethod(); + Mockito.doReturn(HTTP_NOT_FOUND).when(abfsHttpOperation).getConnResponseCode(); + Mockito.doReturn("Resource Not Found") + .when(abfsHttpOperation) + .getConnResponseMessage(); + + // Make the getOutputStream throw IOException to see it returns from the sendRequest correctly. + Mockito.doThrow(new ProtocolException("Server rejected Operation")) + .when(abfsHttpOperation) + .getConnOutputStream(); + + // Sets the httpOperation for the rest operation. + Mockito.doReturn(abfsHttpOperation) + .when(op) + .createHttpOperation(); + + // Mock the restOperation for the client. 
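+    // From here on, any append issued through testClient receives the
+    // operation wired to the stubbed http connection above, regardless of
+    // the arguments passed in.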
+ Mockito.doReturn(op) + .when(testClient) + .getAbfsRestOperationForAppend(Mockito.any(), + Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), + Mockito.nullable(int.class), Mockito.nullable(int.class), + Mockito.any()); + + TracingContext tracingContext = Mockito.spy(new TracingContext("abcd", + "abcde", FSOperationType.APPEND, + TracingHeaderFormat.ALL_ID_FORMAT, null)); + + // Check that expect header is enabled before the append call. + Assertions.assertThat(appendRequestParameters.isExpectHeaderEnabled()) + .describedAs("The expect header is not true before the append call") + .isTrue(); + + intercept(AzureBlobFileSystemException.class, + () -> testClient.append(finalTestPath, buffer, appendRequestParameters, null, tracingContext)); + + // Verify that the request was not exponentially retried because of user error. + Assertions.assertThat(tracingContext.getRetryCount()) + .describedAs("The retry count is incorrect") + .isEqualTo(0); + + // Verify that the same request was retried with expect header disabled. + Assertions.assertThat(appendRequestParameters.isExpectHeaderEnabled()) + .describedAs("The expect header is not false") + .isFalse(); + } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsRestOperation.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsRestOperation.java new file mode 100644 index 00000000000..6ffe2e2773b --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsRestOperation.java @@ -0,0 +1,358 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.net.ProtocolException;
+import java.net.URL;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.mockito.Mockito;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
+import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
+import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
+import org.apache.hadoop.fs.azurebfs.TestAbfsConfigurationFieldsValidation;
+import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
+import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
+import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderFormat;
+
+import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
+import static java.net.HttpURLConnection.HTTP_OK;
+import static java.net.HttpURLConnection.HTTP_UNAVAILABLE;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APPEND_ACTION;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PATCH;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HUNDRED_CONTINUE;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.EXPECT;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_HTTP_METHOD_OVERRIDE;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_ACTION;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_POSITION;
+import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_ABFS_ACCOUNT_NAME;
+import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.TEST_CONFIGURATION_FILE_NAME;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+
+@RunWith(Parameterized.class)
+public class ITestAbfsRestOperation extends AbstractAbfsIntegrationTest {
+
+  // Specifies whether getOutputStream() or write() throws IOException.
+  public enum ErrorType {OUTPUTSTREAM, WRITE}
+
+  private static final int HTTP_EXPECTATION_FAILED = 417;
+  private static final int HTTP_ERROR = 0;
+  private static final int ZERO = 0;
+  private static final int REDUCED_RETRY_COUNT = 2;
+  private static final int REDUCED_BACKOFF_INTERVAL = 100;
+  private static final int BUFFER_LENGTH = 5;
+  private static final int BUFFER_OFFSET = 0;
+  private static final String TEST_PATH = "/testfile";
+
+  // Specifies whether the expect header is enabled or not.
+  @Parameterized.Parameter
+  public boolean expectHeaderEnabled;
+
+  // Gives the http response code.
+  @Parameterized.Parameter(1)
+  public int responseCode;
+
+  // Gives the http response message.
+  @Parameterized.Parameter(2)
+  public String responseMessage;
+
+  // Gives the error type based on the ErrorType enum.
+  @Parameterized.Parameter(3)
+  public ErrorType errorType;
+
+  // The mocked throttling intercept.
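+  // Each parameterized run asserts how often updateMetrics() is invoked on
+  // this intercept: once per attempt for retriable failures, never for
+  // user errors.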
+  private AbfsThrottlingIntercept intercept;
+
+  /*
+    HTTP_OK = 200,
+    HTTP_UNAVAILABLE = 503,
+    HTTP_NOT_FOUND = 404,
+    HTTP_EXPECTATION_FAILED = 417,
+    HTTP_ERROR = 0.
+  */
+  @Parameterized.Parameters(name = "expect={0}-code={1}-ErrorType={3}")
+  public static Iterable<Object[]> params() {
+    return Arrays.asList(new Object[][]{
+        {true, HTTP_OK, "OK", ErrorType.WRITE},
+        {false, HTTP_OK, "OK", ErrorType.WRITE},
+        {true, HTTP_UNAVAILABLE, "ServerBusy", ErrorType.OUTPUTSTREAM},
+        {true, HTTP_NOT_FOUND, "Resource Not Found", ErrorType.OUTPUTSTREAM},
+        {true, HTTP_EXPECTATION_FAILED, "Expectation Failed", ErrorType.OUTPUTSTREAM},
+        {true, HTTP_ERROR, "Error", ErrorType.OUTPUTSTREAM}
+    });
+  }
+
+  public ITestAbfsRestOperation() throws Exception {
+    super();
+  }
+
+  /**
+   * Test helper method to get random bytes array.
+   * @param length The length of byte buffer
+   * @return byte buffer
+   */
+  private byte[] getRandomBytesArray(int length) {
+    final byte[] b = new byte[length];
+    new Random().nextBytes(b);
+    return b;
+  }
+
+  /**
+   * Builds a spied AbfsRestOperation whose http layer is fully mocked.
+   * @return the AbfsRestOperation.
+   */
+  private AbfsRestOperation getRestOperation() throws Exception {
+    // Get the filesystem.
+    final AzureBlobFileSystem fs = getFileSystem();
+
+    final Configuration configuration = new Configuration();
+    configuration.addResource(TEST_CONFIGURATION_FILE_NAME);
+    AbfsClient abfsClient = fs.getAbfsStore().getClient();
+
+    AbfsConfiguration abfsConfiguration = new AbfsConfiguration(configuration,
+        configuration.get(FS_AZURE_ABFS_ACCOUNT_NAME));
+
+    // Update the configuration with reduced retry count and reduced backoff interval.
+    AbfsConfiguration abfsConfig
+        = TestAbfsConfigurationFieldsValidation.updateRetryConfigs(
+        abfsConfiguration,
+        REDUCED_RETRY_COUNT, REDUCED_BACKOFF_INTERVAL);
+
+    intercept = Mockito.mock(AbfsThrottlingIntercept.class);
+    Mockito.doNothing().when(intercept).updateMetrics(Mockito.any(), Mockito.any());
+
+    // Gets the client.
+    AbfsClient testClient = Mockito.spy(ITestAbfsClient.createTestClientFromCurrentContext(
+        abfsClient,
+        abfsConfig));
+
+    Mockito.doReturn(intercept).when(testClient).getIntercept();
+
+    // Expect header is enabled or not based on the parameter.
+    AppendRequestParameters appendRequestParameters
+        = new AppendRequestParameters(
+        BUFFER_OFFSET, BUFFER_OFFSET, BUFFER_LENGTH,
+        AppendRequestParameters.Mode.APPEND_MODE, false, null,
+        expectHeaderEnabled);
+
+    byte[] buffer = getRandomBytesArray(BUFFER_LENGTH);
+
+    // Create a test file to upload the data.
+    Path testPath = path(TEST_PATH);
+    fs.create(testPath);
+    String finalTestPath = testPath.toString().substring(testPath.toString().lastIndexOf("/"));
+
+    // Creates a list of request headers.
+    final List<AbfsHttpHeader> requestHeaders = ITestAbfsClient.getTestRequestHeaders(testClient);
+    requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE, HTTP_METHOD_PATCH));
+    if (appendRequestParameters.isExpectHeaderEnabled()) {
+      requestHeaders.add(new AbfsHttpHeader(EXPECT, HUNDRED_CONTINUE));
+    }
+
+    // Updates the query parameters.
+    final AbfsUriQueryBuilder abfsUriQueryBuilder = testClient.createDefaultUriQueryBuilder();
+    abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, APPEND_ACTION);
+    abfsUriQueryBuilder.addQuery(QUERY_PARAM_POSITION, Long.toString(appendRequestParameters.getPosition()));
+
+    // Creates the url for the specified path.
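+    // Note: createRequestUrl() resolves this relative path against the
+    // account's base URI, so the bare path suffix is enough here.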
+    URL url = testClient.createRequestUrl(finalTestPath, abfsUriQueryBuilder.toString());
+
+    // Create a spy of the AbfsRestOperation to set the urlConnection in the corresponding httpOperation.
+    AbfsRestOperation op = Mockito.spy(new AbfsRestOperation(
+        AbfsRestOperationType.Append,
+        testClient,
+        HTTP_METHOD_PUT,
+        url,
+        requestHeaders, buffer,
+        appendRequestParameters.getoffset(),
+        appendRequestParameters.getLength(), null));
+
+    AbfsHttpOperation abfsHttpOperation = Mockito.spy(new AbfsHttpOperation(url, HTTP_METHOD_PUT, requestHeaders));
+
+    // Sets the expect request property if expect header is enabled.
+    if (expectHeaderEnabled) {
+      Mockito.doReturn(HUNDRED_CONTINUE)
+          .when(abfsHttpOperation)
+          .getConnProperty(EXPECT);
+    }
+
+    HttpURLConnection urlConnection = mock(HttpURLConnection.class);
+    Mockito.doNothing().when(urlConnection).setRequestProperty(Mockito
+        .any(), Mockito.any());
+    Mockito.doReturn(HTTP_METHOD_PUT).when(urlConnection).getRequestMethod();
+    Mockito.doReturn(url).when(urlConnection).getURL();
+    Mockito.doReturn(urlConnection).when(abfsHttpOperation).getConnection();
+
+    Mockito.doNothing().when(abfsHttpOperation).setRequestProperty(Mockito
+        .any(), Mockito.any());
+    Mockito.doReturn(url).when(abfsHttpOperation).getConnUrl();
+    Mockito.doReturn(HTTP_METHOD_PUT).when(abfsHttpOperation).getConnRequestMethod();
+
+    switch (errorType) {
+    case OUTPUTSTREAM:
+      // If getOutputStream() throws an IOException while the Expect header
+      // is enabled, control returns to processResponse; the response code
+      // and message are therefore mocked to exercise the behaviour for each
+      // parameterized response code.
+      Mockito.doReturn(responseCode).when(abfsHttpOperation).getConnResponseCode();
+      Mockito.doReturn(responseMessage)
+          .when(abfsHttpOperation)
+          .getConnResponseMessage();
+      Mockito.doThrow(new ProtocolException("Server rejected Operation"))
+          .when(abfsHttpOperation)
+          .getConnOutputStream();
+      break;
+    case WRITE:
+      // If write() throws an IOException, the exception is thrown back to
+      // the caller whether or not the Expect header is enabled.
+      OutputStream outputStream = Mockito.spy(new OutputStream() {
+        @Override
+        public void write(final int i) throws IOException {
+        }
+      });
+      Mockito.doReturn(outputStream).when(abfsHttpOperation).getConnOutputStream();
+      Mockito.doThrow(new IOException())
+          .when(outputStream)
+          .write(buffer, appendRequestParameters.getoffset(),
+              appendRequestParameters.getLength());
+      break;
+    default:
+      break;
+    }
+
+    // Sets the httpOperation for the rest operation.
+    Mockito.doReturn(abfsHttpOperation)
+        .when(op)
+        .createHttpOperation();
+    return op;
+  }
+
+  /**
+   * Asserts the retry count and the sent / expected byte counters that were
+   * recorded for the request.
+   */
+  void assertTraceContextState(int retryCount, int assertRetryCount, int bytesSent, int assertBytesSent,
+      int expectedBytesSent, int assertExpectedBytesSent) {
+    // Assert that the request is retried or not.
+    Assertions.assertThat(retryCount)
+        .describedAs("The retry count is incorrect")
+        .isEqualTo(assertRetryCount);
+
+    // Assert that metrics will be updated correctly.
+    Assertions.assertThat(bytesSent)
+        .describedAs("The bytes sent is incorrect")
+        .isEqualTo(assertBytesSent);
+    Assertions.assertThat(expectedBytesSent)
+        .describedAs("The expected bytes sent is incorrect")
+        .isEqualTo(assertExpectedBytesSent);
+  }
+
+  /**
+   * Tests the behaviour when getOutputStream() or write() throws an
+   * exception, for each of the parameterized response codes.
+   */
+  @Test
+  public void testExpectHundredContinue() throws Exception {
+    // Gets the AbfsRestOperation.
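+    // Every http interaction inside it is mocked, so op.execute() exercises
+    // only the expect-header and retry handling of AbfsRestOperation.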
+    AbfsRestOperation op = getRestOperation();
+    AbfsHttpOperation httpOperation = op.createHttpOperation();
+
+    TracingContext tracingContext = Mockito.spy(new TracingContext("abcd",
+        "abcde", FSOperationType.APPEND,
+        TracingHeaderFormat.ALL_ID_FORMAT, null));
+
+    switch (errorType) {
+    case WRITE:
+      // If write() throws an IOException, the exception is propagated
+      // whether or not the Expect header is enabled; it is caught and the
+      // exponential retry logic comes into play.
+      intercept(IOException.class,
+          () -> op.execute(tracingContext));
+
+      // Asserting update of metrics and retries.
+      assertTraceContextState(tracingContext.getRetryCount(), REDUCED_RETRY_COUNT, httpOperation.getBytesSent(), BUFFER_LENGTH,
+          0, 0);
+      break;
+    case OUTPUTSTREAM:
+      switch (responseCode) {
+      case HTTP_UNAVAILABLE:
+        // A 503, i.e. the throttled case, should be retried.
+        intercept(IOException.class,
+            () -> op.execute(tracingContext));
+
+        // Asserting update of metrics and retries.
+        assertTraceContextState(tracingContext.getRetryCount(), REDUCED_RETRY_COUNT, httpOperation.getBytesSent(), ZERO,
+            httpOperation.getExpectedBytesToBeSent(), BUFFER_LENGTH);
+
+        // Verifies that updateMetrics is called for the throttled case:
+        // once for the first attempt and once for each retry.
+        Mockito.verify(intercept, times(REDUCED_RETRY_COUNT + 1))
+            .updateMetrics(Mockito.any(), Mockito.any());
+        break;
+      case HTTP_ERROR:
+        // In the case of http status code 0, i.e. the error case, we should retry.
+        intercept(IOException.class,
+            () -> op.execute(tracingContext));
+
+        // Asserting update of metrics and retries.
+        assertTraceContextState(tracingContext.getRetryCount(), REDUCED_RETRY_COUNT, httpOperation.getBytesSent(),
+            ZERO, 0, 0);
+
+        // Verifies that updateMetrics is called for the error case as well:
+        // once for the first attempt and once for each retry.
+        Mockito.verify(intercept, times(REDUCED_RETRY_COUNT + 1))
+            .updateMetrics(Mockito.any(), Mockito.any());
+        break;
+      case HTTP_NOT_FOUND:
+      case HTTP_EXPECTATION_FAILED:
+        // A 4xx response, i.e. a user error, should not be retried.
+        intercept(AzureBlobFileSystemException.class,
+            () -> op.execute(tracingContext));
+
+        // Asserting update of metrics and retries.
+        assertTraceContextState(tracingContext.getRetryCount(), ZERO, 0,
+            0, 0, 0);
+
+        // Verifies that updateMetrics is not called for the user error case.
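+        // (A 4xx response is the caller's fault rather than a throttling
+        // signal, so it should not feed the shared throttling metrics.)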
+        Mockito.verify(intercept, never())
+            .updateMetrics(Mockito.any(), Mockito.any());
+        break;
+      default:
+        break;
+      }
+      break;
+    default:
+      break;
+    }
+  }
+}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java
index 0673e387bfb..e26ba938cf5 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java
@@ -72,6 +72,7 @@ public final class TestAbfsOutputStream {
       boolean isFlushEnabled,
       boolean disableOutputStreamFlush,
       boolean isAppendBlob,
+      boolean isExpectHeaderEnabled,
       AbfsClient client,
       String path,
       TracingContext tracingContext,
@@ -89,6 +90,7 @@ public final class TestAbfsOutputStream {
 
     return new AbfsOutputStreamContext(2)
         .withWriteBufferSize(writeBufferSize)
+        .enableExpectHeader(isExpectHeaderEnabled)
         .enableFlush(isFlushEnabled)
         .disableOutputStreamFlush(disableOutputStreamFlush)
         .withStreamStatistics(new AbfsOutputStreamStatisticsImpl())
@@ -129,6 +131,7 @@ public final class TestAbfsOutputStream {
         true,
         false,
         false,
+        true,
         client,
         PATH,
         new TracingContext(abfsConf.getClientCorrelationId(), "test-fs-id",
@@ -149,9 +152,9 @@ public final class TestAbfsOutputStream {
     out.hsync();
 
     AppendRequestParameters firstReqParameters = new AppendRequestParameters(
-        0, 0, WRITE_SIZE, APPEND_MODE, false, null);
+        0, 0, WRITE_SIZE, APPEND_MODE, false, null, true);
     AppendRequestParameters secondReqParameters = new AppendRequestParameters(
-        WRITE_SIZE, 0, 2 * WRITE_SIZE, APPEND_MODE, false, null);
+        WRITE_SIZE, 0, 2 * WRITE_SIZE, APPEND_MODE, false, null, true);
 
     verify(client, times(1)).append(
         eq(PATH), any(byte[].class), refEq(firstReqParameters), any(),
@@ -190,6 +193,7 @@ public final class TestAbfsOutputStream {
         true,
         false,
         false,
+        true,
         client,
         PATH,
         tracingContext,
@@ -203,9 +207,9 @@ public final class TestAbfsOutputStream {
     out.close();
 
     AppendRequestParameters firstReqParameters = new AppendRequestParameters(
-        0, 0, BUFFER_SIZE, APPEND_MODE, false, null);
+        0, 0, BUFFER_SIZE, APPEND_MODE, false, null, true);
     AppendRequestParameters secondReqParameters = new AppendRequestParameters(
-        BUFFER_SIZE, 0, 5*WRITE_SIZE-BUFFER_SIZE, APPEND_MODE, false, null);
+        BUFFER_SIZE, 0, 5*WRITE_SIZE-BUFFER_SIZE, APPEND_MODE, false, null, true);
 
     verify(client, times(1)).append(
         eq(PATH), any(byte[].class), refEq(firstReqParameters), any(),
@@ -264,6 +268,7 @@ public final class TestAbfsOutputStream {
         true,
         false,
         false,
+        true,
         client,
         PATH,
         tracingContext,
@@ -277,9 +282,9 @@ public final class TestAbfsOutputStream {
     out.close();
 
     AppendRequestParameters firstReqParameters = new AppendRequestParameters(
-        0, 0, BUFFER_SIZE, APPEND_MODE, false, null);
+        0, 0, BUFFER_SIZE, APPEND_MODE, false, null, true);
     AppendRequestParameters secondReqParameters = new AppendRequestParameters(
-        BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false, null);
+        BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false, null, true);
 
     verify(client, times(1)).append(
         eq(PATH), any(byte[].class), refEq(firstReqParameters), any(),
         any(TracingContext.class));
@@ -335,6 +340,7 @@ public final class TestAbfsOutputStream {
         true,
         false,
         false,
+        true,
         client,
         PATH,
         new TracingContext(abfsConf.getClientCorrelationId(), "test-fs-id",
@@ -350,9 +356,9 @@ public final class TestAbfsOutputStream {
     Thread.sleep(1000);
 
     AppendRequestParameters firstReqParameters = new AppendRequestParameters(
-        0, 0, BUFFER_SIZE, APPEND_MODE, false, null);
+        0, 0, BUFFER_SIZE, APPEND_MODE, false, null, true);
     AppendRequestParameters secondReqParameters = new AppendRequestParameters(
-        BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false, null);
+        BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false, null, true);
 
     verify(client, times(1)).append(
         eq(PATH), any(byte[].class), refEq(firstReqParameters), any(),
         any(TracingContext.class));
@@ -390,6 +396,7 @@ public final class TestAbfsOutputStream {
         true,
         false,
         true,
+        true,
         client,
         PATH,
         new TracingContext(abfsConf.getClientCorrelationId(), "test-fs-id",
@@ -405,9 +412,9 @@ public final class TestAbfsOutputStream {
     Thread.sleep(1000);
 
     AppendRequestParameters firstReqParameters = new AppendRequestParameters(
-        0, 0, BUFFER_SIZE, APPEND_MODE, true, null);
+        0, 0, BUFFER_SIZE, APPEND_MODE, true, null, true);
     AppendRequestParameters secondReqParameters = new AppendRequestParameters(
-        BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, true, null);
+        BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, true, null, true);
 
     verify(client, times(1)).append(
         eq(PATH), any(byte[].class), refEq(firstReqParameters), any(),
         any(TracingContext.class));
@@ -449,6 +456,7 @@ public final class TestAbfsOutputStream {
         true,
         false,
         false,
+        true,
         client,
         PATH,
         new TracingContext(abfsConf.getClientCorrelationId(), "test-fs-id",
@@ -464,9 +472,9 @@ public final class TestAbfsOutputStream {
     out.hflush();
 
     AppendRequestParameters firstReqParameters = new AppendRequestParameters(
-        0, 0, BUFFER_SIZE, APPEND_MODE, false, null);
+        0, 0, BUFFER_SIZE, APPEND_MODE, false, null, true);
     AppendRequestParameters secondReqParameters = new AppendRequestParameters(
-        BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false, null);
+        BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false, null, true);
 
     verify(client, times(1)).append(
         eq(PATH), any(byte[].class), refEq(firstReqParameters), any(),
         any(TracingContext.class));
@@ -518,6 +526,7 @@ public final class TestAbfsOutputStream {
         true,
         false,
         false,
+        true,
         client,
         PATH,
         new TracingContext(abfsConf.getClientCorrelationId(), "test-fs-id",
@@ -535,9 +544,9 @@ public final class TestAbfsOutputStream {
     Thread.sleep(1000);
 
     AppendRequestParameters firstReqParameters = new AppendRequestParameters(
-        0, 0, BUFFER_SIZE, APPEND_MODE, false, null);
+        0, 0, BUFFER_SIZE, APPEND_MODE, false, null, true);
     AppendRequestParameters secondReqParameters = new AppendRequestParameters(
-        BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false, null);
+        BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false, null, true);
 
     verify(client, times(1)).append(
         eq(PATH), any(byte[].class), refEq(firstReqParameters), any(),
         any(TracingContext.class));
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRenameRetryRecovery.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRenameRetryRecovery.java
index 65ea79b36bd..f5cbceaddd8 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRenameRetryRecovery.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRenameRetryRecovery.java
@@ -58,7 +58,7 @@ public class TestAbfsRenameRetryRecovery extends AbstractAbfsIntegrationTest {
     String destNoParentPath = "/NoParent/Dest";
     AzureBlobFileSystem fs = getFileSystem();
 
-    AbfsClient mockClient = TestAbfsClient.getMockAbfsClient(
+    AbfsClient mockClient = ITestAbfsClient.getMockAbfsClient(
         fs.getAbfsStore().getClient(),
         fs.getAbfsStore().getAbfsConfiguration());
 
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestExponentialRetryPolicy.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestExponentialRetryPolicy.java
index a1fc4e138d6..12ab4e9ead6 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestExponentialRetryPolicy.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestExponentialRetryPolicy.java
@@ -103,7 +103,7 @@ public class TestExponentialRetryPolicy extends AbstractAbfsIntegrationTest {
     AbfsConfiguration abfsConfiguration = new AbfsConfiguration(configuration,
         "dummy.dfs.core.windows.net");
     AbfsThrottlingIntercept intercept;
-    AbfsClient abfsClient = TestAbfsClient.createTestClientFromCurrentContext(fs.getAbfsStore().getClient(), abfsConfiguration);
+    AbfsClient abfsClient = ITestAbfsClient.createTestClientFromCurrentContext(fs.getAbfsStore().getClient(), abfsConfiguration);
     intercept = abfsClient.getIntercept();
     Assertions.assertThat(intercept)
         .describedAs("AbfsNoOpThrottlingIntercept instance expected")
@@ -114,7 +114,7 @@ public class TestExponentialRetryPolicy extends AbstractAbfsIntegrationTest {
     // On disabling throttling AbfsClientThrottlingIntercept object is returned
     AbfsConfiguration abfsConfiguration1 = new AbfsConfiguration(configuration,
         "dummy1.dfs.core.windows.net");
-    AbfsClient abfsClient1 = TestAbfsClient.createTestClientFromCurrentContext(fs.getAbfsStore().getClient(), abfsConfiguration1);
+    AbfsClient abfsClient1 = ITestAbfsClient.createTestClientFromCurrentContext(fs.getAbfsStore().getClient(), abfsConfiguration1);
     intercept = abfsClient1.getIntercept();
     Assertions.assertThat(intercept)
         .describedAs("AbfsClientThrottlingIntercept instance expected")