HADOOP-17166. ABFS: configure output stream thread pool (#2179)
Adds options to control the size of the per-output-stream thread pool when writing data through the ABFS connector: `fs.azure.write.max.concurrent.requests` and `fs.azure.write.max.requests.to.queue`. Contributed by Bilahari T H.
This commit is contained in:
parent
773ac799c6
commit
85119267be
|
@ -86,6 +86,14 @@ public class AbfsConfiguration{
|
||||||
DefaultValue = DEFAULT_FS_AZURE_ACCOUNT_IS_HNS_ENABLED)
|
DefaultValue = DEFAULT_FS_AZURE_ACCOUNT_IS_HNS_ENABLED)
|
||||||
private String isNamespaceEnabledAccount;
|
private String isNamespaceEnabledAccount;
|
||||||
|
|
||||||
|
@IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_WRITE_MAX_CONCURRENT_REQUESTS,
|
||||||
|
DefaultValue = -1)
|
||||||
|
private int writeMaxConcurrentRequestCount;
|
||||||
|
|
||||||
|
@IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_WRITE_MAX_REQUESTS_TO_QUEUE,
|
||||||
|
DefaultValue = -1)
|
||||||
|
private int maxWriteRequestsToQueue;
|
||||||
|
|
||||||
@IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_WRITE_BUFFER_SIZE,
|
@IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_WRITE_BUFFER_SIZE,
|
||||||
MinValue = MIN_BUFFER_SIZE,
|
MinValue = MIN_BUFFER_SIZE,
|
||||||
MaxValue = MAX_BUFFER_SIZE,
|
MaxValue = MAX_BUFFER_SIZE,
|
||||||
|
@ -822,6 +830,20 @@ public class AbfsConfiguration{
|
||||||
oauthTokenFetchRetryDeltaBackoff);
|
oauthTokenFetchRetryDeltaBackoff);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public int getWriteMaxConcurrentRequestCount() {
|
||||||
|
if (this.writeMaxConcurrentRequestCount < 1) {
|
||||||
|
return 4 * Runtime.getRuntime().availableProcessors();
|
||||||
|
}
|
||||||
|
return this.writeMaxConcurrentRequestCount;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getMaxWriteRequestsToQueue() {
|
||||||
|
if (this.maxWriteRequestsToQueue < 1) {
|
||||||
|
return 2 * getWriteMaxConcurrentRequestCount();
|
||||||
|
}
|
||||||
|
return this.maxWriteRequestsToQueue;
|
||||||
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
void setReadBufferSize(int bufferSize) {
|
void setReadBufferSize(int bufferSize) {
|
||||||
this.readBufferSize = bufferSize;
|
this.readBufferSize = bufferSize;
|
||||||
|
|
|
@ -490,6 +490,8 @@ public class AzureBlobFileSystemStore implements Closeable {
|
||||||
.disableOutputStreamFlush(abfsConfiguration.isOutputStreamFlushDisabled())
|
.disableOutputStreamFlush(abfsConfiguration.isOutputStreamFlushDisabled())
|
||||||
.withStreamStatistics(new AbfsOutputStreamStatisticsImpl())
|
.withStreamStatistics(new AbfsOutputStreamStatisticsImpl())
|
||||||
.withAppendBlob(isAppendBlob)
|
.withAppendBlob(isAppendBlob)
|
||||||
|
.withWriteMaxConcurrentRequestCount(abfsConfiguration.getWriteMaxConcurrentRequestCount())
|
||||||
|
.withMaxWriteRequestsToQueue(abfsConfiguration.getMaxWriteRequestsToQueue())
|
||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -52,6 +52,8 @@ public final class ConfigurationKeys {
|
||||||
public static final String AZURE_OAUTH_TOKEN_FETCH_RETRY_DELTA_BACKOFF = "fs.azure.oauth.token.fetch.retry.delta.backoff";
|
public static final String AZURE_OAUTH_TOKEN_FETCH_RETRY_DELTA_BACKOFF = "fs.azure.oauth.token.fetch.retry.delta.backoff";
|
||||||
|
|
||||||
// Read and write buffer sizes defined by the user
|
// Read and write buffer sizes defined by the user
|
||||||
|
public static final String AZURE_WRITE_MAX_CONCURRENT_REQUESTS = "fs.azure.write.max.concurrent.requests";
|
||||||
|
public static final String AZURE_WRITE_MAX_REQUESTS_TO_QUEUE = "fs.azure.write.max.requests.to.queue";
|
||||||
public static final String AZURE_WRITE_BUFFER_SIZE = "fs.azure.write.request.size";
|
public static final String AZURE_WRITE_BUFFER_SIZE = "fs.azure.write.request.size";
|
||||||
public static final String AZURE_READ_BUFFER_SIZE = "fs.azure.read.request.size";
|
public static final String AZURE_READ_BUFFER_SIZE = "fs.azure.read.request.size";
|
||||||
public static final String AZURE_BLOCK_SIZE_PROPERTY_NAME = "fs.azure.block.size";
|
public static final String AZURE_BLOCK_SIZE_PROPERTY_NAME = "fs.azure.block.size";
|
||||||
|
|
|
@ -70,6 +70,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
|
||||||
private byte[] buffer;
|
private byte[] buffer;
|
||||||
private int bufferIndex;
|
private int bufferIndex;
|
||||||
private final int maxConcurrentRequestCount;
|
private final int maxConcurrentRequestCount;
|
||||||
|
private final int maxRequestsThatCanBeQueued;
|
||||||
|
|
||||||
private ConcurrentLinkedDeque<WriteOperation> writeOperations;
|
private ConcurrentLinkedDeque<WriteOperation> writeOperations;
|
||||||
private final ThreadPoolExecutor threadExecutor;
|
private final ThreadPoolExecutor threadExecutor;
|
||||||
|
@ -119,8 +120,11 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
|
||||||
if (this.isAppendBlob) {
|
if (this.isAppendBlob) {
|
||||||
this.maxConcurrentRequestCount = 1;
|
this.maxConcurrentRequestCount = 1;
|
||||||
} else {
|
} else {
|
||||||
this.maxConcurrentRequestCount = 4 * Runtime.getRuntime().availableProcessors();
|
this.maxConcurrentRequestCount = abfsOutputStreamContext
|
||||||
|
.getWriteMaxConcurrentRequestCount();
|
||||||
}
|
}
|
||||||
|
this.maxRequestsThatCanBeQueued = abfsOutputStreamContext
|
||||||
|
.getMaxWriteRequestsToQueue();
|
||||||
this.threadExecutor
|
this.threadExecutor
|
||||||
= new ThreadPoolExecutor(maxConcurrentRequestCount,
|
= new ThreadPoolExecutor(maxConcurrentRequestCount,
|
||||||
maxConcurrentRequestCount,
|
maxConcurrentRequestCount,
|
||||||
|
@ -371,7 +375,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
|
||||||
final long offset = position;
|
final long offset = position;
|
||||||
position += bytesLength;
|
position += bytesLength;
|
||||||
|
|
||||||
if (threadExecutor.getQueue().size() >= maxConcurrentRequestCount * 2) {
|
if (threadExecutor.getQueue().size() >= maxRequestsThatCanBeQueued) {
|
||||||
long start = System.currentTimeMillis();
|
long start = System.currentTimeMillis();
|
||||||
waitForTaskToComplete();
|
waitForTaskToComplete();
|
||||||
outputStreamStatistics.timeSpentTaskWait(start, System.currentTimeMillis());
|
outputStreamStatistics.timeSpentTaskWait(start, System.currentTimeMillis());
|
||||||
|
@ -543,6 +547,16 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
|
||||||
return writeOperations.size();
|
return writeOperations.size();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
int getMaxConcurrentRequestCount() {
|
||||||
|
return this.maxConcurrentRequestCount;
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
int getMaxRequestsThatCanBeQueued() {
|
||||||
|
return maxRequestsThatCanBeQueued;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Appending AbfsOutputStream statistics to base toString().
|
* Appending AbfsOutputStream statistics to base toString().
|
||||||
*
|
*
|
||||||
|
|
|
@ -33,6 +33,10 @@ public class AbfsOutputStreamContext extends AbfsStreamContext {
|
||||||
|
|
||||||
private boolean isAppendBlob;
|
private boolean isAppendBlob;
|
||||||
|
|
||||||
|
private int writeMaxConcurrentRequestCount;
|
||||||
|
|
||||||
|
private int maxWriteRequestsToQueue;
|
||||||
|
|
||||||
public AbfsOutputStreamContext(final long sasTokenRenewPeriodForStreamsInSeconds) {
|
public AbfsOutputStreamContext(final long sasTokenRenewPeriodForStreamsInSeconds) {
|
||||||
super(sasTokenRenewPeriodForStreamsInSeconds);
|
super(sasTokenRenewPeriodForStreamsInSeconds);
|
||||||
}
|
}
|
||||||
|
@ -71,6 +75,18 @@ public class AbfsOutputStreamContext extends AbfsStreamContext {
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public AbfsOutputStreamContext withWriteMaxConcurrentRequestCount(
|
||||||
|
final int writeMaxConcurrentRequestCount) {
|
||||||
|
this.writeMaxConcurrentRequestCount = writeMaxConcurrentRequestCount;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public AbfsOutputStreamContext withMaxWriteRequestsToQueue(
|
||||||
|
final int maxWriteRequestsToQueue) {
|
||||||
|
this.maxWriteRequestsToQueue = maxWriteRequestsToQueue;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
public int getWriteBufferSize() {
|
public int getWriteBufferSize() {
|
||||||
return writeBufferSize;
|
return writeBufferSize;
|
||||||
}
|
}
|
||||||
|
@ -90,4 +106,12 @@ public class AbfsOutputStreamContext extends AbfsStreamContext {
|
||||||
public boolean isAppendBlob() {
|
public boolean isAppendBlob() {
|
||||||
return isAppendBlob;
|
return isAppendBlob;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public int getWriteMaxConcurrentRequestCount() {
|
||||||
|
return this.writeMaxConcurrentRequestCount;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getMaxWriteRequestsToQueue() {
|
||||||
|
return this.maxWriteRequestsToQueue;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -796,6 +796,19 @@ will be -1. To disable readaheads, set this value to 0. If your workload is
|
||||||
doing only random reads (non-sequential) or you are seeing throttling, you
|
doing only random reads (non-sequential) or you are seeing throttling, you
|
||||||
may try setting this value to 0.
|
may try setting this value to 0.
|
||||||
|
|
||||||
|
To run under limited memory situations, configure the following, especially
|
||||||
|
when there are too many writes from the same process.
|
||||||
|
|
||||||
|
`fs.azure.write.max.concurrent.requests`: To set the maximum concurrent
|
||||||
|
write requests from an AbfsOutputStream instance to server at any point of
|
||||||
|
time. Effectively this will be the threadpool size within the
|
||||||
|
AbfsOutputStream instance. Set the value between 1 and 8, both inclusive.
|
||||||
|
|
||||||
|
`fs.azure.write.max.requests.to.queue`: To set the maximum write requests
|
||||||
|
that can be queued. Memory consumption of AbfsOutputStream instance can be
|
||||||
|
tuned with this config considering each queued request holds a buffer. Set
|
||||||
|
the value to 3 or 4 times the value set for fs.azure.write.max.concurrent.requests.
|
||||||
|
|
||||||
### <a name="securityconfigoptions"></a> Security Options
|
### <a name="securityconfigoptions"></a> Security Options
|
||||||
`fs.azure.always.use.https`: Enforces to use HTTPS instead of HTTP when the flag
|
`fs.azure.always.use.https`: Enforces to use HTTPS instead of HTTP when the flag
|
||||||
is made true. Irrespective of the flag, AbfsClient will use HTTPS if the secure
|
is made true. Irrespective of the flag, AbfsClient will use HTTPS if the secure
|
||||||
|
|
|
@ -0,0 +1,78 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
* <p>
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* <p>
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.fs.azurebfs.services;
|
||||||
|
|
||||||
|
import org.assertj.core.api.Assertions;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
|
||||||
|
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
|
||||||
|
import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test create operation.
|
||||||
|
*/
|
||||||
|
public class ITestAbfsOutputStream extends AbstractAbfsIntegrationTest {
|
||||||
|
private static final Path TEST_FILE_PATH = new Path("testfile");
|
||||||
|
|
||||||
|
public ITestAbfsOutputStream() throws Exception {
|
||||||
|
super();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMaxRequestsAndQueueCapacityDefaults() throws Exception {
|
||||||
|
Configuration conf = getRawConfiguration();
|
||||||
|
final AzureBlobFileSystem fs = getFileSystem(conf);
|
||||||
|
try (FSDataOutputStream out = fs.create(TEST_FILE_PATH)) {
|
||||||
|
AbfsOutputStream stream = (AbfsOutputStream) out.getWrappedStream();
|
||||||
|
Assertions.assertThat(stream.getMaxConcurrentRequestCount()).describedAs(
|
||||||
|
"maxConcurrentRequests should be " + getConfiguration()
|
||||||
|
.getWriteMaxConcurrentRequestCount())
|
||||||
|
.isEqualTo(getConfiguration().getWriteMaxConcurrentRequestCount());
|
||||||
|
Assertions.assertThat(stream.getMaxRequestsThatCanBeQueued()).describedAs(
|
||||||
|
"maxRequestsToQueue should be " + getConfiguration()
|
||||||
|
.getMaxWriteRequestsToQueue())
|
||||||
|
.isEqualTo(getConfiguration().getMaxWriteRequestsToQueue());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMaxRequestsAndQueueCapacity() throws Exception {
|
||||||
|
Configuration conf = getRawConfiguration();
|
||||||
|
int maxConcurrentRequests = 6;
|
||||||
|
int maxRequestsToQueue = 10;
|
||||||
|
conf.set(ConfigurationKeys.AZURE_WRITE_MAX_CONCURRENT_REQUESTS,
|
||||||
|
"" + maxConcurrentRequests);
|
||||||
|
conf.set(ConfigurationKeys.AZURE_WRITE_MAX_REQUESTS_TO_QUEUE,
|
||||||
|
"" + maxRequestsToQueue);
|
||||||
|
final AzureBlobFileSystem fs = getFileSystem(conf);
|
||||||
|
FSDataOutputStream out = fs.create(TEST_FILE_PATH);
|
||||||
|
AbfsOutputStream stream = (AbfsOutputStream) out.getWrappedStream();
|
||||||
|
Assertions.assertThat(stream.getMaxConcurrentRequestCount())
|
||||||
|
.describedAs("maxConcurrentRequests should be " + maxConcurrentRequests)
|
||||||
|
.isEqualTo(maxConcurrentRequests);
|
||||||
|
Assertions.assertThat(stream.getMaxRequestsThatCanBeQueued())
|
||||||
|
.describedAs("maxRequestsToQueue should be " + maxRequestsToQueue)
|
||||||
|
.isEqualTo(maxRequestsToQueue);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -18,6 +18,7 @@
|
||||||
|
|
||||||
package org.apache.hadoop.fs.azurebfs.services;
|
package org.apache.hadoop.fs.azurebfs.services;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
|
@ -54,13 +55,17 @@ public final class TestAbfsOutputStream {
|
||||||
private AbfsOutputStreamContext populateAbfsOutputStreamContext(int writeBufferSize,
|
private AbfsOutputStreamContext populateAbfsOutputStreamContext(int writeBufferSize,
|
||||||
boolean isFlushEnabled,
|
boolean isFlushEnabled,
|
||||||
boolean disableOutputStreamFlush,
|
boolean disableOutputStreamFlush,
|
||||||
boolean isAppendBlob) {
|
boolean isAppendBlob) throws IOException, IllegalAccessException {
|
||||||
|
AbfsConfiguration abfsConf = new AbfsConfiguration(new Configuration(),
|
||||||
|
accountName1);
|
||||||
return new AbfsOutputStreamContext(2)
|
return new AbfsOutputStreamContext(2)
|
||||||
.withWriteBufferSize(writeBufferSize)
|
.withWriteBufferSize(writeBufferSize)
|
||||||
.enableFlush(isFlushEnabled)
|
.enableFlush(isFlushEnabled)
|
||||||
.disableOutputStreamFlush(disableOutputStreamFlush)
|
.disableOutputStreamFlush(disableOutputStreamFlush)
|
||||||
.withStreamStatistics(new AbfsOutputStreamStatisticsImpl())
|
.withStreamStatistics(new AbfsOutputStreamStatisticsImpl())
|
||||||
.withAppendBlob(isAppendBlob)
|
.withAppendBlob(isAppendBlob)
|
||||||
|
.withWriteMaxConcurrentRequestCount(abfsConf.getWriteMaxConcurrentRequestCount())
|
||||||
|
.withMaxWriteRequestsToQueue(abfsConf.getMaxWriteRequestsToQueue())
|
||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue