HADOOP-16612. Track Azure Blob File System client-perceived latency

Contributed by Jeetesh Mangwani.

This add the ability to track the end-to-end performance of ADLS Gen 2 REST APIs by measuring latency in the Hadoop ABFS driver.
The latency information is sent back to the ADLS Gen 2 REST API endpoints in the subsequent requests.
This commit is contained in:
Jeetesh Mangwani 2019-11-19 08:51:49 -08:00 committed by DadanielZ
parent ffeb6d8ece
commit b1e748f45b
No known key found for this signature in database
GPG Key ID: 5C91E270F172ADFB
18 changed files with 1551 additions and 330 deletions

View File

@ -262,7 +262,12 @@
<artifactId>mockito-all</artifactId> <artifactId>mockito-all</artifactId>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>
</dependencies> </dependencies>
<profiles> <profiles>

View File

@ -178,6 +178,10 @@ public class AbfsConfiguration{
DefaultValue = DEFAULT_USE_UPN) DefaultValue = DEFAULT_USE_UPN)
private boolean useUpn; private boolean useUpn;
@BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ABFS_LATENCY_TRACK,
DefaultValue = DEFAULT_ABFS_LATENCY_TRACK)
private boolean trackLatency;
private Map<String, String> storageAccountKeys; private Map<String, String> storageAccountKeys;
public AbfsConfiguration(final Configuration rawConfig, String accountName) public AbfsConfiguration(final Configuration rawConfig, String accountName)
@ -471,6 +475,15 @@ public class AbfsConfiguration{
return this.useUpn; return this.useUpn;
} }
/**
* Whether {@code AbfsClient} should track and send latency info back to storage servers.
*
* @return a boolean indicating whether latency should be tracked.
*/
public boolean shouldTrackLatency() {
return this.trackLatency;
}
public AccessTokenProvider getTokenProvider() throws TokenAccessProviderException { public AccessTokenProvider getTokenProvider() throws TokenAccessProviderException {
AuthType authType = getEnum(FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME, AuthType.SharedKey); AuthType authType = getEnum(FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME, AuthType.SharedKey);
if (authType == AuthType.OAuth) { if (authType == AuthType.OAuth) {

View File

@ -114,6 +114,8 @@ public final class ConfigurationKeys {
public static final String FS_AZURE_ACCOUNT_OAUTH_REFRESH_TOKEN = "fs.azure.account.oauth2.refresh.token"; public static final String FS_AZURE_ACCOUNT_OAUTH_REFRESH_TOKEN = "fs.azure.account.oauth2.refresh.token";
/** Key for oauth AAD refresh token endpoint: {@value}. */ /** Key for oauth AAD refresh token endpoint: {@value}. */
public static final String FS_AZURE_ACCOUNT_OAUTH_REFRESH_TOKEN_ENDPOINT = "fs.azure.account.oauth2.refresh.token.endpoint"; public static final String FS_AZURE_ACCOUNT_OAUTH_REFRESH_TOKEN_ENDPOINT = "fs.azure.account.oauth2.refresh.token.endpoint";
/** Key for enabling the tracking of ABFS API latency and sending the latency numbers to the ABFS API service */
public static final String FS_AZURE_ABFS_LATENCY_TRACK = "fs.azure.abfs.latency.track";
public static String accountProperty(String property, String account) { public static String accountProperty(String property, String account) {
return property + "." + account; return property + "." + account;

View File

@ -67,6 +67,7 @@ public final class FileSystemConfigurations {
public static final boolean DEFAULT_ENABLE_HTTPS = true; public static final boolean DEFAULT_ENABLE_HTTPS = true;
public static final boolean DEFAULT_USE_UPN = false; public static final boolean DEFAULT_USE_UPN = false;
public static final boolean DEFAULT_ABFS_LATENCY_TRACK = false;
private FileSystemConfigurations() {} private FileSystemConfigurations() {}
} }

View File

@ -58,6 +58,7 @@ public final class HttpHeaderConfigurations {
public static final String X_MS_PERMISSIONS = "x-ms-permissions"; public static final String X_MS_PERMISSIONS = "x-ms-permissions";
public static final String X_MS_UMASK = "x-ms-umask"; public static final String X_MS_UMASK = "x-ms-umask";
public static final String X_MS_NAMESPACE_ENABLED = "x-ms-namespace-enabled"; public static final String X_MS_NAMESPACE_ENABLED = "x-ms-namespace-enabled";
public static final String X_MS_ABFS_CLIENT_LATENCY = "x-ms-abfs-client-latency";
private HttpHeaderConfigurations() {} private HttpHeaderConfigurations() {}
} }

View File

@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.contracts.services;
import org.apache.hadoop.classification.InterfaceStability;
/**
* The AbfsPerfLoggable contract.
*/
@InterfaceStability.Evolving
public interface AbfsPerfLoggable {
/**
* Gets the string to log to the Abfs Logging API.
*
* @return the string that will be logged.
*/
String getLogString();
}

View File

@ -57,6 +57,7 @@ public class AbfsClient {
private final String filesystem; private final String filesystem;
private final AbfsConfiguration abfsConfiguration; private final AbfsConfiguration abfsConfiguration;
private final String userAgent; private final String userAgent;
private final AbfsPerfTracker abfsPerfTracker;
private final AccessTokenProvider tokenProvider; private final AccessTokenProvider tokenProvider;
@ -64,7 +65,8 @@ public class AbfsClient {
public AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials, public AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials,
final AbfsConfiguration abfsConfiguration, final AbfsConfiguration abfsConfiguration,
final ExponentialRetryPolicy exponentialRetryPolicy, final ExponentialRetryPolicy exponentialRetryPolicy,
final AccessTokenProvider tokenProvider) { final AccessTokenProvider tokenProvider,
final AbfsPerfTracker abfsPerfTracker) {
this.baseUrl = baseUrl; this.baseUrl = baseUrl;
this.sharedKeyCredentials = sharedKeyCredentials; this.sharedKeyCredentials = sharedKeyCredentials;
String baseUrlString = baseUrl.toString(); String baseUrlString = baseUrl.toString();
@ -85,12 +87,17 @@ public class AbfsClient {
this.userAgent = initializeUserAgent(abfsConfiguration, sslProviderName); this.userAgent = initializeUserAgent(abfsConfiguration, sslProviderName);
this.tokenProvider = tokenProvider; this.tokenProvider = tokenProvider;
this.abfsPerfTracker = abfsPerfTracker;
} }
public String getFileSystem() { public String getFileSystem() {
return filesystem; return filesystem;
} }
protected AbfsPerfTracker getAbfsPerfTracker() {
return abfsPerfTracker;
}
ExponentialRetryPolicy getRetryPolicy() { ExponentialRetryPolicy getRetryPolicy() {
return retryPolicy; return retryPolicy;
} }

View File

@ -21,8 +21,10 @@ package org.apache.hadoop.fs.azurebfs.services;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
import java.net.HttpURLConnection; import java.net.HttpURLConnection;
import java.net.URL; import java.net.URL;
import java.net.URLEncoder;
import java.util.List; import java.util.List;
import java.util.UUID; import java.util.UUID;
@ -40,12 +42,13 @@ import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants; import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations; import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
import org.apache.hadoop.fs.azurebfs.contracts.services.AbfsPerfLoggable;
import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultSchema; import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultSchema;
/** /**
* Represents an HTTP operation. * Represents an HTTP operation.
*/ */
public class AbfsHttpOperation { public class AbfsHttpOperation implements AbfsPerfLoggable {
private static final Logger LOG = LoggerFactory.getLogger(AbfsHttpOperation.class); private static final Logger LOG = LoggerFactory.getLogger(AbfsHttpOperation.class);
private static final int CONNECT_TIMEOUT = 30 * 1000; private static final int CONNECT_TIMEOUT = 30 * 1000;
@ -161,6 +164,47 @@ public class AbfsHttpOperation {
return sb.toString(); return sb.toString();
} }
// Returns a trace message for the ABFS API logging service to consume
public String getLogString() {
String urlStr = null;
try {
urlStr = URLEncoder.encode(url.toString(), "UTF-8");
} catch(UnsupportedEncodingException e) {
urlStr = "https%3A%2F%2Ffailed%2Fto%2Fencode%2Furl";
}
final StringBuilder sb = new StringBuilder();
sb.append("s=")
.append(statusCode)
.append(" e=")
.append(storageErrorCode)
.append(" ci=")
.append(clientRequestId)
.append(" ri=")
.append(requestId);
if (isTraceEnabled) {
sb.append(" ct=")
.append(connectionTimeMs)
.append(" st=")
.append(sendRequestTimeMs)
.append(" rt=")
.append(recvResponseTimeMs);
}
sb.append(" bs=")
.append(bytesSent)
.append(" br=")
.append(bytesReceived)
.append(" m=")
.append(method)
.append(" u=")
.append(urlStr);
return sb.toString();
}
/** /**
* Initializes a new HTTP request and opens the connection. * Initializes a new HTTP request and opens the connection.
* *

View File

@ -226,8 +226,10 @@ public class AbfsInputStream extends FSInputStream {
throw new IllegalArgumentException("requested read length is more than will fit after requested offset in buffer"); throw new IllegalArgumentException("requested read length is more than will fit after requested offset in buffer");
} }
final AbfsRestOperation op; final AbfsRestOperation op;
try { AbfsPerfTracker tracker = client.getAbfsPerfTracker();
try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker, "readRemote", "read")) {
op = client.read(path, position, b, offset, length, tolerateOobAppends ? "*" : eTag); op = client.read(path, position, b, offset, length, tolerateOobAppends ? "*" : eTag);
perfInfo.registerResult(op.getResult()).registerSuccess(true);
} catch (AzureBlobFileSystemException ex) { } catch (AzureBlobFileSystemException ex) {
if (ex instanceof AbfsRestOperationException) { if (ex instanceof AbfsRestOperationException) {
AbfsRestOperationException ere = (AbfsRestOperationException) ex; AbfsRestOperationException ere = (AbfsRestOperationException) ex;

View File

@ -289,10 +289,16 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
final Future<Void> job = completionService.submit(new Callable<Void>() { final Future<Void> job = completionService.submit(new Callable<Void>() {
@Override @Override
public Void call() throws Exception { public Void call() throws Exception {
client.append(path, offset, bytes, 0, AbfsPerfTracker tracker = client.getAbfsPerfTracker();
bytesLength); try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker,
byteBufferPool.putBuffer(ByteBuffer.wrap(bytes)); "writeCurrentBufferToService", "append")) {
return null; AbfsRestOperation op = client.append(path, offset, bytes, 0,
bytesLength);
perfInfo.registerResult(op.getResult());
byteBufferPool.putBuffer(ByteBuffer.wrap(bytes));
perfInfo.registerSuccess(true);
return null;
}
} }
}); });
@ -334,8 +340,11 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
private synchronized void flushWrittenBytesToServiceInternal(final long offset, private synchronized void flushWrittenBytesToServiceInternal(final long offset,
final boolean retainUncommitedData, final boolean isClose) throws IOException { final boolean retainUncommitedData, final boolean isClose) throws IOException {
try { AbfsPerfTracker tracker = client.getAbfsPerfTracker();
client.flush(path, offset, retainUncommitedData, isClose); try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker,
"flushWrittenBytesToServiceInternal", "flush")) {
AbfsRestOperation op = client.flush(path, offset, retainUncommitedData, isClose);
perfInfo.registerResult(op.getResult()).registerSuccess(true);
} catch (AzureBlobFileSystemException ex) { } catch (AzureBlobFileSystemException ex) {
if (ex instanceof AbfsRestOperationException) { if (ex instanceof AbfsRestOperationException) {
if (((AbfsRestOperationException) ex).getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) { if (((AbfsRestOperationException) ex).getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) {

View File

@ -0,0 +1,133 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.services;
import java.time.Instant;
import org.apache.hadoop.fs.azurebfs.contracts.services.AbfsPerfLoggable;
/**
* {@code AbfsPerfInfo} holds information on ADLS Gen 2 API performance observed by {@code AbfsClient}. Every
* Abfs request keeps adding its information (success/failure, latency etc) to its {@code AbfsPerfInfo}'s object
* as and when it becomes available. When the request is over, the performance information is recorded while
* the {@code AutoCloseable} {@code AbfsPerfInfo} object is "closed".
*/
public final class AbfsPerfInfo implements AutoCloseable {
// the tracker which will be extracting perf info out of this object.
private AbfsPerfTracker abfsPerfTracker;
// the caller name.
private String callerName;
// the callee name.
private String calleeName;
// time when this tracking started.
private Instant trackingStart;
// time when this tracking ended.
private Instant trackingEnd;
// whether the tracked request was successful.
private boolean success;
// time when the aggregate operation (to which this request belongs) started.
private Instant aggregateStart;
// number of requests in the aggregate operation (to which this request belongs).
private long aggregateCount;
// result of the request.
private AbfsPerfLoggable res;
public AbfsPerfInfo(AbfsPerfTracker abfsPerfTracker, String callerName, String calleeName) {
this.callerName = callerName;
this.calleeName = calleeName;
this.abfsPerfTracker = abfsPerfTracker;
this.success = false;
this.trackingStart = abfsPerfTracker.getLatencyInstant();
}
public AbfsPerfInfo registerSuccess(boolean success) {
this.success = success;
return this;
}
public AbfsPerfInfo registerResult(AbfsPerfLoggable res) {
this.res = res;
return this;
}
public AbfsPerfInfo registerAggregates(Instant aggregateStart, long aggregateCount) {
this.aggregateStart = aggregateStart;
this.aggregateCount = aggregateCount;
return this;
}
public AbfsPerfInfo finishTracking() {
if (this.trackingEnd == null) {
this.trackingEnd = abfsPerfTracker.getLatencyInstant();
}
return this;
}
public AbfsPerfInfo registerCallee(String calleeName) {
this.calleeName = calleeName;
return this;
}
@Override
public void close() {
abfsPerfTracker.trackInfo(this.finishTracking());
}
public String getCallerName() {
return callerName;
};
public String getCalleeName() {
return calleeName;
}
public Instant getTrackingStart() {
return trackingStart;
}
public Instant getTrackingEnd() {
return trackingEnd;
}
public boolean getSuccess() {
return success;
}
public Instant getAggregateStart() {
return aggregateStart;
}
public long getAggregateCount() {
return aggregateCount;
}
public AbfsPerfLoggable getResult() {
return res;
}
}

View File

@ -0,0 +1,319 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.services;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
import org.apache.hadoop.fs.azurebfs.contracts.services.AbfsPerfLoggable;
/**
* {@code AbfsPerfTracker} keeps track of service latencies observed by {@code AbfsClient}. Every request hands over
* its perf-related information as a {@code AbfsPerfInfo} object (contains success/failure, latency etc) to the
* {@code AbfsPerfTracker}'s queue. When a request is made, we check {@code AbfsPerfTracker} to see if there are
* any latency numbers to be reported. If there are any, the stats are added to an HTTP header
* ({@code x-ms-abfs-client-latency}) on the next request.
*
* A typical perf log line appears like:
*
* h=KARMA t=2019-10-25T20:21:14.518Z a=abfstest01.dfs.core.windows.net
* c=abfs-testcontainer-84828169-6488-4a62-a875-1e674275a29f cr=delete ce=deletePath r=Succeeded l=32 ls=32 lc=1 s=200
* e= ci=95121dae-70a8-4187-b067-614091034558 ri=97effdcf-201f-0097-2d71-8bae00000000 ct=0 st=0 rt=0 bs=0 br=0 m=DELETE
* u=https%3A%2F%2Fabfstest01.dfs.core.windows.net%2Fabfs-testcontainer%2Ftest%3Ftimeout%3D90%26recursive%3Dtrue
*
* The fields have the following definitions:
*
* h: host name
* t: time when this request was logged
* a: Azure storage account name
* c: container name
* cr: name of the caller method
* ce: name of the callee method
* r: result (Succeeded/Failed)
* l: latency (time spent in callee)
* ls: latency sum (aggregate time spent in caller; logged when there are multiple callees;
* logged with the last callee)
* lc: latency count (number of callees; logged when there are multiple callees;
* logged with the last callee)
* s: HTTP Status code
* e: Error code
* ci: client request ID
* ri: server request ID
* ct: connection time in milliseconds
* st: sending time in milliseconds
* rt: receiving time in milliseconds
* bs: bytes sent
* br: bytes received
* m: HTTP method (GET, PUT etc)
* u: Encoded HTTP URL
*
*/
public final class AbfsPerfTracker {
// the logger.
private static final Logger LOG = LoggerFactory.getLogger(AbfsPerfTracker.class);
// the field names of perf log lines.
private static final String HOST_NAME_KEY = "h";
private static final String TIMESTAMP_KEY = "t";
private static final String STORAGE_ACCOUNT_NAME_KEY = "a";
private static final String CONTAINER_NAME_KEY = "c";
private static final String CALLER_METHOD_NAME_KEY = "cr";
private static final String CALLEE_METHOD_NAME_KEY = "ce";
private static final String RESULT_KEY = "r";
private static final String LATENCY_KEY = "l";
private static final String LATENCY_SUM_KEY = "ls";
private static final String LATENCY_COUNT_KEY = "lc";
private static final String HTTP_STATUS_CODE_KEY = "s";
private static final String ERROR_CODE_KEY = "e";
private static final String CLIENT_REQUEST_ID_KEY = "ci";
private static final String SERVER_REQUEST_ID_KEY = "ri";
private static final String CONNECTION_TIME_KEY = "ct";
private static final String SENDING_TIME_KEY = "st";
private static final String RECEIVING_TIME_KEY = "rt";
private static final String BYTES_SENT_KEY = "bs";
private static final String BYTES_RECEIVED_KEY = "br";
private static final String HTTP_METHOD_KEY = "m";
private static final String HTTP_URL_KEY = "u";
private static final String STRING_PLACEHOLDER = "%s";
// the queue to hold latency information.
private final ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<String>();
// whether the latency tracker has been enabled.
private boolean enabled = false;
// the host name.
private String hostName;
// singleton latency reporting format.
private String singletonLatencyReportingFormat;
// aggregate latency reporting format.
private String aggregateLatencyReportingFormat;
public AbfsPerfTracker(String filesystemName, String accountName, AbfsConfiguration configuration) {
this(filesystemName, accountName, configuration.shouldTrackLatency());
}
protected AbfsPerfTracker(String filesystemName, String accountName, boolean enabled) {
this.enabled = enabled;
LOG.debug("AbfsPerfTracker configuration: {}", enabled);
if (enabled) {
try {
hostName = InetAddress.getLocalHost().getHostName();
} catch (UnknownHostException e) {
hostName = "UnknownHost";
}
String commonReportingFormat = new StringBuilder()
.append(HOST_NAME_KEY)
.append(AbfsHttpConstants.EQUAL)
.append(hostName)
.append(AbfsHttpConstants.SINGLE_WHITE_SPACE)
.append(TIMESTAMP_KEY)
.append(AbfsHttpConstants.EQUAL)
.append(STRING_PLACEHOLDER)
.append(AbfsHttpConstants.SINGLE_WHITE_SPACE)
.append(STORAGE_ACCOUNT_NAME_KEY)
.append(AbfsHttpConstants.EQUAL)
.append(accountName)
.append(AbfsHttpConstants.SINGLE_WHITE_SPACE)
.append(CONTAINER_NAME_KEY)
.append(AbfsHttpConstants.EQUAL)
.append(filesystemName)
.append(AbfsHttpConstants.SINGLE_WHITE_SPACE)
.append(CALLER_METHOD_NAME_KEY)
.append(AbfsHttpConstants.EQUAL)
.append(STRING_PLACEHOLDER)
.append(AbfsHttpConstants.SINGLE_WHITE_SPACE)
.append(CALLEE_METHOD_NAME_KEY)
.append(AbfsHttpConstants.EQUAL)
.append(STRING_PLACEHOLDER)
.append(AbfsHttpConstants.SINGLE_WHITE_SPACE)
.append(RESULT_KEY)
.append(AbfsHttpConstants.EQUAL)
.append(STRING_PLACEHOLDER)
.append(AbfsHttpConstants.SINGLE_WHITE_SPACE)
.append(LATENCY_KEY)
.append(AbfsHttpConstants.EQUAL)
.append(STRING_PLACEHOLDER)
.toString();
/**
* Example singleton log (no ls or lc field)
* h=KARMA t=2019-10-25T20:21:14.518Z a=abfstest01.dfs.core.windows.net
* c=abfs-testcontainer-84828169-6488-4a62-a875-1e674275a29f cr=delete ce=deletePath r=Succeeded l=32 s=200
* e= ci=95121dae-70a8-4187-b067-614091034558 ri=97effdcf-201f-0097-2d71-8bae00000000 ct=0 st=0 rt=0 bs=0 br=0 m=DELETE
* u=https%3A%2F%2Fabfstest01.dfs.core.windows.net%2Fabfs-testcontainer%2Ftest%3Ftimeout%3D90%26recursive%3Dtrue
*/
singletonLatencyReportingFormat = new StringBuilder()
.append(commonReportingFormat)
.append(STRING_PLACEHOLDER)
.toString();
/**
* Example aggregate log
* h=KARMA t=2019-10-25T20:21:14.518Z a=abfstest01.dfs.core.windows.net
* c=abfs-testcontainer-84828169-6488-4a62-a875-1e674275a29f cr=delete ce=deletePath r=Succeeded l=32 ls=32 lc=1 s=200
* e= ci=95121dae-70a8-4187-b067-614091034558 ri=97effdcf-201f-0097-2d71-8bae00000000 ct=0 st=0 rt=0 bs=0 br=0 m=DELETE
* u=https%3A%2F%2Fabfstest01.dfs.core.windows.net%2Fabfs-testcontainer%2Ftest%3Ftimeout%3D90%26recursive%3Dtrue
*/
aggregateLatencyReportingFormat = new StringBuilder()
.append(commonReportingFormat)
.append(AbfsHttpConstants.SINGLE_WHITE_SPACE)
.append(LATENCY_SUM_KEY)
.append(AbfsHttpConstants.EQUAL)
.append(STRING_PLACEHOLDER)
.append(AbfsHttpConstants.SINGLE_WHITE_SPACE)
.append(LATENCY_COUNT_KEY)
.append(AbfsHttpConstants.EQUAL)
.append(STRING_PLACEHOLDER)
.append(STRING_PLACEHOLDER)
.toString();
}
}
public void trackInfo(AbfsPerfInfo perfInfo) {
if (!enabled) {
return;
}
if (isValidInstant(perfInfo.getAggregateStart()) && perfInfo.getAggregateCount() > 0) {
recordClientLatency(
perfInfo.getTrackingStart(),
perfInfo.getTrackingEnd(),
perfInfo.getCallerName(),
perfInfo.getCalleeName(),
perfInfo.getSuccess(),
perfInfo.getAggregateStart(),
perfInfo.getAggregateCount(),
perfInfo.getResult());
} else {
recordClientLatency(
perfInfo.getTrackingStart(),
perfInfo.getTrackingEnd(),
perfInfo.getCallerName(),
perfInfo.getCalleeName(),
perfInfo.getSuccess(),
perfInfo.getResult());
}
}
public Instant getLatencyInstant() {
if (!enabled) {
return null;
}
return Instant.now();
}
private void recordClientLatency(
Instant operationStart,
Instant operationStop,
String callerName,
String calleeName,
boolean success,
AbfsPerfLoggable res) {
Instant trackerStart = Instant.now();
long latency = isValidInstant(operationStart) && isValidInstant(operationStop)
? Duration.between(operationStart, operationStop).toMillis() : -1;
String latencyDetails = String.format(singletonLatencyReportingFormat,
Instant.now(),
callerName,
calleeName,
success ? "Succeeded" : "Failed",
latency,
res == null ? "" : (" " + res.getLogString()));
this.offerToQueue(trackerStart, latencyDetails);
}
private void recordClientLatency(
Instant operationStart,
Instant operationStop,
String callerName,
String calleeName,
boolean success,
Instant aggregateStart,
long aggregateCount,
AbfsPerfLoggable res){
Instant trackerStart = Instant.now();
long latency = isValidInstant(operationStart) && isValidInstant(operationStop)
? Duration.between(operationStart, operationStop).toMillis() : -1;
long aggregateLatency = isValidInstant(aggregateStart) && isValidInstant(operationStop)
? Duration.between(aggregateStart, operationStop).toMillis() : -1;
String latencyDetails = String.format(aggregateLatencyReportingFormat,
Instant.now(),
callerName,
calleeName,
success ? "Succeeded" : "Failed",
latency,
aggregateLatency,
aggregateCount,
res == null ? "" : (" " + res.getLogString()));
offerToQueue(trackerStart, latencyDetails);
}
public String getClientLatency() {
if (!enabled) {
return null;
}
Instant trackerStart = Instant.now();
String latencyDetails = queue.poll(); // non-blocking pop
if (LOG.isDebugEnabled()) {
Instant stop = Instant.now();
long elapsed = Duration.between(trackerStart, stop).toMillis();
LOG.debug("Dequeued latency info [{} ms]: {}", elapsed, latencyDetails);
}
return latencyDetails;
}
private void offerToQueue(Instant trackerStart, String latencyDetails) {
queue.offer(latencyDetails); // non-blocking append
if (LOG.isDebugEnabled()) {
Instant trackerStop = Instant.now();
long elapsed = Duration.between(trackerStart, trackerStop).toMillis();
LOG.debug("Queued latency info [{} ms]: {}", elapsed, latencyDetails);
}
}
private boolean isValidInstant(Instant testInstant) {
return testInstant != null && testInstant != Instant.MIN && testInstant != Instant.MAX;
}
}

View File

@ -121,6 +121,14 @@ public class AbfsRestOperation {
* HTTP operations. * HTTP operations.
*/ */
void execute() throws AzureBlobFileSystemException { void execute() throws AzureBlobFileSystemException {
// see if we have latency reports from the previous requests
String latencyHeader = this.client.getAbfsPerfTracker().getClientLatency();
if (latencyHeader != null && !latencyHeader.isEmpty()) {
AbfsHttpHeader httpHeader =
new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_ABFS_CLIENT_LATENCY, latencyHeader);
requestHeaders.add(httpHeader);
}
int retryCount = 0; int retryCount = 0;
while (!executeHttpOperation(retryCount++)) { while (!executeHttpOperation(retryCount++)) {
try { try {

View File

@ -661,6 +661,52 @@ Hflush() being the only documented API that can provide persistent data
transfer, Flush() also attempting to persist buffered data will lead to transfer, Flush() also attempting to persist buffered data will lead to
performance issues. performance issues.
### <a name="perfoptions"></a> Perf Options
#### <a name="abfstracklatencyoptions"></a> 1. HTTP Request Tracking Options
If you set `fs.azure.abfs.latency.track` to `true`, the module starts tracking the
performance metrics of ABFS HTTP traffic. To obtain these numbers on your machine
or cluster, you will also need to enable debug logging for the `AbfsPerfTracker`
class in your `log4j` config. A typical perf log line appears like:
```
h=KARMA t=2019-10-25T20:21:14.518Z a=abfstest01.dfs.core.windows.net
c=abfs-testcontainer-84828169-6488-4a62-a875-1e674275a29f cr=delete ce=deletePath
r=Succeeded l=32 ls=32 lc=1 s=200 e= ci=95121dae-70a8-4187-b067-614091034558
ri=97effdcf-201f-0097-2d71-8bae00000000 ct=0 st=0 rt=0 bs=0 br=0 m=DELETE
u=https%3A%2F%2Fabfstest01.dfs.core.windows.net%2Ftestcontainer%2Ftest%3Ftimeout%3D90%26recursive%3Dtrue
```
The fields have the following definitions:
`h`: host name
`t`: time when this request was logged
`a`: Azure storage account name
`c`: container name
`cr`: name of the caller method
`ce`: name of the callee method
`r`: result (Succeeded/Failed)
`l`: latency (time spent in callee)
`ls`: latency sum (aggregate time spent in caller; logged when there are multiple
callees; logged with the last callee)
`lc`: latency count (number of callees; logged when there are multiple callees;
logged with the last callee)
`s`: HTTP Status code
`e`: Error code
`ci`: client request ID
`ri`: server request ID
`ct`: connection time in milliseconds
`st`: sending time in milliseconds
`rt`: receiving time in milliseconds
`bs`: bytes sent
`br`: bytes received
`m`: HTTP method (GET, PUT etc)
`u`: Encoded HTTP URL
Note that these performance numbers are also sent back to the ADLS Gen 2 API endpoints
in the `x-ms-abfs-client-latency` HTTP headers in subsequent requests. Azure uses these
settings to track their end-to-end latency.
## <a name="troubleshooting"></a> Troubleshooting ## <a name="troubleshooting"></a> Troubleshooting
The problems associated with the connector usually come down to, in order The problems associated with the connector usually come down to, in order

View File

@ -43,7 +43,7 @@ public final class TestAbfsClient {
AbfsConfiguration config, AbfsConfiguration config,
boolean includeSSLProvider) { boolean includeSSLProvider) {
AbfsClient client = new AbfsClient(baseUrl, null, AbfsClient client = new AbfsClient(baseUrl, null,
config, null, null); config, null, null, null);
String sslProviderName = null; String sslProviderName = null;
if (includeSSLProvider) { if (includeSSLProvider) {
sslProviderName = SSLSocketFactoryEx.getDefaultFactory().getProviderName(); sslProviderName = SSLSocketFactoryEx.getDefaultFactory().getProviderName();

View File

@ -0,0 +1,408 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.services;
import java.net.URL;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.assertj.core.api.Assertions.assertThat;
/**
* Test the latency tracker for ABFS.
*
*/
public final class TestAbfsPerfTracker {
private static final Logger LOG = LoggerFactory.getLogger(TestAbfsPerfTracker.class);
private static ExecutorService executorService = null;
private static final int TEST_AGGREGATE_COUNT = 42;
private final String filesystemName = "bogusFilesystemName";
private final String accountName = "bogusAccountName";
private final URL url;
public TestAbfsPerfTracker() throws Exception {
this.url = new URL("http", "www.microsoft.com", "/bogusFile");
}
@Before
public void setUp() throws Exception {
executorService = Executors.newCachedThreadPool();
}
@After
public void tearDown() throws Exception {
executorService.shutdown();
}
@Test
public void verifyDisablingOfTracker() throws Exception {
// verify that disabling of the tracker works
AbfsPerfTracker abfsPerfTracker = new AbfsPerfTracker(accountName, filesystemName, false);
String latencyDetails = abfsPerfTracker.getClientLatency();
assertThat(latencyDetails).describedAs("AbfsPerfTracker should be empty").isNull();
try (AbfsPerfInfo tracker = new AbfsPerfInfo(abfsPerfTracker, "disablingCaller",
"disablingCallee")) {
AbfsHttpOperation op = new AbfsHttpOperation(url, "GET", new ArrayList<>());
tracker.registerResult(op).registerSuccess(true);
}
latencyDetails = abfsPerfTracker.getClientLatency();
assertThat(latencyDetails).describedAs("AbfsPerfTracker should return no record").isNull();
}
@Test
public void verifyTrackingForSingletonLatencyRecords() throws Exception {
// verify that tracking for singleton latency records works as expected
final int numTasks = 100;
AbfsPerfTracker abfsPerfTracker = new AbfsPerfTracker(accountName, filesystemName, true);
String latencyDetails = abfsPerfTracker.getClientLatency();
assertThat(latencyDetails).describedAs("AbfsPerfTracker should be empty").isNull();
List<Callable<Integer>> tasks = new ArrayList<>();
AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", new ArrayList<>());
for (int i = 0; i < numTasks; i++) {
tasks.add(() -> {
try (AbfsPerfInfo tracker = new AbfsPerfInfo(abfsPerfTracker, "oneOperationCaller",
"oneOperationCallee")) {
tracker.registerResult(httpOperation).registerSuccess(true);
return 0;
}
});
}
for (Future<Integer> fr: executorService.invokeAll(tasks)) {
fr.get();
}
for (int i = 0; i < numTasks; i++) {
latencyDetails = abfsPerfTracker.getClientLatency();
assertThat(latencyDetails).describedAs("AbfsPerfTracker should return non-null record").isNotNull();
assertThat(latencyDetails).describedAs("Latency record should be in the correct format")
.containsPattern("h=[^ ]* t=[^ ]* a=bogusFilesystemName c=bogusAccountName cr=oneOperationCaller"
+ " ce=oneOperationCallee r=Succeeded l=[0-9]+ s=0 e= ci=[^ ]* ri=[^ ]* bs=0 br=0 m=GET"
+ " u=http%3A%2F%2Fwww.microsoft.com%2FbogusFile");
}
latencyDetails = abfsPerfTracker.getClientLatency();
assertThat(latencyDetails).describedAs("AbfsPerfTracker should return no record").isNull();
}
@Test
public void verifyTrackingForAggregateLatencyRecords() throws Exception {
// verify that tracking of aggregate latency records works as expected
final int numTasks = 100;
AbfsPerfTracker abfsPerfTracker = new AbfsPerfTracker(accountName, filesystemName, true);
String latencyDetails = abfsPerfTracker.getClientLatency();
assertThat(latencyDetails).describedAs("AbfsPerfTracker should be empty").isNull();
List<Callable<Integer>> tasks = new ArrayList<>();
AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", new ArrayList<>());
for (int i = 0; i < numTasks; i++) {
tasks.add(() -> {
try (AbfsPerfInfo tracker = new AbfsPerfInfo(abfsPerfTracker, "oneOperationCaller",
"oneOperationCallee")) {
tracker.registerResult(httpOperation).registerSuccess(true)
.registerAggregates(Instant.now(), TEST_AGGREGATE_COUNT);
return 0;
}
});
}
for (Future<Integer> fr: executorService.invokeAll(tasks)) {
fr.get();
}
for (int i = 0; i < numTasks; i++) {
latencyDetails = abfsPerfTracker.getClientLatency();
assertThat(latencyDetails).describedAs("AbfsPerfTracker should return non-null record").isNotNull();
assertThat(latencyDetails).describedAs("Latency record should be in the correct format")
.containsPattern("h=[^ ]* t=[^ ]* a=bogusFilesystemName c=bogusAccountName cr=oneOperationCaller"
+ " ce=oneOperationCallee r=Succeeded l=[0-9]+ ls=[0-9]+ lc=" + TEST_AGGREGATE_COUNT
+ " s=0 e= ci=[^ ]* ri=[^ ]* bs=0 br=0 m=GET u=http%3A%2F%2Fwww.microsoft.com%2FbogusFile");
}
latencyDetails = abfsPerfTracker.getClientLatency();
assertThat(latencyDetails).describedAs("AbfsPerfTracker should return no record").isNull();
}
@Test
public void verifyRecordingSingletonLatencyIsCheapWhenDisabled() throws Exception {
// when latency tracker is disabled, we expect it to take time equivalent to checking a boolean value
final double maxLatencyWhenDisabledMs = 1000;
final double minLatencyWhenDisabledMs = 0;
final long numTasks = 1000;
long aggregateLatency = 0;
AbfsPerfTracker abfsPerfTracker = new AbfsPerfTracker(accountName, filesystemName, false);
List<Callable<Long>> tasks = new ArrayList<>();
final AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", new ArrayList<>());
for (int i = 0; i < numTasks; i++) {
tasks.add(() -> {
Instant startRecord = Instant.now();
try (AbfsPerfInfo tracker = new AbfsPerfInfo(abfsPerfTracker, "oneOperationCaller",
"oneOperationCallee")) {
tracker.registerResult(httpOperation).registerSuccess(true);
}
long latencyRecord = Duration.between(startRecord, Instant.now()).toMillis();
LOG.debug("Spent {} ms in recording latency.", latencyRecord);
return latencyRecord;
});
}
for (Future<Long> fr: executorService.invokeAll(tasks)) {
aggregateLatency += fr.get();
}
double averageRecordLatency = aggregateLatency / numTasks;
assertThat(averageRecordLatency).describedAs("Average time for recording singleton latencies should be bounded")
.isBetween(minLatencyWhenDisabledMs, maxLatencyWhenDisabledMs);
}
@Test
public void verifyRecordingAggregateLatencyIsCheapWhenDisabled() throws Exception {
// when latency tracker is disabled, we expect it to take time equivalent to checking a boolean value
final double maxLatencyWhenDisabledMs = 1000;
final double minLatencyWhenDisabledMs = 0;
final long numTasks = 1000;
long aggregateLatency = 0;
AbfsPerfTracker abfsPerfTracker = new AbfsPerfTracker(accountName, filesystemName, false);
List<Callable<Long>> tasks = new ArrayList<>();
final AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", new ArrayList<>());
for (int i = 0; i < numTasks; i++) {
tasks.add(() -> {
Instant startRecord = Instant.now();
try (AbfsPerfInfo tracker = new AbfsPerfInfo(abfsPerfTracker, "oneOperationCaller",
"oneOperationCallee")) {
tracker.registerResult(httpOperation).registerSuccess(true)
.registerAggregates(startRecord, TEST_AGGREGATE_COUNT);
}
long latencyRecord = Duration.between(startRecord, Instant.now()).toMillis();
LOG.debug("Spent {} ms in recording latency.", latencyRecord);
return latencyRecord;
});
}
for (Future<Long> fr: executorService.invokeAll(tasks)) {
aggregateLatency += fr.get();
}
double averageRecordLatency = aggregateLatency / numTasks;
assertThat(averageRecordLatency).describedAs("Average time for recording aggregate latencies should be bounded")
.isBetween(minLatencyWhenDisabledMs, maxLatencyWhenDisabledMs);
}
@Test
public void verifyGettingLatencyRecordsIsCheapWhenDisabled() throws Exception {
// when latency tracker is disabled, we expect it to take time equivalent to checking a boolean value
final double maxLatencyWhenDisabledMs = 1000;
final double minLatencyWhenDisabledMs = 0;
final long numTasks = 1000;
long aggregateLatency = 0;
AbfsPerfTracker abfsPerfTracker = new AbfsPerfTracker(accountName, filesystemName, false);
List<Callable<Long>> tasks = new ArrayList<>();
for (int i = 0; i < numTasks; i++) {
tasks.add(() -> {
Instant startGet = Instant.now();
abfsPerfTracker.getClientLatency();
long latencyGet = Duration.between(startGet, Instant.now()).toMillis();
LOG.debug("Spent {} ms in retrieving latency record.", latencyGet);
return latencyGet;
});
}
for (Future<Long> fr: executorService.invokeAll(tasks)) {
aggregateLatency += fr.get();
}
double averageRecordLatency = aggregateLatency / numTasks;
assertThat(averageRecordLatency).describedAs("Average time for getting latency records should be bounded")
.isBetween(minLatencyWhenDisabledMs, maxLatencyWhenDisabledMs);
}
@Test
public void verifyRecordingSingletonLatencyIsCheapWhenEnabled() throws Exception {
final double maxLatencyWhenDisabledMs = 5000;
final double minLatencyWhenDisabledMs = 0;
final long numTasks = 1000;
long aggregateLatency = 0;
AbfsPerfTracker abfsPerfTracker = new AbfsPerfTracker(accountName, filesystemName, true);
List<Callable<Long>> tasks = new ArrayList<>();
final AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", new ArrayList<>());
for (int i = 0; i < numTasks; i++) {
tasks.add(() -> {
Instant startRecord = Instant.now();
try (AbfsPerfInfo tracker = new AbfsPerfInfo(abfsPerfTracker, "oneOperationCaller",
"oneOperationCallee")) {
tracker.registerResult(httpOperation).registerSuccess(true);
}
long latencyRecord = Duration.between(startRecord, Instant.now()).toMillis();
LOG.debug("Spent {} ms in recording latency.", latencyRecord);
return latencyRecord;
});
}
for (Future<Long> fr: executorService.invokeAll(tasks)) {
aggregateLatency += fr.get();
}
double averageRecordLatency = aggregateLatency / numTasks;
assertThat(averageRecordLatency).describedAs("Average time for recording singleton latencies should be bounded")
.isBetween(minLatencyWhenDisabledMs, maxLatencyWhenDisabledMs);
}
@Test
public void verifyRecordingAggregateLatencyIsCheapWhenEnabled() throws Exception {
final double maxLatencyWhenDisabledMs = 5000;
final double minLatencyWhenDisabledMs = 0;
final long numTasks = 1000;
long aggregateLatency = 0;
AbfsPerfTracker abfsPerfTracker = new AbfsPerfTracker(accountName, filesystemName, true);
List<Callable<Long>> tasks = new ArrayList<>();
final AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", new ArrayList<>());
for (int i = 0; i < numTasks; i++) {
tasks.add(() -> {
Instant startRecord = Instant.now();
try (AbfsPerfInfo tracker = new AbfsPerfInfo(abfsPerfTracker, "oneOperationCaller",
"oneOperationCallee")) {
tracker.registerResult(httpOperation).registerSuccess(true).
registerAggregates(startRecord, TEST_AGGREGATE_COUNT);
}
long latencyRecord = Duration.between(startRecord, Instant.now()).toMillis();
LOG.debug("Spent {} ms in recording latency.", latencyRecord);
return latencyRecord;
});
}
for (Future<Long> fr: executorService.invokeAll(tasks)) {
aggregateLatency += fr.get();
}
double averageRecordLatency = aggregateLatency / numTasks;
assertThat(averageRecordLatency).describedAs("Average time for recording aggregate latencies is bounded")
.isBetween(minLatencyWhenDisabledMs, maxLatencyWhenDisabledMs);
}
@Test
public void verifyGettingLatencyRecordsIsCheapWhenEnabled() throws Exception {
final double maxLatencyWhenDisabledMs = 5000;
final double minLatencyWhenDisabledMs = 0;
final long numTasks = 1000;
long aggregateLatency = 0;
AbfsPerfTracker abfsPerfTracker = new AbfsPerfTracker(accountName, filesystemName, true);
List<Callable<Long>> tasks = new ArrayList<>();
for (int i = 0; i < numTasks; i++) {
tasks.add(() -> {
Instant startRecord = Instant.now();
abfsPerfTracker.getClientLatency();
long latencyRecord = Duration.between(startRecord, Instant.now()).toMillis();
LOG.debug("Spent {} ms in recording latency.", latencyRecord);
return latencyRecord;
});
}
for (Future<Long> fr: executorService.invokeAll(tasks)) {
aggregateLatency += fr.get();
}
double averageRecordLatency = aggregateLatency / numTasks;
assertThat(averageRecordLatency).describedAs("Average time for getting latency records should be bounded")
.isBetween(minLatencyWhenDisabledMs, maxLatencyWhenDisabledMs);
}
@Test
public void verifyNoExceptionOnInvalidInput() throws Exception {
Instant testInstant = Instant.now();
AbfsPerfTracker abfsPerfTrackerDisabled = new AbfsPerfTracker(accountName, filesystemName, false);
AbfsPerfTracker abfsPerfTrackerEnabled = new AbfsPerfTracker(accountName, filesystemName, true);
final AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", new ArrayList<AbfsHttpHeader>());
verifyNoException(abfsPerfTrackerDisabled);
verifyNoException(abfsPerfTrackerEnabled);
}
private void verifyNoException(AbfsPerfTracker abfsPerfTracker) throws Exception {
Instant testInstant = Instant.now();
final AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", new ArrayList<AbfsHttpHeader>());
try (
AbfsPerfInfo tracker01 = new AbfsPerfInfo(abfsPerfTracker, null, null);
AbfsPerfInfo tracker02 = new AbfsPerfInfo(abfsPerfTracker, "test", null);
AbfsPerfInfo tracker03 = new AbfsPerfInfo(abfsPerfTracker, "test", "test");
AbfsPerfInfo tracker04 = new AbfsPerfInfo(abfsPerfTracker, "test", "test");
AbfsPerfInfo tracker05 = new AbfsPerfInfo(abfsPerfTracker, null, null);
AbfsPerfInfo tracker06 = new AbfsPerfInfo(abfsPerfTracker, "test", null);
AbfsPerfInfo tracker07 = new AbfsPerfInfo(abfsPerfTracker, "test", "test");
AbfsPerfInfo tracker08 = new AbfsPerfInfo(abfsPerfTracker, "test", "test");
AbfsPerfInfo tracker09 = new AbfsPerfInfo(abfsPerfTracker, "test", "test");
AbfsPerfInfo tracker10 = new AbfsPerfInfo(abfsPerfTracker, "test", "test");
AbfsPerfInfo tracker11 = new AbfsPerfInfo(abfsPerfTracker, "test", "test");
AbfsPerfInfo tracker12 = new AbfsPerfInfo(abfsPerfTracker, "test", "test");
AbfsPerfInfo tracker13 = new AbfsPerfInfo(abfsPerfTracker, "test", "test");
) {
tracker01.registerResult(null).registerSuccess(false);
tracker02.registerResult(null).registerSuccess(false);
tracker03.registerResult(null).registerSuccess(false);
tracker04.registerResult(httpOperation).registerSuccess(false);
tracker05.registerResult(null).registerSuccess(false).registerAggregates(null, 0);
tracker06.registerResult(null).registerSuccess(false).registerAggregates(null, 0);
tracker07.registerResult(null).registerSuccess(false).registerAggregates(null, 0);
tracker08.registerResult(httpOperation).registerSuccess(false).registerAggregates(null, 0);
tracker09.registerResult(httpOperation).registerSuccess(false).registerAggregates(Instant.now(), 0);
tracker10.registerResult(httpOperation).registerSuccess(false).registerAggregates(Instant.now(), TEST_AGGREGATE_COUNT);
tracker11.registerResult(httpOperation).registerSuccess(false).registerAggregates(testInstant, TEST_AGGREGATE_COUNT);
tracker12.registerResult(httpOperation).registerSuccess(false).registerAggregates(Instant.MAX, TEST_AGGREGATE_COUNT);
tracker13.registerResult(httpOperation).registerSuccess(false).registerAggregates(Instant.MIN, TEST_AGGREGATE_COUNT);
}
}
}

View File

@ -33,6 +33,11 @@
<value>false</value> <value>false</value>
</property> </property>
<property>
<name>fs.azure.abfs.latency.track</name>
<value>false</value>
</property>
<!--==================== ABFS CONFIGURATION ====================--> <!--==================== ABFS CONFIGURATION ====================-->
<!-- SEE relevant section in "site/markdown/testing_azure.md"--> <!-- SEE relevant section in "site/markdown/testing_azure.md"-->