HADOOP-15778. ABFS: Fix client side throttling for read.
Contributed by Sneha Varma.
This commit is contained in:
parent
a5692c2da5
commit
d0b4624c88
|
@ -19,9 +19,12 @@
|
||||||
package org.apache.hadoop.fs.azurebfs.services;
|
package org.apache.hadoop.fs.azurebfs.services;
|
||||||
|
|
||||||
import java.net.HttpURLConnection;
|
import java.net.HttpURLConnection;
|
||||||
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Throttles Azure Blob File System read and write operations to achieve maximum
|
* Throttles Azure Blob File System read and write operations to achieve maximum
|
||||||
* throughput by minimizing errors. The errors occur when the account ingress
|
* throughput by minimizing errors. The errors occur when the account ingress
|
||||||
|
@ -37,6 +40,7 @@ import org.slf4j.LoggerFactory;
|
||||||
public final class AbfsClientThrottlingIntercept {
|
public final class AbfsClientThrottlingIntercept {
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(
|
private static final Logger LOG = LoggerFactory.getLogger(
|
||||||
AbfsClientThrottlingIntercept.class);
|
AbfsClientThrottlingIntercept.class);
|
||||||
|
private static final String RANGE_PREFIX = "bytes=";
|
||||||
private static AbfsClientThrottlingIntercept singleton = null;
|
private static AbfsClientThrottlingIntercept singleton = null;
|
||||||
private AbfsClientThrottlingAnalyzer readThrottler = null;
|
private AbfsClientThrottlingAnalyzer readThrottler = null;
|
||||||
private AbfsClientThrottlingAnalyzer writeThrottler = null;
|
private AbfsClientThrottlingAnalyzer writeThrottler = null;
|
||||||
|
@ -82,7 +86,8 @@ public final class AbfsClientThrottlingIntercept {
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case ReadFile:
|
case ReadFile:
|
||||||
contentLength = abfsHttpOperation.getBytesReceived();
|
String range = abfsHttpOperation.getConnection().getRequestProperty(HttpHeaderConfigurations.RANGE);
|
||||||
|
contentLength = getContentLengthIfKnown(range);
|
||||||
if (contentLength > 0) {
|
if (contentLength > 0) {
|
||||||
singleton.readThrottler.addBytesTransferred(contentLength,
|
singleton.readThrottler.addBytesTransferred(contentLength,
|
||||||
isFailedOperation);
|
isFailedOperation);
|
||||||
|
@ -114,4 +119,17 @@ public final class AbfsClientThrottlingIntercept {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
private static long getContentLengthIfKnown(String range) {
|
||||||
|
long contentLength = 0;
|
||||||
|
// Format is "bytes=%d-%d"
|
||||||
|
if (range != null && range.startsWith(RANGE_PREFIX)) {
|
||||||
|
String[] offsets = range.substring(RANGE_PREFIX.length()).split("-");
|
||||||
|
if (offsets.length == 2) {
|
||||||
|
contentLength = Long.parseLong(offsets[1]) - Long.parseLong(offsets[0])
|
||||||
|
+ 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return contentLength;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -156,9 +156,10 @@ public class AbfsRestOperation {
|
||||||
client.getAccessToken());
|
client.getAccessToken());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
AbfsClientThrottlingIntercept.sendingRequest(operationType);
|
||||||
|
|
||||||
if (hasRequestBody) {
|
if (hasRequestBody) {
|
||||||
// HttpUrlConnection requires
|
// HttpUrlConnection requires
|
||||||
AbfsClientThrottlingIntercept.sendingRequest(operationType);
|
|
||||||
httpOperation.sendRequest(buffer, bufferOffset, bufferLength);
|
httpOperation.sendRequest(buffer, bufferOffset, bufferLength);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue