HADOOP-16612. Track Azure Blob File System client-perceived latency

Contributed by Jeetesh Mangwani.

This add the ability to track the end-to-end performance of ADLS Gen 2 REST APIs by measuring latency in the Hadoop ABFS driver.
The latency information is sent back to the ADLS Gen 2 REST API endpoints in the subsequent requests.
This commit is contained in:
Jeetesh Mangwani 2019-11-19 08:51:49 -08:00 committed by DadanielZ
parent 9fbfe6c8f9
commit b033c681e4
No known key found for this signature in database
GPG Key ID: 5C91E270F172ADFB
18 changed files with 1550 additions and 329 deletions

View File

@ -278,6 +278,11 @@
<artifactId>bcpkix-jdk15on</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<profiles>

View File

@ -178,6 +178,10 @@ public class AbfsConfiguration{
DefaultValue = DEFAULT_USE_UPN)
private boolean useUpn;
@BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ABFS_LATENCY_TRACK,
DefaultValue = DEFAULT_ABFS_LATENCY_TRACK)
private boolean trackLatency;
private Map<String, String> storageAccountKeys;
public AbfsConfiguration(final Configuration rawConfig, String accountName)
@ -471,6 +475,15 @@ public class AbfsConfiguration{
return this.useUpn;
}
/**
* Whether {@code AbfsClient} should track and send latency info back to storage servers.
*
* @return a boolean indicating whether latency should be tracked.
*/
public boolean shouldTrackLatency() {
return this.trackLatency;
}
public AccessTokenProvider getTokenProvider() throws TokenAccessProviderException {
AuthType authType = getEnum(FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME, AuthType.SharedKey);
if (authType == AuthType.OAuth) {

View File

@ -114,6 +114,8 @@ public final class ConfigurationKeys {
public static final String FS_AZURE_ACCOUNT_OAUTH_REFRESH_TOKEN = "fs.azure.account.oauth2.refresh.token";
/** Key for oauth AAD refresh token endpoint: {@value}. */
public static final String FS_AZURE_ACCOUNT_OAUTH_REFRESH_TOKEN_ENDPOINT = "fs.azure.account.oauth2.refresh.token.endpoint";
/** Key for enabling the tracking of ABFS API latency and sending the latency numbers to the ABFS API service */
public static final String FS_AZURE_ABFS_LATENCY_TRACK = "fs.azure.abfs.latency.track";
public static String accountProperty(String property, String account) {
return property + "." + account;

View File

@ -67,6 +67,7 @@ public final class FileSystemConfigurations {
public static final boolean DEFAULT_ENABLE_HTTPS = true;
public static final boolean DEFAULT_USE_UPN = false;
public static final boolean DEFAULT_ABFS_LATENCY_TRACK = false;
private FileSystemConfigurations() {}
}

View File

@ -58,6 +58,7 @@ public final class HttpHeaderConfigurations {
public static final String X_MS_PERMISSIONS = "x-ms-permissions";
public static final String X_MS_UMASK = "x-ms-umask";
public static final String X_MS_NAMESPACE_ENABLED = "x-ms-namespace-enabled";
public static final String X_MS_ABFS_CLIENT_LATENCY = "x-ms-abfs-client-latency";
private HttpHeaderConfigurations() {}
}

View File

@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.contracts.services;
import org.apache.hadoop.classification.InterfaceStability;
/**
* The AbfsPerfLoggable contract.
*/
@InterfaceStability.Evolving
public interface AbfsPerfLoggable {
/**
* Gets the string to log to the Abfs Logging API.
*
* @return the string that will be logged.
*/
String getLogString();
}

View File

@ -60,6 +60,7 @@ public class AbfsClient implements Closeable {
private final String filesystem;
private final AbfsConfiguration abfsConfiguration;
private final String userAgent;
private final AbfsPerfTracker abfsPerfTracker;
private final AccessTokenProvider tokenProvider;
@ -67,7 +68,8 @@ public class AbfsClient implements Closeable {
public AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials,
final AbfsConfiguration abfsConfiguration,
final ExponentialRetryPolicy exponentialRetryPolicy,
final AccessTokenProvider tokenProvider) {
final AccessTokenProvider tokenProvider,
final AbfsPerfTracker abfsPerfTracker) {
this.baseUrl = baseUrl;
this.sharedKeyCredentials = sharedKeyCredentials;
String baseUrlString = baseUrl.toString();
@ -88,6 +90,7 @@ public class AbfsClient implements Closeable {
this.userAgent = initializeUserAgent(abfsConfiguration, sslProviderName);
this.tokenProvider = tokenProvider;
this.abfsPerfTracker = abfsPerfTracker;
}
@Override
@ -101,6 +104,10 @@ public class AbfsClient implements Closeable {
return filesystem;
}
protected AbfsPerfTracker getAbfsPerfTracker() {
return abfsPerfTracker;
}
ExponentialRetryPolicy getRetryPolicy() {
return retryPolicy;
}

View File

@ -21,8 +21,10 @@ package org.apache.hadoop.fs.azurebfs.services;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;
import java.util.List;
import java.util.UUID;
@ -40,12 +42,13 @@ import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
import org.apache.hadoop.fs.azurebfs.contracts.services.AbfsPerfLoggable;
import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultSchema;
/**
* Represents an HTTP operation.
*/
public class AbfsHttpOperation {
public class AbfsHttpOperation implements AbfsPerfLoggable {
private static final Logger LOG = LoggerFactory.getLogger(AbfsHttpOperation.class);
private static final int CONNECT_TIMEOUT = 30 * 1000;
@ -161,6 +164,47 @@ public class AbfsHttpOperation {
return sb.toString();
}
// Returns a trace message for the ABFS API logging service to consume
public String getLogString() {
String urlStr = null;
try {
urlStr = URLEncoder.encode(url.toString(), "UTF-8");
} catch(UnsupportedEncodingException e) {
urlStr = "https%3A%2F%2Ffailed%2Fto%2Fencode%2Furl";
}
final StringBuilder sb = new StringBuilder();
sb.append("s=")
.append(statusCode)
.append(" e=")
.append(storageErrorCode)
.append(" ci=")
.append(clientRequestId)
.append(" ri=")
.append(requestId);
if (isTraceEnabled) {
sb.append(" ct=")
.append(connectionTimeMs)
.append(" st=")
.append(sendRequestTimeMs)
.append(" rt=")
.append(recvResponseTimeMs);
}
sb.append(" bs=")
.append(bytesSent)
.append(" br=")
.append(bytesReceived)
.append(" m=")
.append(method)
.append(" u=")
.append(urlStr);
return sb.toString();
}
/**
* Initializes a new HTTP request and opens the connection.
*

View File

@ -226,8 +226,10 @@ public class AbfsInputStream extends FSInputStream {
throw new IllegalArgumentException("requested read length is more than will fit after requested offset in buffer");
}
final AbfsRestOperation op;
try {
AbfsPerfTracker tracker = client.getAbfsPerfTracker();
try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker, "readRemote", "read")) {
op = client.read(path, position, b, offset, length, tolerateOobAppends ? "*" : eTag);
perfInfo.registerResult(op.getResult()).registerSuccess(true);
} catch (AzureBlobFileSystemException ex) {
if (ex instanceof AbfsRestOperationException) {
AbfsRestOperationException ere = (AbfsRestOperationException) ex;

View File

@ -289,10 +289,16 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
final Future<Void> job = completionService.submit(new Callable<Void>() {
@Override
public Void call() throws Exception {
client.append(path, offset, bytes, 0,
bytesLength);
byteBufferPool.putBuffer(ByteBuffer.wrap(bytes));
return null;
AbfsPerfTracker tracker = client.getAbfsPerfTracker();
try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker,
"writeCurrentBufferToService", "append")) {
AbfsRestOperation op = client.append(path, offset, bytes, 0,
bytesLength);
perfInfo.registerResult(op.getResult());
byteBufferPool.putBuffer(ByteBuffer.wrap(bytes));
perfInfo.registerSuccess(true);
return null;
}
}
});
@ -334,8 +340,11 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
private synchronized void flushWrittenBytesToServiceInternal(final long offset,
final boolean retainUncommitedData, final boolean isClose) throws IOException {
try {
client.flush(path, offset, retainUncommitedData, isClose);
AbfsPerfTracker tracker = client.getAbfsPerfTracker();
try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker,
"flushWrittenBytesToServiceInternal", "flush")) {
AbfsRestOperation op = client.flush(path, offset, retainUncommitedData, isClose);
perfInfo.registerResult(op.getResult()).registerSuccess(true);
} catch (AzureBlobFileSystemException ex) {
if (ex instanceof AbfsRestOperationException) {
if (((AbfsRestOperationException) ex).getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) {

View File

@ -0,0 +1,133 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.services;
import java.time.Instant;
import org.apache.hadoop.fs.azurebfs.contracts.services.AbfsPerfLoggable;
/**
* {@code AbfsPerfInfo} holds information on ADLS Gen 2 API performance observed by {@code AbfsClient}. Every
* Abfs request keeps adding its information (success/failure, latency etc) to its {@code AbfsPerfInfo}'s object
* as and when it becomes available. When the request is over, the performance information is recorded while
* the {@code AutoCloseable} {@code AbfsPerfInfo} object is "closed".
*/
public final class AbfsPerfInfo implements AutoCloseable {
// the tracker which will be extracting perf info out of this object.
private AbfsPerfTracker abfsPerfTracker;
// the caller name.
private String callerName;
// the callee name.
private String calleeName;
// time when this tracking started.
private Instant trackingStart;
// time when this tracking ended.
private Instant trackingEnd;
// whether the tracked request was successful.
private boolean success;
// time when the aggregate operation (to which this request belongs) started.
private Instant aggregateStart;
// number of requests in the aggregate operation (to which this request belongs).
private long aggregateCount;
// result of the request.
private AbfsPerfLoggable res;
public AbfsPerfInfo(AbfsPerfTracker abfsPerfTracker, String callerName, String calleeName) {
this.callerName = callerName;
this.calleeName = calleeName;
this.abfsPerfTracker = abfsPerfTracker;
this.success = false;
this.trackingStart = abfsPerfTracker.getLatencyInstant();
}
public AbfsPerfInfo registerSuccess(boolean success) {
this.success = success;
return this;
}
public AbfsPerfInfo registerResult(AbfsPerfLoggable res) {
this.res = res;
return this;
}
public AbfsPerfInfo registerAggregates(Instant aggregateStart, long aggregateCount) {
this.aggregateStart = aggregateStart;
this.aggregateCount = aggregateCount;
return this;
}
public AbfsPerfInfo finishTracking() {
if (this.trackingEnd == null) {
this.trackingEnd = abfsPerfTracker.getLatencyInstant();
}
return this;
}
public AbfsPerfInfo registerCallee(String calleeName) {
this.calleeName = calleeName;
return this;
}
@Override
public void close() {
abfsPerfTracker.trackInfo(this.finishTracking());
}
public String getCallerName() {
return callerName;
};
public String getCalleeName() {
return calleeName;
}
public Instant getTrackingStart() {
return trackingStart;
}
public Instant getTrackingEnd() {
return trackingEnd;
}
public boolean getSuccess() {
return success;
}
public Instant getAggregateStart() {
return aggregateStart;
}
public long getAggregateCount() {
return aggregateCount;
}
public AbfsPerfLoggable getResult() {
return res;
}
}

View File

@ -0,0 +1,319 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.services;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
import org.apache.hadoop.fs.azurebfs.contracts.services.AbfsPerfLoggable;
/**
* {@code AbfsPerfTracker} keeps track of service latencies observed by {@code AbfsClient}. Every request hands over
* its perf-related information as a {@code AbfsPerfInfo} object (contains success/failure, latency etc) to the
* {@code AbfsPerfTracker}'s queue. When a request is made, we check {@code AbfsPerfTracker} to see if there are
* any latency numbers to be reported. If there are any, the stats are added to an HTTP header
* ({@code x-ms-abfs-client-latency}) on the next request.
*
* A typical perf log line appears like:
*
* h=KARMA t=2019-10-25T20:21:14.518Z a=abfstest01.dfs.core.windows.net
* c=abfs-testcontainer-84828169-6488-4a62-a875-1e674275a29f cr=delete ce=deletePath r=Succeeded l=32 ls=32 lc=1 s=200
* e= ci=95121dae-70a8-4187-b067-614091034558 ri=97effdcf-201f-0097-2d71-8bae00000000 ct=0 st=0 rt=0 bs=0 br=0 m=DELETE
* u=https%3A%2F%2Fabfstest01.dfs.core.windows.net%2Fabfs-testcontainer%2Ftest%3Ftimeout%3D90%26recursive%3Dtrue
*
* The fields have the following definitions:
*
* h: host name
* t: time when this request was logged
* a: Azure storage account name
* c: container name
* cr: name of the caller method
* ce: name of the callee method
* r: result (Succeeded/Failed)
* l: latency (time spent in callee)
* ls: latency sum (aggregate time spent in caller; logged when there are multiple callees;
* logged with the last callee)
* lc: latency count (number of callees; logged when there are multiple callees;
* logged with the last callee)
* s: HTTP Status code
* e: Error code
* ci: client request ID
* ri: server request ID
* ct: connection time in milliseconds
* st: sending time in milliseconds
* rt: receiving time in milliseconds
* bs: bytes sent
* br: bytes received
* m: HTTP method (GET, PUT etc)
* u: Encoded HTTP URL
*
*/
public final class AbfsPerfTracker {
// the logger.
private static final Logger LOG = LoggerFactory.getLogger(AbfsPerfTracker.class);
// the field names of perf log lines.
private static final String HOST_NAME_KEY = "h";
private static final String TIMESTAMP_KEY = "t";
private static final String STORAGE_ACCOUNT_NAME_KEY = "a";
private static final String CONTAINER_NAME_KEY = "c";
private static final String CALLER_METHOD_NAME_KEY = "cr";
private static final String CALLEE_METHOD_NAME_KEY = "ce";
private static final String RESULT_KEY = "r";
private static final String LATENCY_KEY = "l";
private static final String LATENCY_SUM_KEY = "ls";
private static final String LATENCY_COUNT_KEY = "lc";
private static final String HTTP_STATUS_CODE_KEY = "s";
private static final String ERROR_CODE_KEY = "e";
private static final String CLIENT_REQUEST_ID_KEY = "ci";
private static final String SERVER_REQUEST_ID_KEY = "ri";
private static final String CONNECTION_TIME_KEY = "ct";
private static final String SENDING_TIME_KEY = "st";
private static final String RECEIVING_TIME_KEY = "rt";
private static final String BYTES_SENT_KEY = "bs";
private static final String BYTES_RECEIVED_KEY = "br";
private static final String HTTP_METHOD_KEY = "m";
private static final String HTTP_URL_KEY = "u";
private static final String STRING_PLACEHOLDER = "%s";
// the queue to hold latency information.
private final ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<String>();
// whether the latency tracker has been enabled.
private boolean enabled = false;
// the host name.
private String hostName;
// singleton latency reporting format.
private String singletonLatencyReportingFormat;
// aggregate latency reporting format.
private String aggregateLatencyReportingFormat;
public AbfsPerfTracker(String filesystemName, String accountName, AbfsConfiguration configuration) {
this(filesystemName, accountName, configuration.shouldTrackLatency());
}
protected AbfsPerfTracker(String filesystemName, String accountName, boolean enabled) {
this.enabled = enabled;
LOG.debug("AbfsPerfTracker configuration: {}", enabled);
if (enabled) {
try {
hostName = InetAddress.getLocalHost().getHostName();
} catch (UnknownHostException e) {
hostName = "UnknownHost";
}
String commonReportingFormat = new StringBuilder()
.append(HOST_NAME_KEY)
.append(AbfsHttpConstants.EQUAL)
.append(hostName)
.append(AbfsHttpConstants.SINGLE_WHITE_SPACE)
.append(TIMESTAMP_KEY)
.append(AbfsHttpConstants.EQUAL)
.append(STRING_PLACEHOLDER)
.append(AbfsHttpConstants.SINGLE_WHITE_SPACE)
.append(STORAGE_ACCOUNT_NAME_KEY)
.append(AbfsHttpConstants.EQUAL)
.append(accountName)
.append(AbfsHttpConstants.SINGLE_WHITE_SPACE)
.append(CONTAINER_NAME_KEY)
.append(AbfsHttpConstants.EQUAL)
.append(filesystemName)
.append(AbfsHttpConstants.SINGLE_WHITE_SPACE)
.append(CALLER_METHOD_NAME_KEY)
.append(AbfsHttpConstants.EQUAL)
.append(STRING_PLACEHOLDER)
.append(AbfsHttpConstants.SINGLE_WHITE_SPACE)
.append(CALLEE_METHOD_NAME_KEY)
.append(AbfsHttpConstants.EQUAL)
.append(STRING_PLACEHOLDER)
.append(AbfsHttpConstants.SINGLE_WHITE_SPACE)
.append(RESULT_KEY)
.append(AbfsHttpConstants.EQUAL)
.append(STRING_PLACEHOLDER)
.append(AbfsHttpConstants.SINGLE_WHITE_SPACE)
.append(LATENCY_KEY)
.append(AbfsHttpConstants.EQUAL)
.append(STRING_PLACEHOLDER)
.toString();
/**
* Example singleton log (no ls or lc field)
* h=KARMA t=2019-10-25T20:21:14.518Z a=abfstest01.dfs.core.windows.net
* c=abfs-testcontainer-84828169-6488-4a62-a875-1e674275a29f cr=delete ce=deletePath r=Succeeded l=32 s=200
* e= ci=95121dae-70a8-4187-b067-614091034558 ri=97effdcf-201f-0097-2d71-8bae00000000 ct=0 st=0 rt=0 bs=0 br=0 m=DELETE
* u=https%3A%2F%2Fabfstest01.dfs.core.windows.net%2Fabfs-testcontainer%2Ftest%3Ftimeout%3D90%26recursive%3Dtrue
*/
singletonLatencyReportingFormat = new StringBuilder()
.append(commonReportingFormat)
.append(STRING_PLACEHOLDER)
.toString();
/**
* Example aggregate log
* h=KARMA t=2019-10-25T20:21:14.518Z a=abfstest01.dfs.core.windows.net
* c=abfs-testcontainer-84828169-6488-4a62-a875-1e674275a29f cr=delete ce=deletePath r=Succeeded l=32 ls=32 lc=1 s=200
* e= ci=95121dae-70a8-4187-b067-614091034558 ri=97effdcf-201f-0097-2d71-8bae00000000 ct=0 st=0 rt=0 bs=0 br=0 m=DELETE
* u=https%3A%2F%2Fabfstest01.dfs.core.windows.net%2Fabfs-testcontainer%2Ftest%3Ftimeout%3D90%26recursive%3Dtrue
*/
aggregateLatencyReportingFormat = new StringBuilder()
.append(commonReportingFormat)
.append(AbfsHttpConstants.SINGLE_WHITE_SPACE)
.append(LATENCY_SUM_KEY)
.append(AbfsHttpConstants.EQUAL)
.append(STRING_PLACEHOLDER)
.append(AbfsHttpConstants.SINGLE_WHITE_SPACE)
.append(LATENCY_COUNT_KEY)
.append(AbfsHttpConstants.EQUAL)
.append(STRING_PLACEHOLDER)
.append(STRING_PLACEHOLDER)
.toString();
}
}
public void trackInfo(AbfsPerfInfo perfInfo) {
if (!enabled) {
return;
}
if (isValidInstant(perfInfo.getAggregateStart()) && perfInfo.getAggregateCount() > 0) {
recordClientLatency(
perfInfo.getTrackingStart(),
perfInfo.getTrackingEnd(),
perfInfo.getCallerName(),
perfInfo.getCalleeName(),
perfInfo.getSuccess(),
perfInfo.getAggregateStart(),
perfInfo.getAggregateCount(),
perfInfo.getResult());
} else {
recordClientLatency(
perfInfo.getTrackingStart(),
perfInfo.getTrackingEnd(),
perfInfo.getCallerName(),
perfInfo.getCalleeName(),
perfInfo.getSuccess(),
perfInfo.getResult());
}
}
public Instant getLatencyInstant() {
if (!enabled) {
return null;
}
return Instant.now();
}
private void recordClientLatency(
Instant operationStart,
Instant operationStop,
String callerName,
String calleeName,
boolean success,
AbfsPerfLoggable res) {
Instant trackerStart = Instant.now();
long latency = isValidInstant(operationStart) && isValidInstant(operationStop)
? Duration.between(operationStart, operationStop).toMillis() : -1;
String latencyDetails = String.format(singletonLatencyReportingFormat,
Instant.now(),
callerName,
calleeName,
success ? "Succeeded" : "Failed",
latency,
res == null ? "" : (" " + res.getLogString()));
this.offerToQueue(trackerStart, latencyDetails);
}
private void recordClientLatency(
Instant operationStart,
Instant operationStop,
String callerName,
String calleeName,
boolean success,
Instant aggregateStart,
long aggregateCount,
AbfsPerfLoggable res){
Instant trackerStart = Instant.now();
long latency = isValidInstant(operationStart) && isValidInstant(operationStop)
? Duration.between(operationStart, operationStop).toMillis() : -1;
long aggregateLatency = isValidInstant(aggregateStart) && isValidInstant(operationStop)
? Duration.between(aggregateStart, operationStop).toMillis() : -1;
String latencyDetails = String.format(aggregateLatencyReportingFormat,
Instant.now(),
callerName,
calleeName,
success ? "Succeeded" : "Failed",
latency,
aggregateLatency,
aggregateCount,
res == null ? "" : (" " + res.getLogString()));
offerToQueue(trackerStart, latencyDetails);
}
public String getClientLatency() {
if (!enabled) {
return null;
}
Instant trackerStart = Instant.now();
String latencyDetails = queue.poll(); // non-blocking pop
if (LOG.isDebugEnabled()) {
Instant stop = Instant.now();
long elapsed = Duration.between(trackerStart, stop).toMillis();
LOG.debug("Dequeued latency info [{} ms]: {}", elapsed, latencyDetails);
}
return latencyDetails;
}
private void offerToQueue(Instant trackerStart, String latencyDetails) {
queue.offer(latencyDetails); // non-blocking append
if (LOG.isDebugEnabled()) {
Instant trackerStop = Instant.now();
long elapsed = Duration.between(trackerStart, trackerStop).toMillis();
LOG.debug("Queued latency info [{} ms]: {}", elapsed, latencyDetails);
}
}
private boolean isValidInstant(Instant testInstant) {
return testInstant != null && testInstant != Instant.MIN && testInstant != Instant.MAX;
}
}

View File

@ -121,6 +121,14 @@ public class AbfsRestOperation {
* HTTP operations.
*/
void execute() throws AzureBlobFileSystemException {
// see if we have latency reports from the previous requests
String latencyHeader = this.client.getAbfsPerfTracker().getClientLatency();
if (latencyHeader != null && !latencyHeader.isEmpty()) {
AbfsHttpHeader httpHeader =
new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_ABFS_CLIENT_LATENCY, latencyHeader);
requestHeaders.add(httpHeader);
}
int retryCount = 0;
while (!executeHttpOperation(retryCount++)) {
try {

View File

@ -661,6 +661,52 @@ Hflush() being the only documented API that can provide persistent data
transfer, Flush() also attempting to persist buffered data will lead to
performance issues.
### <a name="perfoptions"></a> Perf Options
#### <a name="abfstracklatencyoptions"></a> 1. HTTP Request Tracking Options
If you set `fs.azure.abfs.latency.track` to `true`, the module starts tracking the
performance metrics of ABFS HTTP traffic. To obtain these numbers on your machine
or cluster, you will also need to enable debug logging for the `AbfsPerfTracker`
class in your `log4j` config. A typical perf log line appears like:
```
h=KARMA t=2019-10-25T20:21:14.518Z a=abfstest01.dfs.core.windows.net
c=abfs-testcontainer-84828169-6488-4a62-a875-1e674275a29f cr=delete ce=deletePath
r=Succeeded l=32 ls=32 lc=1 s=200 e= ci=95121dae-70a8-4187-b067-614091034558
ri=97effdcf-201f-0097-2d71-8bae00000000 ct=0 st=0 rt=0 bs=0 br=0 m=DELETE
u=https%3A%2F%2Fabfstest01.dfs.core.windows.net%2Ftestcontainer%2Ftest%3Ftimeout%3D90%26recursive%3Dtrue
```
The fields have the following definitions:
`h`: host name
`t`: time when this request was logged
`a`: Azure storage account name
`c`: container name
`cr`: name of the caller method
`ce`: name of the callee method
`r`: result (Succeeded/Failed)
`l`: latency (time spent in callee)
`ls`: latency sum (aggregate time spent in caller; logged when there are multiple
callees; logged with the last callee)
`lc`: latency count (number of callees; logged when there are multiple callees;
logged with the last callee)
`s`: HTTP Status code
`e`: Error code
`ci`: client request ID
`ri`: server request ID
`ct`: connection time in milliseconds
`st`: sending time in milliseconds
`rt`: receiving time in milliseconds
`bs`: bytes sent
`br`: bytes received
`m`: HTTP method (GET, PUT etc)
`u`: Encoded HTTP URL
Note that these performance numbers are also sent back to the ADLS Gen 2 API endpoints
in the `x-ms-abfs-client-latency` HTTP headers in subsequent requests. Azure uses these
settings to track their end-to-end latency.
## <a name="troubleshooting"></a> Troubleshooting
The problems associated with the connector usually come down to, in order

View File

@ -43,7 +43,7 @@ public final class TestAbfsClient {
AbfsConfiguration config,
boolean includeSSLProvider) {
AbfsClient client = new AbfsClient(baseUrl, null,
config, null, null);
config, null, null, null);
String sslProviderName = null;
if (includeSSLProvider) {
sslProviderName = DelegatingSSLSocketFactory.getDefaultFactory().getProviderName();

View File

@ -0,0 +1,408 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.services;
import java.net.URL;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.assertj.core.api.Assertions.assertThat;
/**
* Test the latency tracker for ABFS.
*
*/
public final class TestAbfsPerfTracker {
private static final Logger LOG = LoggerFactory.getLogger(TestAbfsPerfTracker.class);
private static ExecutorService executorService = null;
private static final int TEST_AGGREGATE_COUNT = 42;
private final String filesystemName = "bogusFilesystemName";
private final String accountName = "bogusAccountName";
private final URL url;
public TestAbfsPerfTracker() throws Exception {
this.url = new URL("http", "www.microsoft.com", "/bogusFile");
}
@Before
public void setUp() throws Exception {
executorService = Executors.newCachedThreadPool();
}
@After
public void tearDown() throws Exception {
executorService.shutdown();
}
@Test
public void verifyDisablingOfTracker() throws Exception {
// verify that disabling of the tracker works
AbfsPerfTracker abfsPerfTracker = new AbfsPerfTracker(accountName, filesystemName, false);
String latencyDetails = abfsPerfTracker.getClientLatency();
assertThat(latencyDetails).describedAs("AbfsPerfTracker should be empty").isNull();
try (AbfsPerfInfo tracker = new AbfsPerfInfo(abfsPerfTracker, "disablingCaller",
"disablingCallee")) {
AbfsHttpOperation op = new AbfsHttpOperation(url, "GET", new ArrayList<>());
tracker.registerResult(op).registerSuccess(true);
}
latencyDetails = abfsPerfTracker.getClientLatency();
assertThat(latencyDetails).describedAs("AbfsPerfTracker should return no record").isNull();
}
@Test
public void verifyTrackingForSingletonLatencyRecords() throws Exception {
// verify that tracking for singleton latency records works as expected
final int numTasks = 100;
AbfsPerfTracker abfsPerfTracker = new AbfsPerfTracker(accountName, filesystemName, true);
String latencyDetails = abfsPerfTracker.getClientLatency();
assertThat(latencyDetails).describedAs("AbfsPerfTracker should be empty").isNull();
List<Callable<Integer>> tasks = new ArrayList<>();
AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", new ArrayList<>());
for (int i = 0; i < numTasks; i++) {
tasks.add(() -> {
try (AbfsPerfInfo tracker = new AbfsPerfInfo(abfsPerfTracker, "oneOperationCaller",
"oneOperationCallee")) {
tracker.registerResult(httpOperation).registerSuccess(true);
return 0;
}
});
}
for (Future<Integer> fr: executorService.invokeAll(tasks)) {
fr.get();
}
for (int i = 0; i < numTasks; i++) {
latencyDetails = abfsPerfTracker.getClientLatency();
assertThat(latencyDetails).describedAs("AbfsPerfTracker should return non-null record").isNotNull();
assertThat(latencyDetails).describedAs("Latency record should be in the correct format")
.containsPattern("h=[^ ]* t=[^ ]* a=bogusFilesystemName c=bogusAccountName cr=oneOperationCaller"
+ " ce=oneOperationCallee r=Succeeded l=[0-9]+ s=0 e= ci=[^ ]* ri=[^ ]* bs=0 br=0 m=GET"
+ " u=http%3A%2F%2Fwww.microsoft.com%2FbogusFile");
}
latencyDetails = abfsPerfTracker.getClientLatency();
assertThat(latencyDetails).describedAs("AbfsPerfTracker should return no record").isNull();
}
@Test
public void verifyTrackingForAggregateLatencyRecords() throws Exception {
// verify that tracking of aggregate latency records works as expected
final int numTasks = 100;
AbfsPerfTracker abfsPerfTracker = new AbfsPerfTracker(accountName, filesystemName, true);
String latencyDetails = abfsPerfTracker.getClientLatency();
assertThat(latencyDetails).describedAs("AbfsPerfTracker should be empty").isNull();
List<Callable<Integer>> tasks = new ArrayList<>();
AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", new ArrayList<>());
for (int i = 0; i < numTasks; i++) {
tasks.add(() -> {
try (AbfsPerfInfo tracker = new AbfsPerfInfo(abfsPerfTracker, "oneOperationCaller",
"oneOperationCallee")) {
tracker.registerResult(httpOperation).registerSuccess(true)
.registerAggregates(Instant.now(), TEST_AGGREGATE_COUNT);
return 0;
}
});
}
for (Future<Integer> fr: executorService.invokeAll(tasks)) {
fr.get();
}
for (int i = 0; i < numTasks; i++) {
latencyDetails = abfsPerfTracker.getClientLatency();
assertThat(latencyDetails).describedAs("AbfsPerfTracker should return non-null record").isNotNull();
assertThat(latencyDetails).describedAs("Latency record should be in the correct format")
.containsPattern("h=[^ ]* t=[^ ]* a=bogusFilesystemName c=bogusAccountName cr=oneOperationCaller"
+ " ce=oneOperationCallee r=Succeeded l=[0-9]+ ls=[0-9]+ lc=" + TEST_AGGREGATE_COUNT
+ " s=0 e= ci=[^ ]* ri=[^ ]* bs=0 br=0 m=GET u=http%3A%2F%2Fwww.microsoft.com%2FbogusFile");
}
latencyDetails = abfsPerfTracker.getClientLatency();
assertThat(latencyDetails).describedAs("AbfsPerfTracker should return no record").isNull();
}
@Test
public void verifyRecordingSingletonLatencyIsCheapWhenDisabled() throws Exception {
// when latency tracker is disabled, we expect it to take time equivalent to checking a boolean value
final double maxLatencyWhenDisabledMs = 1000;
final double minLatencyWhenDisabledMs = 0;
final long numTasks = 1000;
long aggregateLatency = 0;
AbfsPerfTracker abfsPerfTracker = new AbfsPerfTracker(accountName, filesystemName, false);
List<Callable<Long>> tasks = new ArrayList<>();
final AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", new ArrayList<>());
for (int i = 0; i < numTasks; i++) {
tasks.add(() -> {
Instant startRecord = Instant.now();
try (AbfsPerfInfo tracker = new AbfsPerfInfo(abfsPerfTracker, "oneOperationCaller",
"oneOperationCallee")) {
tracker.registerResult(httpOperation).registerSuccess(true);
}
long latencyRecord = Duration.between(startRecord, Instant.now()).toMillis();
LOG.debug("Spent {} ms in recording latency.", latencyRecord);
return latencyRecord;
});
}
for (Future<Long> fr: executorService.invokeAll(tasks)) {
aggregateLatency += fr.get();
}
double averageRecordLatency = aggregateLatency / numTasks;
assertThat(averageRecordLatency).describedAs("Average time for recording singleton latencies should be bounded")
.isBetween(minLatencyWhenDisabledMs, maxLatencyWhenDisabledMs);
}
@Test
public void verifyRecordingAggregateLatencyIsCheapWhenDisabled() throws Exception {
// when latency tracker is disabled, we expect it to take time equivalent to checking a boolean value
final double maxLatencyWhenDisabledMs = 1000;
final double minLatencyWhenDisabledMs = 0;
final long numTasks = 1000;
long aggregateLatency = 0;
AbfsPerfTracker abfsPerfTracker = new AbfsPerfTracker(accountName, filesystemName, false);
List<Callable<Long>> tasks = new ArrayList<>();
final AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", new ArrayList<>());
for (int i = 0; i < numTasks; i++) {
tasks.add(() -> {
Instant startRecord = Instant.now();
try (AbfsPerfInfo tracker = new AbfsPerfInfo(abfsPerfTracker, "oneOperationCaller",
"oneOperationCallee")) {
tracker.registerResult(httpOperation).registerSuccess(true)
.registerAggregates(startRecord, TEST_AGGREGATE_COUNT);
}
long latencyRecord = Duration.between(startRecord, Instant.now()).toMillis();
LOG.debug("Spent {} ms in recording latency.", latencyRecord);
return latencyRecord;
});
}
for (Future<Long> fr: executorService.invokeAll(tasks)) {
aggregateLatency += fr.get();
}
double averageRecordLatency = aggregateLatency / numTasks;
assertThat(averageRecordLatency).describedAs("Average time for recording aggregate latencies should be bounded")
.isBetween(minLatencyWhenDisabledMs, maxLatencyWhenDisabledMs);
}
@Test
public void verifyGettingLatencyRecordsIsCheapWhenDisabled() throws Exception {
// when latency tracker is disabled, we expect it to take time equivalent to checking a boolean value
final double maxLatencyWhenDisabledMs = 1000;
final double minLatencyWhenDisabledMs = 0;
final long numTasks = 1000;
long aggregateLatency = 0;
AbfsPerfTracker abfsPerfTracker = new AbfsPerfTracker(accountName, filesystemName, false);
List<Callable<Long>> tasks = new ArrayList<>();
for (int i = 0; i < numTasks; i++) {
tasks.add(() -> {
Instant startGet = Instant.now();
abfsPerfTracker.getClientLatency();
long latencyGet = Duration.between(startGet, Instant.now()).toMillis();
LOG.debug("Spent {} ms in retrieving latency record.", latencyGet);
return latencyGet;
});
}
for (Future<Long> fr: executorService.invokeAll(tasks)) {
aggregateLatency += fr.get();
}
double averageRecordLatency = aggregateLatency / numTasks;
assertThat(averageRecordLatency).describedAs("Average time for getting latency records should be bounded")
.isBetween(minLatencyWhenDisabledMs, maxLatencyWhenDisabledMs);
}
@Test
public void verifyRecordingSingletonLatencyIsCheapWhenEnabled() throws Exception {
final double maxLatencyWhenDisabledMs = 5000;
final double minLatencyWhenDisabledMs = 0;
final long numTasks = 1000;
long aggregateLatency = 0;
AbfsPerfTracker abfsPerfTracker = new AbfsPerfTracker(accountName, filesystemName, true);
List<Callable<Long>> tasks = new ArrayList<>();
final AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", new ArrayList<>());
for (int i = 0; i < numTasks; i++) {
tasks.add(() -> {
Instant startRecord = Instant.now();
try (AbfsPerfInfo tracker = new AbfsPerfInfo(abfsPerfTracker, "oneOperationCaller",
"oneOperationCallee")) {
tracker.registerResult(httpOperation).registerSuccess(true);
}
long latencyRecord = Duration.between(startRecord, Instant.now()).toMillis();
LOG.debug("Spent {} ms in recording latency.", latencyRecord);
return latencyRecord;
});
}
for (Future<Long> fr: executorService.invokeAll(tasks)) {
aggregateLatency += fr.get();
}
double averageRecordLatency = aggregateLatency / numTasks;
assertThat(averageRecordLatency).describedAs("Average time for recording singleton latencies should be bounded")
.isBetween(minLatencyWhenDisabledMs, maxLatencyWhenDisabledMs);
}
@Test
public void verifyRecordingAggregateLatencyIsCheapWhenEnabled() throws Exception {
final double maxLatencyWhenDisabledMs = 5000;
final double minLatencyWhenDisabledMs = 0;
final long numTasks = 1000;
long aggregateLatency = 0;
AbfsPerfTracker abfsPerfTracker = new AbfsPerfTracker(accountName, filesystemName, true);
List<Callable<Long>> tasks = new ArrayList<>();
final AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", new ArrayList<>());
for (int i = 0; i < numTasks; i++) {
tasks.add(() -> {
Instant startRecord = Instant.now();
try (AbfsPerfInfo tracker = new AbfsPerfInfo(abfsPerfTracker, "oneOperationCaller",
"oneOperationCallee")) {
tracker.registerResult(httpOperation).registerSuccess(true).
registerAggregates(startRecord, TEST_AGGREGATE_COUNT);
}
long latencyRecord = Duration.between(startRecord, Instant.now()).toMillis();
LOG.debug("Spent {} ms in recording latency.", latencyRecord);
return latencyRecord;
});
}
for (Future<Long> fr: executorService.invokeAll(tasks)) {
aggregateLatency += fr.get();
}
double averageRecordLatency = aggregateLatency / numTasks;
assertThat(averageRecordLatency).describedAs("Average time for recording aggregate latencies is bounded")
.isBetween(minLatencyWhenDisabledMs, maxLatencyWhenDisabledMs);
}
@Test
public void verifyGettingLatencyRecordsIsCheapWhenEnabled() throws Exception {
final double maxLatencyWhenDisabledMs = 5000;
final double minLatencyWhenDisabledMs = 0;
final long numTasks = 1000;
long aggregateLatency = 0;
AbfsPerfTracker abfsPerfTracker = new AbfsPerfTracker(accountName, filesystemName, true);
List<Callable<Long>> tasks = new ArrayList<>();
for (int i = 0; i < numTasks; i++) {
tasks.add(() -> {
Instant startRecord = Instant.now();
abfsPerfTracker.getClientLatency();
long latencyRecord = Duration.between(startRecord, Instant.now()).toMillis();
LOG.debug("Spent {} ms in recording latency.", latencyRecord);
return latencyRecord;
});
}
for (Future<Long> fr: executorService.invokeAll(tasks)) {
aggregateLatency += fr.get();
}
double averageRecordLatency = aggregateLatency / numTasks;
assertThat(averageRecordLatency).describedAs("Average time for getting latency records should be bounded")
.isBetween(minLatencyWhenDisabledMs, maxLatencyWhenDisabledMs);
}
@Test
public void verifyNoExceptionOnInvalidInput() throws Exception {
Instant testInstant = Instant.now();
AbfsPerfTracker abfsPerfTrackerDisabled = new AbfsPerfTracker(accountName, filesystemName, false);
AbfsPerfTracker abfsPerfTrackerEnabled = new AbfsPerfTracker(accountName, filesystemName, true);
final AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", new ArrayList<AbfsHttpHeader>());
verifyNoException(abfsPerfTrackerDisabled);
verifyNoException(abfsPerfTrackerEnabled);
}
private void verifyNoException(AbfsPerfTracker abfsPerfTracker) throws Exception {
Instant testInstant = Instant.now();
final AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", new ArrayList<AbfsHttpHeader>());
try (
AbfsPerfInfo tracker01 = new AbfsPerfInfo(abfsPerfTracker, null, null);
AbfsPerfInfo tracker02 = new AbfsPerfInfo(abfsPerfTracker, "test", null);
AbfsPerfInfo tracker03 = new AbfsPerfInfo(abfsPerfTracker, "test", "test");
AbfsPerfInfo tracker04 = new AbfsPerfInfo(abfsPerfTracker, "test", "test");
AbfsPerfInfo tracker05 = new AbfsPerfInfo(abfsPerfTracker, null, null);
AbfsPerfInfo tracker06 = new AbfsPerfInfo(abfsPerfTracker, "test", null);
AbfsPerfInfo tracker07 = new AbfsPerfInfo(abfsPerfTracker, "test", "test");
AbfsPerfInfo tracker08 = new AbfsPerfInfo(abfsPerfTracker, "test", "test");
AbfsPerfInfo tracker09 = new AbfsPerfInfo(abfsPerfTracker, "test", "test");
AbfsPerfInfo tracker10 = new AbfsPerfInfo(abfsPerfTracker, "test", "test");
AbfsPerfInfo tracker11 = new AbfsPerfInfo(abfsPerfTracker, "test", "test");
AbfsPerfInfo tracker12 = new AbfsPerfInfo(abfsPerfTracker, "test", "test");
AbfsPerfInfo tracker13 = new AbfsPerfInfo(abfsPerfTracker, "test", "test");
) {
tracker01.registerResult(null).registerSuccess(false);
tracker02.registerResult(null).registerSuccess(false);
tracker03.registerResult(null).registerSuccess(false);
tracker04.registerResult(httpOperation).registerSuccess(false);
tracker05.registerResult(null).registerSuccess(false).registerAggregates(null, 0);
tracker06.registerResult(null).registerSuccess(false).registerAggregates(null, 0);
tracker07.registerResult(null).registerSuccess(false).registerAggregates(null, 0);
tracker08.registerResult(httpOperation).registerSuccess(false).registerAggregates(null, 0);
tracker09.registerResult(httpOperation).registerSuccess(false).registerAggregates(Instant.now(), 0);
tracker10.registerResult(httpOperation).registerSuccess(false).registerAggregates(Instant.now(), TEST_AGGREGATE_COUNT);
tracker11.registerResult(httpOperation).registerSuccess(false).registerAggregates(testInstant, TEST_AGGREGATE_COUNT);
tracker12.registerResult(httpOperation).registerSuccess(false).registerAggregates(Instant.MAX, TEST_AGGREGATE_COUNT);
tracker13.registerResult(httpOperation).registerSuccess(false).registerAggregates(Instant.MIN, TEST_AGGREGATE_COUNT);
}
}
}

View File

@ -33,6 +33,11 @@
<value>false</value>
</property>
<property>
<name>fs.azure.abfs.latency.track</name>
<value>false</value>
</property>
<!--==================== ABFS CONFIGURATION ====================-->
<!-- SEE relevant section in "site/markdown/testing_azure.md"-->