diff --git a/hadoop-tools/hadoop-azure/pom.xml b/hadoop-tools/hadoop-azure/pom.xml index f079b71634b..c40b6bbb450 100644 --- a/hadoop-tools/hadoop-azure/pom.xml +++ b/hadoop-tools/hadoop-azure/pom.xml @@ -262,7 +262,12 @@ mockito-all test - + + + org.assertj + assertj-core + test + diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java index ffddc4554eb..052ae01a13e 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java @@ -178,6 +178,10 @@ public class AbfsConfiguration{ DefaultValue = DEFAULT_USE_UPN) private boolean useUpn; + @BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ABFS_LATENCY_TRACK, + DefaultValue = DEFAULT_ABFS_LATENCY_TRACK) + private boolean trackLatency; + private Map storageAccountKeys; public AbfsConfiguration(final Configuration rawConfig, String accountName) @@ -471,6 +475,15 @@ public class AbfsConfiguration{ return this.useUpn; } + /** + * Whether {@code AbfsClient} should track and send latency info back to storage servers. + * + * @return a boolean indicating whether latency should be tracked. 
+ */ + public boolean shouldTrackLatency() { + return this.trackLatency; + } + public AccessTokenProvider getTokenProvider() throws TokenAccessProviderException { AuthType authType = getEnum(FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME, AuthType.SharedKey); if (authType == AuthType.OAuth) { diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index 75fc08e04eb..5acb5975fcb 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -34,6 +34,7 @@ import java.nio.charset.CharsetEncoder; import java.nio.charset.StandardCharsets; import java.text.ParseException; import java.text.SimpleDateFormat; +import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; import java.util.Date; @@ -80,6 +81,8 @@ import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation; import org.apache.hadoop.fs.azurebfs.services.AuthType; import org.apache.hadoop.fs.azurebfs.services.ExponentialRetryPolicy; import org.apache.hadoop.fs.azurebfs.services.SharedKeyCredentials; +import org.apache.hadoop.fs.azurebfs.services.AbfsPerfTracker; +import org.apache.hadoop.fs.azurebfs.services.AbfsPerfInfo; import org.apache.hadoop.fs.azurebfs.utils.Base64; import org.apache.hadoop.fs.azurebfs.utils.CRC64; import org.apache.hadoop.fs.azurebfs.utils.UriUtils; @@ -119,6 +122,7 @@ public class AzureBlobFileSystemStore { private static final String TOKEN_DATE_PATTERN = "yyyy-MM-dd'T'HH:mm:ss.SSSSSSS'Z'"; private static final String XMS_PROPERTIES_ENCODING = "ISO-8859-1"; private static final int LIST_MAX_RESULTS = 500; + private static final int GET_SET_AGGREGATE_COUNT = 2; private final AbfsConfiguration abfsConfiguration; private final Set azureAtomicRenameDirSet; @@ 
-127,6 +131,7 @@ public class AzureBlobFileSystemStore { private final AuthType authType; private final UserGroupInformation userGroupInformation; private final IdentityTransformer identityTransformer; + private final AbfsPerfTracker abfsPerfTracker; public AzureBlobFileSystemStore(URI uri, boolean isSecureScheme, Configuration configuration) throws IOException { @@ -159,6 +164,7 @@ public class AzureBlobFileSystemStore { this.authType = abfsConfiguration.getAuthType(accountName); boolean usingOauth = (authType == AuthType.OAuth); boolean useHttps = (usingOauth || abfsConfiguration.isHttpsAlwaysUsed()) ? true : isSecureScheme; + this.abfsPerfTracker = new AbfsPerfTracker(fileSystemName, accountName, this.abfsConfiguration); initializeClient(uri, fileSystemName, accountName, useHttps); this.identityTransformer = new IdentityTransformer(abfsConfiguration.getRawConfiguration()); } @@ -203,10 +209,13 @@ public class AzureBlobFileSystemStore { public boolean getIsNamespaceEnabled() throws AzureBlobFileSystemException { if (!isNamespaceEnabledSet) { + LOG.debug("Get root ACL status"); - try { - client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + AbfsHttpConstants.ROOT_PATH); + try (AbfsPerfInfo perfInfo = startTracking("getIsNamespaceEnabled", "getAclStatus")) { + AbfsRestOperation op = client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + AbfsHttpConstants.ROOT_PATH); + perfInfo.registerResult(op.getResult()); isNamespaceEnabled = true; + perfInfo.registerSuccess(true); } catch (AbfsRestOperationException ex) { // Get ACL status is a HEAD request, its response doesn't contain errorCode // So can only rely on its status code to determine its account type. 
@@ -257,17 +266,23 @@ public class AzureBlobFileSystemStore { } public Hashtable getFilesystemProperties() throws AzureBlobFileSystemException { - LOG.debug("getFilesystemProperties for filesystem: {}", - client.getFileSystem()); + try (AbfsPerfInfo perfInfo = startTracking("getFilesystemProperties", + "getFilesystemProperties")) { + LOG.debug("getFilesystemProperties for filesystem: {}", + client.getFileSystem()); - final Hashtable parsedXmsProperties; + final Hashtable parsedXmsProperties; - final AbfsRestOperation op = client.getFilesystemProperties(); - final String xMsProperties = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_PROPERTIES); + final AbfsRestOperation op = client.getFilesystemProperties(); + perfInfo.registerResult(op.getResult()); - parsedXmsProperties = parseCommaSeparatedXmsProperties(xMsProperties); + final String xMsProperties = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_PROPERTIES); - return parsedXmsProperties; + parsedXmsProperties = parseCommaSeparatedXmsProperties(xMsProperties); + perfInfo.registerSuccess(true); + + return parsedXmsProperties; + } } public void setFilesystemProperties(final Hashtable properties) @@ -280,159 +295,196 @@ public class AzureBlobFileSystemStore { client.getFileSystem(), properties); - final String commaSeparatedProperties; - try { - commaSeparatedProperties = convertXmsPropertiesToCommaSeparatedString(properties); - } catch (CharacterCodingException ex) { - throw new InvalidAbfsRestOperationException(ex); - } + try (AbfsPerfInfo perfInfo = startTracking("setFilesystemProperties", + "setFilesystemProperties")) { + final String commaSeparatedProperties; + try { + commaSeparatedProperties = convertXmsPropertiesToCommaSeparatedString(properties); + } catch (CharacterCodingException ex) { + throw new InvalidAbfsRestOperationException(ex); + } - client.setFilesystemProperties(commaSeparatedProperties); + final AbfsRestOperation op = 
client.setFilesystemProperties(commaSeparatedProperties); + perfInfo.registerResult(op.getResult()).registerSuccess(true); + } } public Hashtable getPathStatus(final Path path) throws AzureBlobFileSystemException { - LOG.debug("getPathStatus for filesystem: {} path: {}", - client.getFileSystem(), - path); + try (AbfsPerfInfo perfInfo = startTracking("getPathStatus", "getPathStatus")){ + LOG.debug("getPathStatus for filesystem: {} path: {}", + client.getFileSystem(), + path); - final Hashtable parsedXmsProperties; - final AbfsRestOperation op = client.getPathStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path)); + final Hashtable parsedXmsProperties; + final AbfsRestOperation op = client.getPathStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path)); + perfInfo.registerResult(op.getResult()); - final String xMsProperties = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_PROPERTIES); + final String xMsProperties = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_PROPERTIES); - parsedXmsProperties = parseCommaSeparatedXmsProperties(xMsProperties); + parsedXmsProperties = parseCommaSeparatedXmsProperties(xMsProperties); - return parsedXmsProperties; + perfInfo.registerSuccess(true); + + return parsedXmsProperties; + } } public void setPathProperties(final Path path, final Hashtable properties) throws AzureBlobFileSystemException { - LOG.debug("setFilesystemProperties for filesystem: {} path: {} with properties: {}", - client.getFileSystem(), - path, - properties); + try (AbfsPerfInfo perfInfo = startTracking("setPathProperties", "setPathProperties")){ + LOG.debug("setPathProperties for filesystem: {} path: {} with properties: {}", + client.getFileSystem(), + path, + properties); - final String commaSeparatedProperties; - try { - commaSeparatedProperties = convertXmsPropertiesToCommaSeparatedString(properties); - } catch (CharacterCodingException ex) { - throw new InvalidAbfsRestOperationException(ex); + final String
commaSeparatedProperties; + try { + commaSeparatedProperties = convertXmsPropertiesToCommaSeparatedString(properties); + } catch (CharacterCodingException ex) { + throw new InvalidAbfsRestOperationException(ex); + } + final AbfsRestOperation op = client.setPathProperties(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), commaSeparatedProperties); + perfInfo.registerResult(op.getResult()).registerSuccess(true); } - client.setPathProperties(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), commaSeparatedProperties); } public void createFilesystem() throws AzureBlobFileSystemException { - LOG.debug("createFilesystem for filesystem: {}", - client.getFileSystem()); + try (AbfsPerfInfo perfInfo = startTracking("createFilesystem", "createFilesystem")){ + LOG.debug("createFilesystem for filesystem: {}", + client.getFileSystem()); - client.createFilesystem(); + final AbfsRestOperation op = client.createFilesystem(); + perfInfo.registerResult(op.getResult()).registerSuccess(true); + } } public void deleteFilesystem() throws AzureBlobFileSystemException { - LOG.debug("deleteFilesystem for filesystem: {}", - client.getFileSystem()); + try (AbfsPerfInfo perfInfo = startTracking("deleteFilesystem", "deleteFilesystem")) { + LOG.debug("deleteFilesystem for filesystem: {}", + client.getFileSystem()); - client.deleteFilesystem(); + final AbfsRestOperation op = client.deleteFilesystem(); + perfInfo.registerResult(op.getResult()).registerSuccess(true); + } } public OutputStream createFile(final Path path, final boolean overwrite, final FsPermission permission, final FsPermission umask) throws AzureBlobFileSystemException { - boolean isNamespaceEnabled = getIsNamespaceEnabled(); - LOG.debug("createFile filesystem: {} path: {} overwrite: {} permission: {} umask: {} isNamespaceEnabled: {}", - client.getFileSystem(), - path, - overwrite, - permission.toString(), - umask.toString(), - isNamespaceEnabled); + try (AbfsPerfInfo perfInfo = startTracking("createFile", 
"createPath")) { + boolean isNamespaceEnabled = getIsNamespaceEnabled(); + LOG.debug("createFile filesystem: {} path: {} overwrite: {} permission: {} umask: {} isNamespaceEnabled: {}", + client.getFileSystem(), + path, + overwrite, + permission.toString(), + umask.toString(), + isNamespaceEnabled); - client.createPath(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), true, overwrite, - isNamespaceEnabled ? getOctalNotation(permission) : null, - isNamespaceEnabled ? getOctalNotation(umask) : null); + final AbfsRestOperation op = client.createPath(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), true, overwrite, + isNamespaceEnabled ? getOctalNotation(permission) : null, + isNamespaceEnabled ? getOctalNotation(umask) : null); + perfInfo.registerResult(op.getResult()).registerSuccess(true); - return new AbfsOutputStream( - client, - AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), - 0, - abfsConfiguration.getWriteBufferSize(), - abfsConfiguration.isFlushEnabled(), - abfsConfiguration.isOutputStreamFlushDisabled()); + return new AbfsOutputStream( + client, + AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), + 0, + abfsConfiguration.getWriteBufferSize(), + abfsConfiguration.isFlushEnabled(), + abfsConfiguration.isOutputStreamFlushDisabled()); + } } public void createDirectory(final Path path, final FsPermission permission, final FsPermission umask) throws AzureBlobFileSystemException { - boolean isNamespaceEnabled = getIsNamespaceEnabled(); - LOG.debug("createDirectory filesystem: {} path: {} permission: {} umask: {} isNamespaceEnabled: {}", - client.getFileSystem(), - path, - permission, - umask, - isNamespaceEnabled); + try (AbfsPerfInfo perfInfo = startTracking("createDirectory", "createPath")) { + boolean isNamespaceEnabled = getIsNamespaceEnabled(); + LOG.debug("createDirectory filesystem: {} path: {} permission: {} umask: {} isNamespaceEnabled: {}", + client.getFileSystem(), + path, + permission, + umask, + isNamespaceEnabled); - 
client.createPath(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), false, true, - isNamespaceEnabled ? getOctalNotation(permission) : null, - isNamespaceEnabled ? getOctalNotation(umask) : null); + final AbfsRestOperation op = client.createPath(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), false, true, + isNamespaceEnabled ? getOctalNotation(permission) : null, + isNamespaceEnabled ? getOctalNotation(umask) : null); + perfInfo.registerResult(op.getResult()).registerSuccess(true); + } } public AbfsInputStream openFileForRead(final Path path, final FileSystem.Statistics statistics) throws AzureBlobFileSystemException { - LOG.debug("openFileForRead filesystem: {} path: {}", - client.getFileSystem(), - path); + try (AbfsPerfInfo perfInfo = startTracking("openFileForRead", "getPathStatus")) { + LOG.debug("openFileForRead filesystem: {} path: {}", + client.getFileSystem(), + path); - final AbfsRestOperation op = client.getPathStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path)); + final AbfsRestOperation op = client.getPathStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path)); + perfInfo.registerResult(op.getResult()); - final String resourceType = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_RESOURCE_TYPE); - final long contentLength = Long.parseLong(op.getResult().getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH)); - final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG); + final String resourceType = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_RESOURCE_TYPE); + final long contentLength = Long.parseLong(op.getResult().getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH)); + final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG); - if (parseIsDirectory(resourceType)) { - throw new AbfsRestOperationException( - AzureServiceErrorCode.PATH_NOT_FOUND.getStatusCode(), - AzureServiceErrorCode.PATH_NOT_FOUND.getErrorCode(), - 
"openFileForRead must be used with files and not directories", - null); + if (parseIsDirectory(resourceType)) { + throw new AbfsRestOperationException( + AzureServiceErrorCode.PATH_NOT_FOUND.getStatusCode(), + AzureServiceErrorCode.PATH_NOT_FOUND.getErrorCode(), + "openFileForRead must be used with files and not directories", + null); + } + + perfInfo.registerSuccess(true); + + // Add statistics for InputStream + return new AbfsInputStream(client, statistics, + AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), contentLength, + abfsConfiguration.getReadBufferSize(), abfsConfiguration.getReadAheadQueueDepth(), + abfsConfiguration.getTolerateOobAppends(), eTag); } - - // Add statistics for InputStream - return new AbfsInputStream(client, statistics, - AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), contentLength, - abfsConfiguration.getReadBufferSize(), abfsConfiguration.getReadAheadQueueDepth(), - abfsConfiguration.getTolerateOobAppends(), eTag); } public OutputStream openFileForWrite(final Path path, final boolean overwrite) throws AzureBlobFileSystemException { - LOG.debug("openFileForWrite filesystem: {} path: {} overwrite: {}", - client.getFileSystem(), - path, - overwrite); + try (AbfsPerfInfo perfInfo = startTracking("openFileForWrite", "getPathStatus")) { + LOG.debug("openFileForWrite filesystem: {} path: {} overwrite: {}", + client.getFileSystem(), + path, + overwrite); - final AbfsRestOperation op = client.getPathStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path)); + final AbfsRestOperation op = client.getPathStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path)); + perfInfo.registerResult(op.getResult()); - final String resourceType = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_RESOURCE_TYPE); - final Long contentLength = Long.valueOf(op.getResult().getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH)); + final String resourceType = 
op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_RESOURCE_TYPE); + final Long contentLength = Long.valueOf(op.getResult().getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH)); - if (parseIsDirectory(resourceType)) { - throw new AbfsRestOperationException( - AzureServiceErrorCode.PATH_NOT_FOUND.getStatusCode(), - AzureServiceErrorCode.PATH_NOT_FOUND.getErrorCode(), - "openFileForRead must be used with files and not directories", - null); + if (parseIsDirectory(resourceType)) { + throw new AbfsRestOperationException( + AzureServiceErrorCode.PATH_NOT_FOUND.getStatusCode(), + AzureServiceErrorCode.PATH_NOT_FOUND.getErrorCode(), + "openFileForWrite must be used with files and not directories", + null); + } + + final long offset = overwrite ? 0 : contentLength; + + perfInfo.registerSuccess(true); + + return new AbfsOutputStream( + client, + AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), + offset, + abfsConfiguration.getWriteBufferSize(), + abfsConfiguration.isFlushEnabled(), + abfsConfiguration.isOutputStreamFlushDisabled()); } - - final long offset = overwrite ?
0 : contentLength; - - return new AbfsOutputStream( - client, - AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), - offset, - abfsConfiguration.getWriteBufferSize(), - abfsConfiguration.isFlushEnabled(), - abfsConfiguration.isOutputStreamFlushDisabled()); } public void rename(final Path source, final Path destination) throws AzureBlobFileSystemException { + final Instant startAggregate = abfsPerfTracker.getLatencyInstant(); + long countAggregate = 0; + boolean shouldContinue; if (isAtomicRenameKey(source.getName())) { LOG.warn("The atomic rename feature is not supported by the ABFS scheme; however rename," @@ -447,15 +499,28 @@ public class AzureBlobFileSystemStore { String continuation = null; do { - AbfsRestOperation op = client.renamePath(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(source), - AbfsHttpConstants.FORWARD_SLASH + getRelativePath(destination), continuation); - continuation = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_CONTINUATION); + try (AbfsPerfInfo perfInfo = startTracking("rename", "renamePath")) { + AbfsRestOperation op = client.renamePath(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(source), + AbfsHttpConstants.FORWARD_SLASH + getRelativePath(destination), continuation); + perfInfo.registerResult(op.getResult()); + continuation = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_CONTINUATION); + perfInfo.registerSuccess(true); + countAggregate++; + shouldContinue = continuation != null && !continuation.isEmpty(); - } while (continuation != null && !continuation.isEmpty()); + if (!shouldContinue) { + perfInfo.registerAggregates(startAggregate, countAggregate); + } + } + } while (shouldContinue); } public void delete(final Path path, final boolean recursive) throws AzureBlobFileSystemException { + final Instant startAggregate = abfsPerfTracker.getLatencyInstant(); + long countAggregate = 0; + boolean shouldContinue = true; + LOG.debug("delete filesystem: {} path: {} recursive: {}", 
client.getFileSystem(), path, @@ -464,70 +529,89 @@ public class AzureBlobFileSystemStore { String continuation = null; do { - AbfsRestOperation op = client.deletePath( - AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), recursive, continuation); - continuation = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_CONTINUATION); + try (AbfsPerfInfo perfInfo = startTracking("delete", "deletePath")) { + AbfsRestOperation op = client.deletePath( + AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), recursive, continuation); + perfInfo.registerResult(op.getResult()); + continuation = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_CONTINUATION); + perfInfo.registerSuccess(true); + countAggregate++; + shouldContinue = continuation != null && !continuation.isEmpty(); - } while (continuation != null && !continuation.isEmpty()); + if (!shouldContinue) { + perfInfo.registerAggregates(startAggregate, countAggregate); + } + } + } while (shouldContinue); } public FileStatus getFileStatus(final Path path) throws IOException { - boolean isNamespaceEnabled = getIsNamespaceEnabled(); - LOG.debug("getFileStatus filesystem: {} path: {} isNamespaceEnabled: {}", - client.getFileSystem(), - path, - isNamespaceEnabled); + try (AbfsPerfInfo perfInfo = startTracking("getFileStatus", "undetermined")) { + boolean isNamespaceEnabled = getIsNamespaceEnabled(); + LOG.debug("getFileStatus filesystem: {} path: {} isNamespaceEnabled: {}", + client.getFileSystem(), + path, + isNamespaceEnabled); - final AbfsRestOperation op; - if (path.isRoot()) { - op = isNamespaceEnabled - ? 
client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + AbfsHttpConstants.ROOT_PATH) - : client.getFilesystemProperties(); - } else { - op = client.getPathStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path)); - } + final AbfsRestOperation op; + if (path.isRoot()) { + if (isNamespaceEnabled) { + perfInfo.registerCallee("getAclStatus"); + op = client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + AbfsHttpConstants.ROOT_PATH); + } else { + perfInfo.registerCallee("getFilesystemProperties"); + op = client.getFilesystemProperties(); + } + } else { + perfInfo.registerCallee("getPathStatus"); + op = client.getPathStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path)); + } - final long blockSize = abfsConfiguration.getAzureBlockSize(); - final AbfsHttpOperation result = op.getResult(); + perfInfo.registerResult(op.getResult()); + final long blockSize = abfsConfiguration.getAzureBlockSize(); + final AbfsHttpOperation result = op.getResult(); - final String eTag = result.getResponseHeader(HttpHeaderConfigurations.ETAG); - final String lastModified = result.getResponseHeader(HttpHeaderConfigurations.LAST_MODIFIED); - final String permissions = result.getResponseHeader((HttpHeaderConfigurations.X_MS_PERMISSIONS)); - final boolean hasAcl = AbfsPermission.isExtendedAcl(permissions); - final long contentLength; - final boolean resourceIsDir; + final String eTag = result.getResponseHeader(HttpHeaderConfigurations.ETAG); + final String lastModified = result.getResponseHeader(HttpHeaderConfigurations.LAST_MODIFIED); + final String permissions = result.getResponseHeader((HttpHeaderConfigurations.X_MS_PERMISSIONS)); + final boolean hasAcl = AbfsPermission.isExtendedAcl(permissions); + final long contentLength; + final boolean resourceIsDir; - if (path.isRoot()) { - contentLength = 0; - resourceIsDir = true; - } else { - contentLength = parseContentLength(result.getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH)); - resourceIsDir = 
parseIsDirectory(result.getResponseHeader(HttpHeaderConfigurations.X_MS_RESOURCE_TYPE)); - } + if (path.isRoot()) { + contentLength = 0; + resourceIsDir = true; + } else { + contentLength = parseContentLength(result.getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH)); + resourceIsDir = parseIsDirectory(result.getResponseHeader(HttpHeaderConfigurations.X_MS_RESOURCE_TYPE)); + } - final String transformedOwner = identityTransformer.transformIdentityForGetRequest( + final String transformedOwner = identityTransformer.transformIdentityForGetRequest( result.getResponseHeader(HttpHeaderConfigurations.X_MS_OWNER), true, userName); - final String transformedGroup = identityTransformer.transformIdentityForGetRequest( + final String transformedGroup = identityTransformer.transformIdentityForGetRequest( result.getResponseHeader(HttpHeaderConfigurations.X_MS_GROUP), false, primaryUserGroup); - return new VersionedFileStatus( - transformedOwner, - transformedGroup, - permissions == null ? new AbfsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL) - : AbfsPermission.valueOf(permissions), - hasAcl, - contentLength, - resourceIsDir, - 1, - blockSize, - parseLastModifiedTime(lastModified), - path, - eTag); + perfInfo.registerSuccess(true); + + return new VersionedFileStatus( + transformedOwner, + transformedGroup, + permissions == null ? 
new AbfsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL) + : AbfsPermission.valueOf(permissions), + hasAcl, + contentLength, + resourceIsDir, + 1, + blockSize, + parseLastModifiedTime(lastModified), + path, + eTag); + } } /** @@ -551,6 +635,10 @@ public class AzureBlobFileSystemStore { * */ @InterfaceStability.Unstable public FileStatus[] listStatus(final Path path, final String startFrom) throws IOException { + final Instant startAggregate = abfsPerfTracker.getLatencyInstant(); + long countAggregate = 0; + boolean shouldContinue = true; + LOG.debug("listStatus filesystem: {} path: {}, startFrom: {}", client.getFileSystem(), path, @@ -568,53 +656,63 @@ public class AzureBlobFileSystemStore { ArrayList fileStatuses = new ArrayList<>(); do { - AbfsRestOperation op = client.listPath(relativePath, false, LIST_MAX_RESULTS, continuation); - continuation = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_CONTINUATION); - ListResultSchema retrievedSchema = op.getResult().getListResultSchema(); - if (retrievedSchema == null) { - throw new AbfsRestOperationException( - AzureServiceErrorCode.PATH_NOT_FOUND.getStatusCode(), - AzureServiceErrorCode.PATH_NOT_FOUND.getErrorCode(), - "listStatusAsync path not found", - null, op.getResult()); - } - - long blockSize = abfsConfiguration.getAzureBlockSize(); - - for (ListResultEntrySchema entry : retrievedSchema.paths()) { - final String owner = identityTransformer.transformIdentityForGetRequest(entry.owner(), true, userName); - final String group = identityTransformer.transformIdentityForGetRequest(entry.group(), false, primaryUserGroup); - final FsPermission fsPermission = entry.permissions() == null - ? new AbfsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL) - : AbfsPermission.valueOf(entry.permissions()); - final boolean hasAcl = AbfsPermission.isExtendedAcl(entry.permissions()); - - long lastModifiedMillis = 0; - long contentLength = entry.contentLength() == null ? 
0 : entry.contentLength(); - boolean isDirectory = entry.isDirectory() == null ? false : entry.isDirectory(); - if (entry.lastModified() != null && !entry.lastModified().isEmpty()) { - lastModifiedMillis = parseLastModifiedTime(entry.lastModified()); + try (AbfsPerfInfo perfInfo = startTracking("listStatus", "listPath")) { + AbfsRestOperation op = client.listPath(relativePath, false, LIST_MAX_RESULTS, continuation); + perfInfo.registerResult(op.getResult()); + continuation = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_CONTINUATION); + ListResultSchema retrievedSchema = op.getResult().getListResultSchema(); + if (retrievedSchema == null) { + throw new AbfsRestOperationException( + AzureServiceErrorCode.PATH_NOT_FOUND.getStatusCode(), + AzureServiceErrorCode.PATH_NOT_FOUND.getErrorCode(), + "listStatusAsync path not found", + null, op.getResult()); } - Path entryPath = new Path(File.separator + entry.name()); - entryPath = entryPath.makeQualified(this.uri, entryPath); + long blockSize = abfsConfiguration.getAzureBlockSize(); - fileStatuses.add( - new VersionedFileStatus( - owner, - group, - fsPermission, - hasAcl, - contentLength, - isDirectory, - 1, - blockSize, - lastModifiedMillis, - entryPath, - entry.eTag())); + for (ListResultEntrySchema entry : retrievedSchema.paths()) { + final String owner = identityTransformer.transformIdentityForGetRequest(entry.owner(), true, userName); + final String group = identityTransformer.transformIdentityForGetRequest(entry.group(), false, primaryUserGroup); + final FsPermission fsPermission = entry.permissions() == null + ? new AbfsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL) + : AbfsPermission.valueOf(entry.permissions()); + final boolean hasAcl = AbfsPermission.isExtendedAcl(entry.permissions()); + + long lastModifiedMillis = 0; + long contentLength = entry.contentLength() == null ? 0 : entry.contentLength(); + boolean isDirectory = entry.isDirectory() == null ? 
false : entry.isDirectory(); + if (entry.lastModified() != null && !entry.lastModified().isEmpty()) { + lastModifiedMillis = parseLastModifiedTime(entry.lastModified()); + } + + Path entryPath = new Path(File.separator + entry.name()); + entryPath = entryPath.makeQualified(this.uri, entryPath); + + fileStatuses.add( + new VersionedFileStatus( + owner, + group, + fsPermission, + hasAcl, + contentLength, + isDirectory, + 1, + blockSize, + lastModifiedMillis, + entryPath, + entry.eTag())); + } + + perfInfo.registerSuccess(true); + countAggregate++; + shouldContinue = continuation != null && !continuation.isEmpty(); + + if (!shouldContinue) { + perfInfo.registerAggregates(startAggregate, countAggregate); + } } - - } while (continuation != null && !continuation.isEmpty()); + } while (shouldContinue); return fileStatuses.toArray(new FileStatus[fileStatuses.size()]); } @@ -681,17 +779,25 @@ public class AzureBlobFileSystemStore { "This operation is only valid for storage accounts with the hierarchical namespace enabled."); } - LOG.debug( - "setOwner filesystem: {} path: {} owner: {} group: {}", - client.getFileSystem(), - path.toString(), - owner, - group); + try (AbfsPerfInfo perfInfo = startTracking("setOwner", "setOwner")) { - final String transformedOwner = identityTransformer.transformUserOrGroupForSetRequest(owner); - final String transformedGroup = identityTransformer.transformUserOrGroupForSetRequest(group); + LOG.debug( + "setOwner filesystem: {} path: {} owner: {} group: {}", + client.getFileSystem(), + path.toString(), + owner, + group); - client.setOwner(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true), transformedOwner, transformedGroup); + final String transformedOwner = identityTransformer.transformUserOrGroupForSetRequest(owner); + final String transformedGroup = identityTransformer.transformUserOrGroupForSetRequest(group); + + final AbfsRestOperation op = client.setOwner( + AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true), + 
transformedOwner, + transformedGroup); + + perfInfo.registerResult(op.getResult()).registerSuccess(true); + } } public void setPermission(final Path path, final FsPermission permission) throws @@ -701,13 +807,20 @@ public class AzureBlobFileSystemStore { "This operation is only valid for storage accounts with the hierarchical namespace enabled."); } - LOG.debug( - "setPermission filesystem: {} path: {} permission: {}", - client.getFileSystem(), - path.toString(), - permission.toString()); - client.setPermission(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true), - String.format(AbfsHttpConstants.PERMISSION_FORMAT, permission.toOctal())); + try (AbfsPerfInfo perfInfo = startTracking("setPermission", "setPermission")) { + + LOG.debug( + "setPermission filesystem: {} path: {} permission: {}", + client.getFileSystem(), + path.toString(), + permission.toString()); + + final AbfsRestOperation op = client.setPermission( + AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true), + String.format(AbfsHttpConstants.PERMISSION_FORMAT, permission.toOctal())); + + perfInfo.registerResult(op.getResult()).registerSuccess(true); + } } public void modifyAclEntries(final Path path, final List aclSpec) throws @@ -717,25 +830,37 @@ public class AzureBlobFileSystemStore { "This operation is only valid for storage accounts with the hierarchical namespace enabled."); } - LOG.debug( - "modifyAclEntries filesystem: {} path: {} aclSpec: {}", - client.getFileSystem(), - path.toString(), - AclEntry.aclSpecToString(aclSpec)); + try (AbfsPerfInfo perfInfoGet = startTracking("modifyAclEntries", "getAclStatus")) { - identityTransformer.transformAclEntriesForSetRequest(aclSpec); - final Map modifyAclEntries = AbfsAclHelper.deserializeAclSpec(AclEntry.aclSpecToString(aclSpec)); - boolean useUpn = AbfsAclHelper.isUpnFormatAclEntries(modifyAclEntries); + LOG.debug( + "modifyAclEntries filesystem: {} path: {} aclSpec: {}", + client.getFileSystem(), + path.toString(), + 
AclEntry.aclSpecToString(aclSpec)); - final AbfsRestOperation op = client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true), useUpn); - final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG); + identityTransformer.transformAclEntriesForSetRequest(aclSpec); + final Map modifyAclEntries = AbfsAclHelper.deserializeAclSpec(AclEntry.aclSpecToString(aclSpec)); + boolean useUpn = AbfsAclHelper.isUpnFormatAclEntries(modifyAclEntries); - final Map aclEntries = AbfsAclHelper.deserializeAclSpec(op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_ACL)); + final AbfsRestOperation op = client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true), useUpn); + perfInfoGet.registerResult(op.getResult()); + final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG); - AbfsAclHelper.modifyAclEntriesInternal(aclEntries, modifyAclEntries); + final Map aclEntries = AbfsAclHelper.deserializeAclSpec(op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_ACL)); - client.setAcl(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true), - AbfsAclHelper.serializeAclSpec(aclEntries), eTag); + AbfsAclHelper.modifyAclEntriesInternal(aclEntries, modifyAclEntries); + + perfInfoGet.registerSuccess(true).finishTracking(); + + try (AbfsPerfInfo perfInfoSet = startTracking("modifyAclEntries", "setAcl")) { + final AbfsRestOperation setAclOp + = client.setAcl(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true), + AbfsAclHelper.serializeAclSpec(aclEntries), eTag); + perfInfoSet.registerResult(setAclOp.getResult()) + .registerSuccess(true) + .registerAggregates(perfInfoGet.getTrackingStart(), GET_SET_AGGREGATE_COUNT); + } + } } public void removeAclEntries(final Path path, final List aclSpec) throws AzureBlobFileSystemException { @@ -744,25 +869,37 @@ public class AzureBlobFileSystemStore { "This operation is only valid for storage accounts with the hierarchical namespace 
enabled."); } - LOG.debug( - "removeAclEntries filesystem: {} path: {} aclSpec: {}", - client.getFileSystem(), - path.toString(), - AclEntry.aclSpecToString(aclSpec)); + try (AbfsPerfInfo perfInfoGet = startTracking("removeAclEntries", "getAclStatus")) { - identityTransformer.transformAclEntriesForSetRequest(aclSpec); - final Map removeAclEntries = AbfsAclHelper.deserializeAclSpec(AclEntry.aclSpecToString(aclSpec)); - boolean isUpnFormat = AbfsAclHelper.isUpnFormatAclEntries(removeAclEntries); + LOG.debug( + "removeAclEntries filesystem: {} path: {} aclSpec: {}", + client.getFileSystem(), + path.toString(), + AclEntry.aclSpecToString(aclSpec)); - final AbfsRestOperation op = client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true), isUpnFormat); - final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG); + identityTransformer.transformAclEntriesForSetRequest(aclSpec); + final Map removeAclEntries = AbfsAclHelper.deserializeAclSpec(AclEntry.aclSpecToString(aclSpec)); + boolean isUpnFormat = AbfsAclHelper.isUpnFormatAclEntries(removeAclEntries); - final Map aclEntries = AbfsAclHelper.deserializeAclSpec(op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_ACL)); + final AbfsRestOperation op = client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true), isUpnFormat); + perfInfoGet.registerResult(op.getResult()); + final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG); - AbfsAclHelper.removeAclEntriesInternal(aclEntries, removeAclEntries); + final Map aclEntries = AbfsAclHelper.deserializeAclSpec(op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_ACL)); - client.setAcl(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true), - AbfsAclHelper.serializeAclSpec(aclEntries), eTag); + AbfsAclHelper.removeAclEntriesInternal(aclEntries, removeAclEntries); + + perfInfoGet.registerSuccess(true).finishTracking(); + + try (AbfsPerfInfo perfInfoSet = 
startTracking("removeAclEntries", "setAcl")) { + final AbfsRestOperation setAclOp = + client.setAcl(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true), + AbfsAclHelper.serializeAclSpec(aclEntries), eTag); + perfInfoSet.registerResult(setAclOp.getResult()) + .registerSuccess(true) + .registerAggregates(perfInfoGet.getTrackingStart(), GET_SET_AGGREGATE_COUNT); + } + } } public void removeDefaultAcl(final Path path) throws AzureBlobFileSystemException { @@ -771,26 +908,38 @@ public class AzureBlobFileSystemStore { "This operation is only valid for storage accounts with the hierarchical namespace enabled."); } - LOG.debug( - "removeDefaultAcl filesystem: {} path: {}", - client.getFileSystem(), - path.toString()); + try (AbfsPerfInfo perfInfoGet = startTracking("removeDefaultAcl", "getAclStatus")) { - final AbfsRestOperation op = client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true)); - final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG); - final Map aclEntries = AbfsAclHelper.deserializeAclSpec(op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_ACL)); - final Map defaultAclEntries = new HashMap<>(); + LOG.debug( + "removeDefaultAcl filesystem: {} path: {}", + client.getFileSystem(), + path.toString()); - for (Map.Entry aclEntry : aclEntries.entrySet()) { - if (aclEntry.getKey().startsWith("default:")) { - defaultAclEntries.put(aclEntry.getKey(), aclEntry.getValue()); + final AbfsRestOperation op = client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true)); + perfInfoGet.registerResult(op.getResult()); + final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG); + final Map aclEntries = AbfsAclHelper.deserializeAclSpec(op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_ACL)); + final Map defaultAclEntries = new HashMap<>(); + + for (Map.Entry aclEntry : aclEntries.entrySet()) { + if (aclEntry.getKey().startsWith("default:")) { 
+ defaultAclEntries.put(aclEntry.getKey(), aclEntry.getValue()); + } + } + + aclEntries.keySet().removeAll(defaultAclEntries.keySet()); + + perfInfoGet.registerSuccess(true).finishTracking(); + + try (AbfsPerfInfo perfInfoSet = startTracking("removeDefaultAcl", "setAcl")) { + final AbfsRestOperation setAclOp = + client.setAcl(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true), + AbfsAclHelper.serializeAclSpec(aclEntries), eTag); + perfInfoSet.registerResult(setAclOp.getResult()) + .registerSuccess(true) + .registerAggregates(perfInfoGet.getTrackingStart(), GET_SET_AGGREGATE_COUNT); } } - - aclEntries.keySet().removeAll(defaultAclEntries.keySet()); - - client.setAcl(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true), - AbfsAclHelper.serializeAclSpec(aclEntries), eTag); } public void removeAcl(final Path path) throws AzureBlobFileSystemException { @@ -799,22 +948,35 @@ public class AzureBlobFileSystemStore { "This operation is only valid for storage accounts with the hierarchical namespace enabled."); } - LOG.debug( - "removeAcl filesystem: {} path: {}", - client.getFileSystem(), - path.toString()); - final AbfsRestOperation op = client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true)); - final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG); + try (AbfsPerfInfo perfInfoGet = startTracking("removeAcl", "getAclStatus")){ - final Map aclEntries = AbfsAclHelper.deserializeAclSpec(op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_ACL)); - final Map newAclEntries = new HashMap<>(); + LOG.debug( + "removeAcl filesystem: {} path: {}", + client.getFileSystem(), + path.toString()); - newAclEntries.put(AbfsHttpConstants.ACCESS_USER, aclEntries.get(AbfsHttpConstants.ACCESS_USER)); - newAclEntries.put(AbfsHttpConstants.ACCESS_GROUP, aclEntries.get(AbfsHttpConstants.ACCESS_GROUP)); - newAclEntries.put(AbfsHttpConstants.ACCESS_OTHER, aclEntries.get(AbfsHttpConstants.ACCESS_OTHER)); + final 
AbfsRestOperation op = client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true)); + perfInfoGet.registerResult(op.getResult()); + final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG); - client.setAcl(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true), - AbfsAclHelper.serializeAclSpec(newAclEntries), eTag); + final Map aclEntries = AbfsAclHelper.deserializeAclSpec(op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_ACL)); + final Map newAclEntries = new HashMap<>(); + + newAclEntries.put(AbfsHttpConstants.ACCESS_USER, aclEntries.get(AbfsHttpConstants.ACCESS_USER)); + newAclEntries.put(AbfsHttpConstants.ACCESS_GROUP, aclEntries.get(AbfsHttpConstants.ACCESS_GROUP)); + newAclEntries.put(AbfsHttpConstants.ACCESS_OTHER, aclEntries.get(AbfsHttpConstants.ACCESS_OTHER)); + + perfInfoGet.registerSuccess(true).finishTracking(); + + try (AbfsPerfInfo perfInfoSet = startTracking("removeAcl", "setAcl")) { + final AbfsRestOperation setAclOp = + client.setAcl(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true), + AbfsAclHelper.serializeAclSpec(newAclEntries), eTag); + perfInfoSet.registerResult(setAclOp.getResult()) + .registerSuccess(true) + .registerAggregates(perfInfoGet.getTrackingStart(), GET_SET_AGGREGATE_COUNT); + } + } } public void setAcl(final Path path, final List aclSpec) throws AzureBlobFileSystemException { @@ -823,25 +985,37 @@ public class AzureBlobFileSystemStore { "This operation is only valid for storage accounts with the hierarchical namespace enabled."); } - LOG.debug( - "setAcl filesystem: {} path: {} aclspec: {}", - client.getFileSystem(), - path.toString(), - AclEntry.aclSpecToString(aclSpec)); + try (AbfsPerfInfo perfInfoGet = startTracking("setAcl", "getAclStatus")) { - identityTransformer.transformAclEntriesForSetRequest(aclSpec); - final Map aclEntries = AbfsAclHelper.deserializeAclSpec(AclEntry.aclSpecToString(aclSpec)); - final boolean isUpnFormat = 
AbfsAclHelper.isUpnFormatAclEntries(aclEntries); + LOG.debug( + "setAcl filesystem: {} path: {} aclspec: {}", + client.getFileSystem(), + path.toString(), + AclEntry.aclSpecToString(aclSpec)); - final AbfsRestOperation op = client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true), isUpnFormat); - final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG); + identityTransformer.transformAclEntriesForSetRequest(aclSpec); + final Map aclEntries = AbfsAclHelper.deserializeAclSpec(AclEntry.aclSpecToString(aclSpec)); + final boolean isUpnFormat = AbfsAclHelper.isUpnFormatAclEntries(aclEntries); - final Map getAclEntries = AbfsAclHelper.deserializeAclSpec(op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_ACL)); + final AbfsRestOperation op = client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true), isUpnFormat); + perfInfoGet.registerResult(op.getResult()); + final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG); - AbfsAclHelper.setAclEntriesInternal(aclEntries, getAclEntries); + final Map getAclEntries = AbfsAclHelper.deserializeAclSpec(op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_ACL)); - client.setAcl(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true), - AbfsAclHelper.serializeAclSpec(aclEntries), eTag); + AbfsAclHelper.setAclEntriesInternal(aclEntries, getAclEntries); + + perfInfoGet.registerSuccess(true).finishTracking(); + + try (AbfsPerfInfo perfInfoSet = startTracking("setAcl", "setAcl")) { + final AbfsRestOperation setAclOp = + client.setAcl(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true), + AbfsAclHelper.serializeAclSpec(aclEntries), eTag); + perfInfoSet.registerResult(setAclOp.getResult()) + .registerSuccess(true) + .registerAggregates(perfInfoGet.getTrackingStart(), GET_SET_AGGREGATE_COUNT); + } + } } public AclStatus getAclStatus(final Path path) throws IOException { @@ -850,38 +1024,44 @@ public 
class AzureBlobFileSystemStore { "This operation is only valid for storage accounts with the hierarchical namespace enabled."); } - LOG.debug( - "getAclStatus filesystem: {} path: {}", - client.getFileSystem(), - path.toString()); - AbfsRestOperation op = client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true)); - AbfsHttpOperation result = op.getResult(); + try (AbfsPerfInfo perfInfo = startTracking("getAclStatus", "getAclStatus")) { - final String transformedOwner = identityTransformer.transformIdentityForGetRequest( - result.getResponseHeader(HttpHeaderConfigurations.X_MS_OWNER), - true, - userName); - final String transformedGroup = identityTransformer.transformIdentityForGetRequest( - result.getResponseHeader(HttpHeaderConfigurations.X_MS_GROUP), - false, - primaryUserGroup); + LOG.debug( + "getAclStatus filesystem: {} path: {}", + client.getFileSystem(), + path.toString()); - final String permissions = result.getResponseHeader(HttpHeaderConfigurations.X_MS_PERMISSIONS); - final String aclSpecString = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_ACL); + AbfsRestOperation op = client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true)); + AbfsHttpOperation result = op.getResult(); + perfInfo.registerResult(result); - final List aclEntries = AclEntry.parseAclSpec(AbfsAclHelper.processAclString(aclSpecString), true); - identityTransformer.transformAclEntriesForGetRequest(aclEntries, userName, primaryUserGroup); - final FsPermission fsPermission = permissions == null ? 
new AbfsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL) - : AbfsPermission.valueOf(permissions); + final String transformedOwner = identityTransformer.transformIdentityForGetRequest( + result.getResponseHeader(HttpHeaderConfigurations.X_MS_OWNER), + true, + userName); + final String transformedGroup = identityTransformer.transformIdentityForGetRequest( + result.getResponseHeader(HttpHeaderConfigurations.X_MS_GROUP), + false, + primaryUserGroup); - final AclStatus.Builder aclStatusBuilder = new AclStatus.Builder(); - aclStatusBuilder.owner(transformedOwner); - aclStatusBuilder.group(transformedGroup); + final String permissions = result.getResponseHeader(HttpHeaderConfigurations.X_MS_PERMISSIONS); + final String aclSpecString = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_ACL); - aclStatusBuilder.setPermission(fsPermission); - aclStatusBuilder.stickyBit(fsPermission.getStickyBit()); - aclStatusBuilder.addEntries(aclEntries); - return aclStatusBuilder.build(); + final List aclEntries = AclEntry.parseAclSpec(AbfsAclHelper.processAclString(aclSpecString), true); + identityTransformer.transformAclEntriesForGetRequest(aclEntries, userName, primaryUserGroup); + final FsPermission fsPermission = permissions == null ? 
new AbfsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL) + : AbfsPermission.valueOf(permissions); + + final AclStatus.Builder aclStatusBuilder = new AclStatus.Builder(); + aclStatusBuilder.owner(transformedOwner); + aclStatusBuilder.group(transformedGroup); + + aclStatusBuilder.setPermission(fsPermission); + aclStatusBuilder.stickyBit(fsPermission.getStickyBit()); + aclStatusBuilder.addEntries(aclEntries); + perfInfo.registerSuccess(true); + return aclStatusBuilder.build(); + } } public boolean isAtomicRenameKey(String key) { @@ -919,7 +1099,7 @@ public class AzureBlobFileSystemStore { tokenProvider = abfsConfiguration.getTokenProvider(); } - this.client = new AbfsClient(baseUrl, creds, abfsConfiguration, new ExponentialRetryPolicy(), tokenProvider); + this.client = new AbfsClient(baseUrl, creds, abfsConfiguration, new ExponentialRetryPolicy(), tokenProvider, abfsPerfTracker); } private String getOctalNotation(FsPermission fsPermission) { @@ -1060,6 +1240,10 @@ public class AzureBlobFileSystemStore { return false; } + private AbfsPerfInfo startTracking(String callerName, String calleeName) { + return new AbfsPerfInfo(abfsPerfTracker, callerName, calleeName); + } + private static class VersionedFileStatus extends FileStatus { private final String version; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java index eb4605b1dfd..409ffc3c124 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java @@ -114,6 +114,8 @@ public final class ConfigurationKeys { public static final String FS_AZURE_ACCOUNT_OAUTH_REFRESH_TOKEN = "fs.azure.account.oauth2.refresh.token"; /** Key for oauth AAD refresh token endpoint: {@value}. 
*/ public static final String FS_AZURE_ACCOUNT_OAUTH_REFRESH_TOKEN_ENDPOINT = "fs.azure.account.oauth2.refresh.token.endpoint"; + /** Key for enabling the tracking of ABFS API latency and sending the latency numbers to the ABFS API service */ + public static final String FS_AZURE_ABFS_LATENCY_TRACK = "fs.azure.abfs.latency.track"; public static String accountProperty(String property, String account) { return property + "." + account; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java index 29367eb088e..71e0ed627d8 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java @@ -67,6 +67,7 @@ public final class FileSystemConfigurations { public static final boolean DEFAULT_ENABLE_HTTPS = true; public static final boolean DEFAULT_USE_UPN = false; + public static final boolean DEFAULT_ABFS_LATENCY_TRACK = false; private FileSystemConfigurations() {} } \ No newline at end of file diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java index c8d43904deb..79bba094f0e 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java @@ -58,6 +58,7 @@ public final class HttpHeaderConfigurations { public static final String X_MS_PERMISSIONS = "x-ms-permissions"; public static final String X_MS_UMASK = "x-ms-umask"; public static final String X_MS_NAMESPACE_ENABLED = 
"x-ms-namespace-enabled"; + public static final String X_MS_ABFS_CLIENT_LATENCY = "x-ms-abfs-client-latency"; private HttpHeaderConfigurations() {} } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AbfsPerfLoggable.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AbfsPerfLoggable.java new file mode 100644 index 00000000000..772f006182b --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AbfsPerfLoggable.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

package org.apache.hadoop.fs.azurebfs.contracts.services;

import org.apache.hadoop.classification.InterfaceStability;

/**
 * The AbfsPerfLoggable contract.
 *
 * Implemented by types (such as {@code AbfsHttpOperation}) whose state can be
 * summarized into a single line of the ABFS client-latency log. The returned
 * string is consumed by the perf-tracking machinery when building log output;
 * implementations should produce a compact, single-line summary.
 */
@InterfaceStability.Evolving
public interface AbfsPerfLoggable {
  /**
   * Gets the string to log to the Abfs Logging API.
   *
   * @return the string that will be logged.
   */
  String getLogString();
}
sslProviderName); this.tokenProvider = tokenProvider; + this.abfsPerfTracker = abfsPerfTracker; } public String getFileSystem() { return filesystem; } + protected AbfsPerfTracker getAbfsPerfTracker() { + return abfsPerfTracker; + } + ExponentialRetryPolicy getRetryPolicy() { return retryPolicy; } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java index de38b347248..133862ba209 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java @@ -21,8 +21,10 @@ package org.apache.hadoop.fs.azurebfs.services; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.io.UnsupportedEncodingException; import java.net.HttpURLConnection; import java.net.URL; +import java.net.URLEncoder; import java.util.List; import java.util.UUID; @@ -40,12 +42,13 @@ import org.slf4j.LoggerFactory; import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants; import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations; +import org.apache.hadoop.fs.azurebfs.contracts.services.AbfsPerfLoggable; import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultSchema; /** * Represents an HTTP operation. 
*/ -public class AbfsHttpOperation { +public class AbfsHttpOperation implements AbfsPerfLoggable { private static final Logger LOG = LoggerFactory.getLogger(AbfsHttpOperation.class); private static final int CONNECT_TIMEOUT = 30 * 1000; @@ -161,6 +164,47 @@ public class AbfsHttpOperation { return sb.toString(); } + // Returns a trace message for the ABFS API logging service to consume + public String getLogString() { + String urlStr = null; + + try { + urlStr = URLEncoder.encode(url.toString(), "UTF-8"); + } catch(UnsupportedEncodingException e) { + urlStr = "https%3A%2F%2Ffailed%2Fto%2Fencode%2Furl"; + } + + final StringBuilder sb = new StringBuilder(); + sb.append("s=") + .append(statusCode) + .append(" e=") + .append(storageErrorCode) + .append(" ci=") + .append(clientRequestId) + .append(" ri=") + .append(requestId); + + if (isTraceEnabled) { + sb.append(" ct=") + .append(connectionTimeMs) + .append(" st=") + .append(sendRequestTimeMs) + .append(" rt=") + .append(recvResponseTimeMs); + } + + sb.append(" bs=") + .append(bytesSent) + .append(" br=") + .append(bytesReceived) + .append(" m=") + .append(method) + .append(" u=") + .append(urlStr); + + return sb.toString(); + } + /** * Initializes a new HTTP request and opens the connection. 
* diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java index fe48cb93237..1f343424fbf 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java @@ -226,8 +226,10 @@ public class AbfsInputStream extends FSInputStream { throw new IllegalArgumentException("requested read length is more than will fit after requested offset in buffer"); } final AbfsRestOperation op; - try { + AbfsPerfTracker tracker = client.getAbfsPerfTracker(); + try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker, "readRemote", "read")) { op = client.read(path, position, b, offset, length, tolerateOobAppends ? "*" : eTag); + perfInfo.registerResult(op.getResult()).registerSuccess(true); } catch (AzureBlobFileSystemException ex) { if (ex instanceof AbfsRestOperationException) { AbfsRestOperationException ere = (AbfsRestOperationException) ex; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java index fd56eb0a015..2d409416e86 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java @@ -289,10 +289,16 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa final Future job = completionService.submit(new Callable() { @Override public Void call() throws Exception { - client.append(path, offset, bytes, 0, - bytesLength); - byteBufferPool.putBuffer(ByteBuffer.wrap(bytes)); - return null; + AbfsPerfTracker tracker = 
client.getAbfsPerfTracker(); + try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker, + "writeCurrentBufferToService", "append")) { + AbfsRestOperation op = client.append(path, offset, bytes, 0, + bytesLength); + perfInfo.registerResult(op.getResult()); + byteBufferPool.putBuffer(ByteBuffer.wrap(bytes)); + perfInfo.registerSuccess(true); + return null; + } } }); @@ -334,8 +340,11 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa private synchronized void flushWrittenBytesToServiceInternal(final long offset, final boolean retainUncommitedData, final boolean isClose) throws IOException { - try { - client.flush(path, offset, retainUncommitedData, isClose); + AbfsPerfTracker tracker = client.getAbfsPerfTracker(); + try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker, + "flushWrittenBytesToServiceInternal", "flush")) { + AbfsRestOperation op = client.flush(path, offset, retainUncommitedData, isClose); + perfInfo.registerResult(op.getResult()).registerSuccess(true); } catch (AzureBlobFileSystemException ex) { if (ex instanceof AbfsRestOperationException) { if (((AbfsRestOperationException) ex).getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) { diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsPerfInfo.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsPerfInfo.java new file mode 100644 index 00000000000..0e7a111480c --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsPerfInfo.java @@ -0,0 +1,133 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.services; + +import java.time.Instant; + +import org.apache.hadoop.fs.azurebfs.contracts.services.AbfsPerfLoggable; + +/** + * {@code AbfsPerfInfo} holds information on ADLS Gen 2 API performance observed by {@code AbfsClient}. Every + * Abfs request keeps adding its information (success/failure, latency etc) to its {@code AbfsPerfInfo}'s object + * as and when it becomes available. When the request is over, the performance information is recorded while + * the {@code AutoCloseable} {@code AbfsPerfInfo} object is "closed". + */ +public final class AbfsPerfInfo implements AutoCloseable { + + // the tracker which will be extracting perf info out of this object. + private AbfsPerfTracker abfsPerfTracker; + + // the caller name. + private String callerName; + + // the callee name. + private String calleeName; + + // time when this tracking started. + private Instant trackingStart; + + // time when this tracking ended. + private Instant trackingEnd; + + // whether the tracked request was successful. + private boolean success; + + // time when the aggregate operation (to which this request belongs) started. + private Instant aggregateStart; + + // number of requests in the aggregate operation (to which this request belongs). + private long aggregateCount; + + // result of the request. 
+ private AbfsPerfLoggable res; + + public AbfsPerfInfo(AbfsPerfTracker abfsPerfTracker, String callerName, String calleeName) { + this.callerName = callerName; + this.calleeName = calleeName; + this.abfsPerfTracker = abfsPerfTracker; + this.success = false; + this.trackingStart = abfsPerfTracker.getLatencyInstant(); + } + + public AbfsPerfInfo registerSuccess(boolean success) { + this.success = success; + return this; + } + + public AbfsPerfInfo registerResult(AbfsPerfLoggable res) { + this.res = res; + return this; + } + + public AbfsPerfInfo registerAggregates(Instant aggregateStart, long aggregateCount) { + this.aggregateStart = aggregateStart; + this.aggregateCount = aggregateCount; + return this; + } + + public AbfsPerfInfo finishTracking() { + if (this.trackingEnd == null) { + this.trackingEnd = abfsPerfTracker.getLatencyInstant(); + } + + return this; + } + + public AbfsPerfInfo registerCallee(String calleeName) { + this.calleeName = calleeName; + return this; + } + + @Override + public void close() { + abfsPerfTracker.trackInfo(this.finishTracking()); + } + + public String getCallerName() { + return callerName; + }; + + public String getCalleeName() { + return calleeName; + } + + public Instant getTrackingStart() { + return trackingStart; + } + + public Instant getTrackingEnd() { + return trackingEnd; + } + + public boolean getSuccess() { + return success; + } + + public Instant getAggregateStart() { + return aggregateStart; + } + + public long getAggregateCount() { + return aggregateCount; + } + + public AbfsPerfLoggable getResult() { + return res; + } +} \ No newline at end of file diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsPerfTracker.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsPerfTracker.java new file mode 100644 index 00000000000..e24c47b8c7a --- /dev/null +++ 
package org.apache.hadoop.fs.azurebfs.services;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.ConcurrentLinkedQueue;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
import org.apache.hadoop.fs.azurebfs.contracts.services.AbfsPerfLoggable;

/**
 * {@code AbfsPerfTracker} keeps track of service latencies observed by {@code AbfsClient}. Every request hands over
 * its perf-related information as a {@code AbfsPerfInfo} object (contains success/failure, latency etc) to the
 * {@code AbfsPerfTracker}'s queue. When a request is made, we check {@code AbfsPerfTracker} to see if there are
 * any latency numbers to be reported. If there are any, the stats are added to an HTTP header
 * ({@code x-ms-abfs-client-latency}) on the next request.
 *
 * A typical perf log line appears like:
 *
 * h=KARMA t=2019-10-25T20:21:14.518Z a=abfstest01.dfs.core.windows.net
 * c=abfs-testcontainer-84828169-6488-4a62-a875-1e674275a29f cr=delete ce=deletePath r=Succeeded l=32 ls=32 lc=1 s=200
 * e= ci=95121dae-70a8-4187-b067-614091034558 ri=97effdcf-201f-0097-2d71-8bae00000000 ct=0 st=0 rt=0 bs=0 br=0 m=DELETE
 * u=https%3A%2F%2Fabfstest01.dfs.core.windows.net%2Fabfs-testcontainer%2Ftest%3Ftimeout%3D90%26recursive%3Dtrue
 *
 * The fields have the following definitions:
 *
 * h: host name
 * t: time when this request was logged
 * a: Azure storage account name
 * c: container name
 * cr: name of the caller method
 * ce: name of the callee method
 * r: result (Succeeded/Failed)
 * l: latency (time spent in callee)
 * ls: latency sum (aggregate time spent in caller; logged when there are multiple callees;
 *     logged with the last callee)
 * lc: latency count (number of callees; logged when there are multiple callees;
 *     logged with the last callee)
 * s: HTTP Status code
 * e: Error code
 * ci: client request ID
 * ri: server request ID
 * ct: connection time in milliseconds
 * st: sending time in milliseconds
 * rt: receiving time in milliseconds
 * bs: bytes sent
 * br: bytes received
 * m: HTTP method (GET, PUT etc)
 * u: Encoded HTTP URL
 *
 */
public final class AbfsPerfTracker {

  // the logger.
  private static final Logger LOG = LoggerFactory.getLogger(AbfsPerfTracker.class);

  // the field names of perf log lines.
  private static final String HOST_NAME_KEY = "h";
  private static final String TIMESTAMP_KEY = "t";
  private static final String STORAGE_ACCOUNT_NAME_KEY = "a";
  private static final String CONTAINER_NAME_KEY = "c";
  private static final String CALLER_METHOD_NAME_KEY = "cr";
  private static final String CALLEE_METHOD_NAME_KEY = "ce";
  private static final String RESULT_KEY = "r";
  private static final String LATENCY_KEY = "l";
  private static final String LATENCY_SUM_KEY = "ls";
  private static final String LATENCY_COUNT_KEY = "lc";
  private static final String HTTP_STATUS_CODE_KEY = "s";
  private static final String ERROR_CODE_KEY = "e";
  private static final String CLIENT_REQUEST_ID_KEY = "ci";
  private static final String SERVER_REQUEST_ID_KEY = "ri";
  private static final String CONNECTION_TIME_KEY = "ct";
  private static final String SENDING_TIME_KEY = "st";
  private static final String RECEIVING_TIME_KEY = "rt";
  private static final String BYTES_SENT_KEY = "bs";
  private static final String BYTES_RECEIVED_KEY = "br";
  private static final String HTTP_METHOD_KEY = "m";
  private static final String HTTP_URL_KEY = "u";
  private static final String STRING_PLACEHOLDER = "%s";

  // the queue of fully formatted latency lines awaiting transmission on a later request.
  private final ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();

  // whether the latency tracker has been enabled; fixed at construction.
  private final boolean enabled;

  // the host name; resolved once at construction when tracking is enabled.
  private String hostName;

  // singleton latency reporting format (no ls/lc fields).
  private String singletonLatencyReportingFormat;

  // aggregate latency reporting format (includes ls/lc fields).
  private String aggregateLatencyReportingFormat;

  public AbfsPerfTracker(String filesystemName, String accountName, AbfsConfiguration configuration) {
    this(filesystemName, accountName, configuration.shouldTrackLatency());
  }

  protected AbfsPerfTracker(String filesystemName, String accountName, boolean enabled) {
    this.enabled = enabled;

    LOG.debug("AbfsPerfTracker configuration: {}", enabled);

    if (enabled) {
      try {
        hostName = InetAddress.getLocalHost().getHostName();
      } catch (UnknownHostException e) {
        hostName = "UnknownHost";
      }

      // Shared prefix of both formats; %s placeholders are filled per record for
      // timestamp, caller, callee, result and latency.
      String commonReportingFormat = new StringBuilder()
          .append(HOST_NAME_KEY)
          .append(AbfsHttpConstants.EQUAL)
          .append(hostName)
          .append(AbfsHttpConstants.SINGLE_WHITE_SPACE)
          .append(TIMESTAMP_KEY)
          .append(AbfsHttpConstants.EQUAL)
          .append(STRING_PLACEHOLDER)
          .append(AbfsHttpConstants.SINGLE_WHITE_SPACE)
          .append(STORAGE_ACCOUNT_NAME_KEY)
          .append(AbfsHttpConstants.EQUAL)
          .append(accountName)
          .append(AbfsHttpConstants.SINGLE_WHITE_SPACE)
          .append(CONTAINER_NAME_KEY)
          .append(AbfsHttpConstants.EQUAL)
          .append(filesystemName)
          .append(AbfsHttpConstants.SINGLE_WHITE_SPACE)
          .append(CALLER_METHOD_NAME_KEY)
          .append(AbfsHttpConstants.EQUAL)
          .append(STRING_PLACEHOLDER)
          .append(AbfsHttpConstants.SINGLE_WHITE_SPACE)
          .append(CALLEE_METHOD_NAME_KEY)
          .append(AbfsHttpConstants.EQUAL)
          .append(STRING_PLACEHOLDER)
          .append(AbfsHttpConstants.SINGLE_WHITE_SPACE)
          .append(RESULT_KEY)
          .append(AbfsHttpConstants.EQUAL)
          .append(STRING_PLACEHOLDER)
          .append(AbfsHttpConstants.SINGLE_WHITE_SPACE)
          .append(LATENCY_KEY)
          .append(AbfsHttpConstants.EQUAL)
          .append(STRING_PLACEHOLDER)
          .toString();

      /**
       * Example singleton log (no ls or lc field)
       * h=KARMA t=2019-10-25T20:21:14.518Z a=abfstest01.dfs.core.windows.net
       * c=abfs-testcontainer-84828169-6488-4a62-a875-1e674275a29f cr=delete ce=deletePath r=Succeeded l=32 s=200
       * e= ci=95121dae-70a8-4187-b067-614091034558 ri=97effdcf-201f-0097-2d71-8bae00000000 ct=0 st=0 rt=0 bs=0 br=0 m=DELETE
       * u=https%3A%2F%2Fabfstest01.dfs.core.windows.net%2Fabfs-testcontainer%2Ftest%3Ftimeout%3D90%26recursive%3Dtrue
       */
      singletonLatencyReportingFormat = new StringBuilder()
          .append(commonReportingFormat)
          .append(STRING_PLACEHOLDER)
          .toString();

      /**
       * Example aggregate log
       * h=KARMA t=2019-10-25T20:21:14.518Z a=abfstest01.dfs.core.windows.net
       * c=abfs-testcontainer-84828169-6488-4a62-a875-1e674275a29f cr=delete ce=deletePath r=Succeeded l=32 ls=32 lc=1 s=200
       * e= ci=95121dae-70a8-4187-b067-614091034558 ri=97effdcf-201f-0097-2d71-8bae00000000 ct=0 st=0 rt=0 bs=0 br=0 m=DELETE
       * u=https%3A%2F%2Fabfstest01.dfs.core.windows.net%2Fabfs-testcontainer%2Ftest%3Ftimeout%3D90%26recursive%3Dtrue
       */
      aggregateLatencyReportingFormat = new StringBuilder()
          .append(commonReportingFormat)
          .append(AbfsHttpConstants.SINGLE_WHITE_SPACE)
          .append(LATENCY_SUM_KEY)
          .append(AbfsHttpConstants.EQUAL)
          .append(STRING_PLACEHOLDER)
          .append(AbfsHttpConstants.SINGLE_WHITE_SPACE)
          .append(LATENCY_COUNT_KEY)
          .append(AbfsHttpConstants.EQUAL)
          .append(STRING_PLACEHOLDER)
          .append(STRING_PLACEHOLDER)
          .toString();
    }
  }

  /**
   * Records a finished {@code AbfsPerfInfo}. Uses the aggregate format when
   * aggregate information was registered, the singleton format otherwise.
   * No-op when tracking is disabled.
   *
   * @param perfInfo the finished perf record to queue.
   */
  public void trackInfo(AbfsPerfInfo perfInfo) {
    if (!enabled) {
      return;
    }

    if (isValidInstant(perfInfo.getAggregateStart()) && perfInfo.getAggregateCount() > 0) {
      recordClientLatency(
          perfInfo.getTrackingStart(),
          perfInfo.getTrackingEnd(),
          perfInfo.getCallerName(),
          perfInfo.getCalleeName(),
          perfInfo.getSuccess(),
          perfInfo.getAggregateStart(),
          perfInfo.getAggregateCount(),
          perfInfo.getResult());
    } else {
      recordClientLatency(
          perfInfo.getTrackingStart(),
          perfInfo.getTrackingEnd(),
          perfInfo.getCallerName(),
          perfInfo.getCalleeName(),
          perfInfo.getSuccess(),
          perfInfo.getResult());
    }
  }

  /**
   * Returns the current instant, or null when tracking is disabled so that the
   * disabled path costs no more than a boolean check.
   *
   * @return {@code Instant.now()} when enabled, null otherwise.
   */
  public Instant getLatencyInstant() {
    if (!enabled) {
      return null;
    }

    return Instant.now();
  }

  // Formats and queues a singleton latency record (no ls/lc fields).
  private void recordClientLatency(
      Instant operationStart,
      Instant operationStop,
      String callerName,
      String calleeName,
      boolean success,
      AbfsPerfLoggable res) {

    Instant trackerStart = Instant.now();
    // -1 signals that a latency could not be computed from the given instants.
    long latency = isValidInstant(operationStart) && isValidInstant(operationStop)
        ? Duration.between(operationStart, operationStop).toMillis() : -1;

    String latencyDetails = String.format(singletonLatencyReportingFormat,
        Instant.now(),
        callerName,
        calleeName,
        success ? "Succeeded" : "Failed",
        latency,
        res == null ? "" : (" " + res.getLogString()));

    offerToQueue(trackerStart, latencyDetails);
  }

  // Formats and queues an aggregate latency record (includes ls/lc fields).
  private void recordClientLatency(
      Instant operationStart,
      Instant operationStop,
      String callerName,
      String calleeName,
      boolean success,
      Instant aggregateStart,
      long aggregateCount,
      AbfsPerfLoggable res) {

    Instant trackerStart = Instant.now();
    long latency = isValidInstant(operationStart) && isValidInstant(operationStop)
        ? Duration.between(operationStart, operationStop).toMillis() : -1;
    long aggregateLatency = isValidInstant(aggregateStart) && isValidInstant(operationStop)
        ? Duration.between(aggregateStart, operationStop).toMillis() : -1;

    String latencyDetails = String.format(aggregateLatencyReportingFormat,
        Instant.now(),
        callerName,
        calleeName,
        success ? "Succeeded" : "Failed",
        latency,
        aggregateLatency,
        aggregateCount,
        res == null ? "" : (" " + res.getLogString()));

    offerToQueue(trackerStart, latencyDetails);
  }

  /**
   * Retrieves the oldest queued latency record, if any.
   *
   * @return a formatted latency line, or null when disabled or the queue is empty.
   */
  public String getClientLatency() {
    if (!enabled) {
      return null;
    }

    Instant trackerStart = Instant.now();
    String latencyDetails = queue.poll(); // non-blocking pop

    if (LOG.isDebugEnabled()) {
      Instant stop = Instant.now();
      long elapsed = Duration.between(trackerStart, stop).toMillis();
      LOG.debug("Dequeued latency info [{} ms]: {}", elapsed, latencyDetails);
    }

    return latencyDetails;
  }

  private void offerToQueue(Instant trackerStart, String latencyDetails) {
    queue.offer(latencyDetails); // non-blocking append

    if (LOG.isDebugEnabled()) {
      Instant trackerStop = Instant.now();
      long elapsed = Duration.between(trackerStart, trackerStop).toMillis();
      LOG.debug("Queued latency info [{} ms]: {}", elapsed, latencyDetails);
    }
  }

  // An instant is usable for latency math only if present and not a sentinel extreme.
  private boolean isValidInstant(Instant testInstant) {
    return testInstant != null
        && !Instant.MIN.equals(testInstant)
        && !Instant.MAX.equals(testInstant);
  }
}
*/ void execute() throws AzureBlobFileSystemException { + // see if we have latency reports from the previous requests + String latencyHeader = this.client.getAbfsPerfTracker().getClientLatency(); + if (latencyHeader != null && !latencyHeader.isEmpty()) { + AbfsHttpHeader httpHeader = + new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_ABFS_CLIENT_LATENCY, latencyHeader); + requestHeaders.add(httpHeader); + } + int retryCount = 0; while (!executeHttpOperation(retryCount++)) { try { diff --git a/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md b/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md index c5bad77031d..47739a75438 100644 --- a/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md +++ b/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md @@ -661,6 +661,52 @@ Hflush() being the only documented API that can provide persistent data transfer, Flush() also attempting to persist buffered data will lead to performance issues. +### Perf Options + +#### 1. HTTP Request Tracking Options +If you set `fs.azure.abfs.latency.track` to `true`, the module starts tracking the +performance metrics of ABFS HTTP traffic. To obtain these numbers on your machine +or cluster, you will also need to enable debug logging for the `AbfsPerfTracker` +class in your `log4j` config. 
A typical perf log line appears like: + +``` +h=KARMA t=2019-10-25T20:21:14.518Z a=abfstest01.dfs.core.windows.net +c=abfs-testcontainer-84828169-6488-4a62-a875-1e674275a29f cr=delete ce=deletePath +r=Succeeded l=32 ls=32 lc=1 s=200 e= ci=95121dae-70a8-4187-b067-614091034558 +ri=97effdcf-201f-0097-2d71-8bae00000000 ct=0 st=0 rt=0 bs=0 br=0 m=DELETE +u=https%3A%2F%2Fabfstest01.dfs.core.windows.net%2Ftestcontainer%2Ftest%3Ftimeout%3D90%26recursive%3Dtrue +``` + +The fields have the following definitions: + +`h`: host name +`t`: time when this request was logged +`a`: Azure storage account name +`c`: container name +`cr`: name of the caller method +`ce`: name of the callee method +`r`: result (Succeeded/Failed) +`l`: latency (time spent in callee) +`ls`: latency sum (aggregate time spent in caller; logged when there are multiple +callees; logged with the last callee) +`lc`: latency count (number of callees; logged when there are multiple callees; +logged with the last callee) +`s`: HTTP Status code +`e`: Error code +`ci`: client request ID +`ri`: server request ID +`ct`: connection time in milliseconds +`st`: sending time in milliseconds +`rt`: receiving time in milliseconds +`bs`: bytes sent +`br`: bytes received +`m`: HTTP method (GET, PUT etc) +`u`: Encoded HTTP URL + +Note that these performance numbers are also sent back to the ADLS Gen 2 API endpoints +in the `x-ms-abfs-client-latency` HTTP headers in subsequent requests. Azure uses these +settings to track their end-to-end latency. 
+ ## Troubleshooting The problems associated with the connector usually come down to, in order diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java index 42009330703..ea2d1f2883a 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java @@ -43,7 +43,7 @@ public final class TestAbfsClient { AbfsConfiguration config, boolean includeSSLProvider) { AbfsClient client = new AbfsClient(baseUrl, null, - config, null, null); + config, null, null, null); String sslProviderName = null; if (includeSSLProvider) { sslProviderName = SSLSocketFactoryEx.getDefaultFactory().getProviderName(); diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsPerfTracker.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsPerfTracker.java new file mode 100644 index 00000000000..4f4210287ce --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsPerfTracker.java @@ -0,0 +1,408 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.services; + +import java.net.URL; +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Test the latency tracker for ABFS. 
+ * + */ +public final class TestAbfsPerfTracker { + private static final Logger LOG = LoggerFactory.getLogger(TestAbfsPerfTracker.class); + private static ExecutorService executorService = null; + private static final int TEST_AGGREGATE_COUNT = 42; + private final String filesystemName = "bogusFilesystemName"; + private final String accountName = "bogusAccountName"; + private final URL url; + + public TestAbfsPerfTracker() throws Exception { + this.url = new URL("http", "www.microsoft.com", "/bogusFile"); + } + + @Before + public void setUp() throws Exception { + executorService = Executors.newCachedThreadPool(); + } + + @After + public void tearDown() throws Exception { + executorService.shutdown(); + } + + @Test + public void verifyDisablingOfTracker() throws Exception { + // verify that disabling of the tracker works + AbfsPerfTracker abfsPerfTracker = new AbfsPerfTracker(accountName, filesystemName, false); + + String latencyDetails = abfsPerfTracker.getClientLatency(); + assertThat(latencyDetails).describedAs("AbfsPerfTracker should be empty").isNull(); + + try (AbfsPerfInfo tracker = new AbfsPerfInfo(abfsPerfTracker, "disablingCaller", + "disablingCallee")) { + AbfsHttpOperation op = new AbfsHttpOperation(url, "GET", new ArrayList<>()); + tracker.registerResult(op).registerSuccess(true); + } + + latencyDetails = abfsPerfTracker.getClientLatency(); + assertThat(latencyDetails).describedAs("AbfsPerfTracker should return no record").isNull(); + } + + @Test + public void verifyTrackingForSingletonLatencyRecords() throws Exception { + // verify that tracking for singleton latency records works as expected + final int numTasks = 100; + AbfsPerfTracker abfsPerfTracker = new AbfsPerfTracker(accountName, filesystemName, true); + + String latencyDetails = abfsPerfTracker.getClientLatency(); + assertThat(latencyDetails).describedAs("AbfsPerfTracker should be empty").isNull(); + + List> tasks = new ArrayList<>(); + AbfsHttpOperation httpOperation = new 
AbfsHttpOperation(url, "GET", new ArrayList<>()); + + for (int i = 0; i < numTasks; i++) { + tasks.add(() -> { + try (AbfsPerfInfo tracker = new AbfsPerfInfo(abfsPerfTracker, "oneOperationCaller", + "oneOperationCallee")) { + tracker.registerResult(httpOperation).registerSuccess(true); + return 0; + } + }); + } + + for (Future fr: executorService.invokeAll(tasks)) { + fr.get(); + } + + for (int i = 0; i < numTasks; i++) { + latencyDetails = abfsPerfTracker.getClientLatency(); + assertThat(latencyDetails).describedAs("AbfsPerfTracker should return non-null record").isNotNull(); + assertThat(latencyDetails).describedAs("Latency record should be in the correct format") + .containsPattern("h=[^ ]* t=[^ ]* a=bogusFilesystemName c=bogusAccountName cr=oneOperationCaller" + + " ce=oneOperationCallee r=Succeeded l=[0-9]+ s=0 e= ci=[^ ]* ri=[^ ]* bs=0 br=0 m=GET" + + " u=http%3A%2F%2Fwww.microsoft.com%2FbogusFile"); + } + + latencyDetails = abfsPerfTracker.getClientLatency(); + assertThat(latencyDetails).describedAs("AbfsPerfTracker should return no record").isNull(); + } + + @Test + public void verifyTrackingForAggregateLatencyRecords() throws Exception { + // verify that tracking of aggregate latency records works as expected + final int numTasks = 100; + AbfsPerfTracker abfsPerfTracker = new AbfsPerfTracker(accountName, filesystemName, true); + + String latencyDetails = abfsPerfTracker.getClientLatency(); + assertThat(latencyDetails).describedAs("AbfsPerfTracker should be empty").isNull(); + + List> tasks = new ArrayList<>(); + AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", new ArrayList<>()); + + for (int i = 0; i < numTasks; i++) { + tasks.add(() -> { + try (AbfsPerfInfo tracker = new AbfsPerfInfo(abfsPerfTracker, "oneOperationCaller", + "oneOperationCallee")) { + tracker.registerResult(httpOperation).registerSuccess(true) + .registerAggregates(Instant.now(), TEST_AGGREGATE_COUNT); + return 0; + } + }); + } + + for (Future fr: 
executorService.invokeAll(tasks)) { + fr.get(); + } + + for (int i = 0; i < numTasks; i++) { + latencyDetails = abfsPerfTracker.getClientLatency(); + assertThat(latencyDetails).describedAs("AbfsPerfTracker should return non-null record").isNotNull(); + assertThat(latencyDetails).describedAs("Latency record should be in the correct format") + .containsPattern("h=[^ ]* t=[^ ]* a=bogusFilesystemName c=bogusAccountName cr=oneOperationCaller" + + " ce=oneOperationCallee r=Succeeded l=[0-9]+ ls=[0-9]+ lc=" + TEST_AGGREGATE_COUNT + + " s=0 e= ci=[^ ]* ri=[^ ]* bs=0 br=0 m=GET u=http%3A%2F%2Fwww.microsoft.com%2FbogusFile"); + } + + latencyDetails = abfsPerfTracker.getClientLatency(); + assertThat(latencyDetails).describedAs("AbfsPerfTracker should return no record").isNull(); + } + + @Test + public void verifyRecordingSingletonLatencyIsCheapWhenDisabled() throws Exception { + // when latency tracker is disabled, we expect it to take time equivalent to checking a boolean value + final double maxLatencyWhenDisabledMs = 1000; + final double minLatencyWhenDisabledMs = 0; + final long numTasks = 1000; + long aggregateLatency = 0; + AbfsPerfTracker abfsPerfTracker = new AbfsPerfTracker(accountName, filesystemName, false); + List> tasks = new ArrayList<>(); + final AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", new ArrayList<>()); + + for (int i = 0; i < numTasks; i++) { + tasks.add(() -> { + Instant startRecord = Instant.now(); + + try (AbfsPerfInfo tracker = new AbfsPerfInfo(abfsPerfTracker, "oneOperationCaller", + "oneOperationCallee")) { + tracker.registerResult(httpOperation).registerSuccess(true); + } + + long latencyRecord = Duration.between(startRecord, Instant.now()).toMillis(); + LOG.debug("Spent {} ms in recording latency.", latencyRecord); + return latencyRecord; + }); + } + + for (Future fr: executorService.invokeAll(tasks)) { + aggregateLatency += fr.get(); + } + + double averageRecordLatency = aggregateLatency / numTasks; + 
assertThat(averageRecordLatency).describedAs("Average time for recording singleton latencies should be bounded") + .isBetween(minLatencyWhenDisabledMs, maxLatencyWhenDisabledMs); + } + + @Test + public void verifyRecordingAggregateLatencyIsCheapWhenDisabled() throws Exception { + // when latency tracker is disabled, we expect it to take time equivalent to checking a boolean value + final double maxLatencyWhenDisabledMs = 1000; + final double minLatencyWhenDisabledMs = 0; + final long numTasks = 1000; + long aggregateLatency = 0; + AbfsPerfTracker abfsPerfTracker = new AbfsPerfTracker(accountName, filesystemName, false); + List> tasks = new ArrayList<>(); + final AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", new ArrayList<>()); + + for (int i = 0; i < numTasks; i++) { + tasks.add(() -> { + Instant startRecord = Instant.now(); + + try (AbfsPerfInfo tracker = new AbfsPerfInfo(abfsPerfTracker, "oneOperationCaller", + "oneOperationCallee")) { + tracker.registerResult(httpOperation).registerSuccess(true) + .registerAggregates(startRecord, TEST_AGGREGATE_COUNT); + } + + long latencyRecord = Duration.between(startRecord, Instant.now()).toMillis(); + LOG.debug("Spent {} ms in recording latency.", latencyRecord); + return latencyRecord; + }); + } + + for (Future fr: executorService.invokeAll(tasks)) { + aggregateLatency += fr.get(); + } + + double averageRecordLatency = aggregateLatency / numTasks; + assertThat(averageRecordLatency).describedAs("Average time for recording aggregate latencies should be bounded") + .isBetween(minLatencyWhenDisabledMs, maxLatencyWhenDisabledMs); + } + + @Test + public void verifyGettingLatencyRecordsIsCheapWhenDisabled() throws Exception { + // when latency tracker is disabled, we expect it to take time equivalent to checking a boolean value + final double maxLatencyWhenDisabledMs = 1000; + final double minLatencyWhenDisabledMs = 0; + final long numTasks = 1000; + long aggregateLatency = 0; + AbfsPerfTracker 
abfsPerfTracker = new AbfsPerfTracker(accountName, filesystemName, false); + List> tasks = new ArrayList<>(); + + for (int i = 0; i < numTasks; i++) { + tasks.add(() -> { + Instant startGet = Instant.now(); + abfsPerfTracker.getClientLatency(); + long latencyGet = Duration.between(startGet, Instant.now()).toMillis(); + LOG.debug("Spent {} ms in retrieving latency record.", latencyGet); + return latencyGet; + }); + } + + for (Future fr: executorService.invokeAll(tasks)) { + aggregateLatency += fr.get(); + } + + double averageRecordLatency = aggregateLatency / numTasks; + assertThat(averageRecordLatency).describedAs("Average time for getting latency records should be bounded") + .isBetween(minLatencyWhenDisabledMs, maxLatencyWhenDisabledMs); + } + + @Test + public void verifyRecordingSingletonLatencyIsCheapWhenEnabled() throws Exception { + final double maxLatencyWhenDisabledMs = 5000; + final double minLatencyWhenDisabledMs = 0; + final long numTasks = 1000; + long aggregateLatency = 0; + AbfsPerfTracker abfsPerfTracker = new AbfsPerfTracker(accountName, filesystemName, true); + List> tasks = new ArrayList<>(); + final AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", new ArrayList<>()); + + for (int i = 0; i < numTasks; i++) { + tasks.add(() -> { + Instant startRecord = Instant.now(); + + try (AbfsPerfInfo tracker = new AbfsPerfInfo(abfsPerfTracker, "oneOperationCaller", + "oneOperationCallee")) { + tracker.registerResult(httpOperation).registerSuccess(true); + } + + long latencyRecord = Duration.between(startRecord, Instant.now()).toMillis(); + LOG.debug("Spent {} ms in recording latency.", latencyRecord); + return latencyRecord; + }); + } + + for (Future fr: executorService.invokeAll(tasks)) { + aggregateLatency += fr.get(); + } + + double averageRecordLatency = aggregateLatency / numTasks; + assertThat(averageRecordLatency).describedAs("Average time for recording singleton latencies should be bounded") + .isBetween(minLatencyWhenDisabledMs, 
maxLatencyWhenDisabledMs); + } + + @Test + public void verifyRecordingAggregateLatencyIsCheapWhenEnabled() throws Exception { + final double maxLatencyWhenDisabledMs = 5000; + final double minLatencyWhenDisabledMs = 0; + final long numTasks = 1000; + long aggregateLatency = 0; + AbfsPerfTracker abfsPerfTracker = new AbfsPerfTracker(accountName, filesystemName, true); + List> tasks = new ArrayList<>(); + final AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", new ArrayList<>()); + + for (int i = 0; i < numTasks; i++) { + tasks.add(() -> { + Instant startRecord = Instant.now(); + + try (AbfsPerfInfo tracker = new AbfsPerfInfo(abfsPerfTracker, "oneOperationCaller", + "oneOperationCallee")) { + tracker.registerResult(httpOperation).registerSuccess(true). + registerAggregates(startRecord, TEST_AGGREGATE_COUNT); + } + + long latencyRecord = Duration.between(startRecord, Instant.now()).toMillis(); + LOG.debug("Spent {} ms in recording latency.", latencyRecord); + return latencyRecord; + }); + } + + for (Future fr: executorService.invokeAll(tasks)) { + aggregateLatency += fr.get(); + } + + double averageRecordLatency = aggregateLatency / numTasks; + assertThat(averageRecordLatency).describedAs("Average time for recording aggregate latencies is bounded") + .isBetween(minLatencyWhenDisabledMs, maxLatencyWhenDisabledMs); + } + + @Test + public void verifyGettingLatencyRecordsIsCheapWhenEnabled() throws Exception { + final double maxLatencyWhenDisabledMs = 5000; + final double minLatencyWhenDisabledMs = 0; + final long numTasks = 1000; + long aggregateLatency = 0; + AbfsPerfTracker abfsPerfTracker = new AbfsPerfTracker(accountName, filesystemName, true); + List> tasks = new ArrayList<>(); + + for (int i = 0; i < numTasks; i++) { + tasks.add(() -> { + Instant startRecord = Instant.now(); + abfsPerfTracker.getClientLatency(); + long latencyRecord = Duration.between(startRecord, Instant.now()).toMillis(); + LOG.debug("Spent {} ms in recording latency.", 
latencyRecord); + return latencyRecord; + }); + } + + for (Future fr: executorService.invokeAll(tasks)) { + aggregateLatency += fr.get(); + } + + double averageRecordLatency = aggregateLatency / numTasks; + assertThat(averageRecordLatency).describedAs("Average time for getting latency records should be bounded") + .isBetween(minLatencyWhenDisabledMs, maxLatencyWhenDisabledMs); + } + + @Test + public void verifyNoExceptionOnInvalidInput() throws Exception { + Instant testInstant = Instant.now(); + AbfsPerfTracker abfsPerfTrackerDisabled = new AbfsPerfTracker(accountName, filesystemName, false); + AbfsPerfTracker abfsPerfTrackerEnabled = new AbfsPerfTracker(accountName, filesystemName, true); + final AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", new ArrayList()); + + verifyNoException(abfsPerfTrackerDisabled); + verifyNoException(abfsPerfTrackerEnabled); + } + + private void verifyNoException(AbfsPerfTracker abfsPerfTracker) throws Exception { + Instant testInstant = Instant.now(); + final AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", new ArrayList()); + + try ( + AbfsPerfInfo tracker01 = new AbfsPerfInfo(abfsPerfTracker, null, null); + AbfsPerfInfo tracker02 = new AbfsPerfInfo(abfsPerfTracker, "test", null); + AbfsPerfInfo tracker03 = new AbfsPerfInfo(abfsPerfTracker, "test", "test"); + AbfsPerfInfo tracker04 = new AbfsPerfInfo(abfsPerfTracker, "test", "test"); + + AbfsPerfInfo tracker05 = new AbfsPerfInfo(abfsPerfTracker, null, null); + AbfsPerfInfo tracker06 = new AbfsPerfInfo(abfsPerfTracker, "test", null); + AbfsPerfInfo tracker07 = new AbfsPerfInfo(abfsPerfTracker, "test", "test"); + AbfsPerfInfo tracker08 = new AbfsPerfInfo(abfsPerfTracker, "test", "test"); + AbfsPerfInfo tracker09 = new AbfsPerfInfo(abfsPerfTracker, "test", "test"); + AbfsPerfInfo tracker10 = new AbfsPerfInfo(abfsPerfTracker, "test", "test"); + + AbfsPerfInfo tracker11 = new AbfsPerfInfo(abfsPerfTracker, "test", "test"); + AbfsPerfInfo tracker12 = 
new AbfsPerfInfo(abfsPerfTracker, "test", "test"); + AbfsPerfInfo tracker13 = new AbfsPerfInfo(abfsPerfTracker, "test", "test"); + ) { + tracker01.registerResult(null).registerSuccess(false); + tracker02.registerResult(null).registerSuccess(false); + tracker03.registerResult(null).registerSuccess(false); + tracker04.registerResult(httpOperation).registerSuccess(false); + + tracker05.registerResult(null).registerSuccess(false).registerAggregates(null, 0); + tracker06.registerResult(null).registerSuccess(false).registerAggregates(null, 0); + tracker07.registerResult(null).registerSuccess(false).registerAggregates(null, 0); + tracker08.registerResult(httpOperation).registerSuccess(false).registerAggregates(null, 0); + tracker09.registerResult(httpOperation).registerSuccess(false).registerAggregates(Instant.now(), 0); + tracker10.registerResult(httpOperation).registerSuccess(false).registerAggregates(Instant.now(), TEST_AGGREGATE_COUNT); + + tracker11.registerResult(httpOperation).registerSuccess(false).registerAggregates(testInstant, TEST_AGGREGATE_COUNT); + tracker12.registerResult(httpOperation).registerSuccess(false).registerAggregates(Instant.MAX, TEST_AGGREGATE_COUNT); + tracker13.registerResult(httpOperation).registerSuccess(false).registerAggregates(Instant.MIN, TEST_AGGREGATE_COUNT); + } + } +} \ No newline at end of file diff --git a/hadoop-tools/hadoop-azure/src/test/resources/azure-test.xml b/hadoop-tools/hadoop-azure/src/test/resources/azure-test.xml index 24d444a88d7..d833cfbe017 100644 --- a/hadoop-tools/hadoop-azure/src/test/resources/azure-test.xml +++ b/hadoop-tools/hadoop-azure/src/test/resources/azure-test.xml @@ -33,6 +33,11 @@ false + + fs.azure.abfs.latency.track + false + +