From 85119267be75d7960e2880d251ccaf3bda4a87d9 Mon Sep 17 00:00:00 2001 From: bilaharith <52483117+bilaharith@users.noreply.github.com> Date: Wed, 9 Sep 2020 21:11:36 +0530 Subject: [PATCH] HADOOP-17166. ABFS: configure output stream thread pool (#2179) Adds the options to control the size of the per-output-stream threadpool when writing data through the abfs connector * fs.azure.write.max.concurrent.requests * fs.azure.write.max.requests.to.queue Contributed by Bilahari T H --- .../hadoop/fs/azurebfs/AbfsConfiguration.java | 22 ++++++ .../fs/azurebfs/AzureBlobFileSystemStore.java | 2 + .../azurebfs/constants/ConfigurationKeys.java | 2 + .../azurebfs/services/AbfsOutputStream.java | 18 ++++- .../services/AbfsOutputStreamContext.java | 24 ++++++ .../hadoop-azure/src/site/markdown/abfs.md | 13 ++++ .../services/ITestAbfsOutputStream.java | 78 +++++++++++++++++++ .../services/TestAbfsOutputStream.java | 7 +- 8 files changed, 163 insertions(+), 3 deletions(-) create mode 100644 hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java index 85bd37a7702..66d485317c9 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java @@ -86,6 +86,14 @@ public class AbfsConfiguration{ DefaultValue = DEFAULT_FS_AZURE_ACCOUNT_IS_HNS_ENABLED) private String isNamespaceEnabledAccount; + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_WRITE_MAX_CONCURRENT_REQUESTS, + DefaultValue = -1) + private int writeMaxConcurrentRequestCount; + + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_WRITE_MAX_REQUESTS_TO_QUEUE, + DefaultValue = -1) + private int maxWriteRequestsToQueue; + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_WRITE_BUFFER_SIZE, MinValue = MIN_BUFFER_SIZE, MaxValue = MAX_BUFFER_SIZE, @@ -822,6 +830,20 @@ public class AbfsConfiguration{ oauthTokenFetchRetryDeltaBackoff); } + public int getWriteMaxConcurrentRequestCount() { + if (this.writeMaxConcurrentRequestCount < 1) { + return 4 * Runtime.getRuntime().availableProcessors(); + } + return this.writeMaxConcurrentRequestCount; + } + + public int getMaxWriteRequestsToQueue() { + if (this.maxWriteRequestsToQueue < 1) { + return 2 * getWriteMaxConcurrentRequestCount(); + } + return this.maxWriteRequestsToQueue; + } + @VisibleForTesting void setReadBufferSize(int bufferSize) { this.readBufferSize = bufferSize; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index 9861e3a7721..23d2b5a3d63 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -490,6 +490,8 @@ public class AzureBlobFileSystemStore implements Closeable { .disableOutputStreamFlush(abfsConfiguration.isOutputStreamFlushDisabled()) .withStreamStatistics(new AbfsOutputStreamStatisticsImpl()) .withAppendBlob(isAppendBlob) + .withWriteMaxConcurrentRequestCount(abfsConfiguration.getWriteMaxConcurrentRequestCount()) + .withMaxWriteRequestsToQueue(abfsConfiguration.getMaxWriteRequestsToQueue()) .build(); } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java index 5f1ad31e487..681390c0198 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java @@ -52,6 +52,8 @@ public final class ConfigurationKeys { public static final String AZURE_OAUTH_TOKEN_FETCH_RETRY_DELTA_BACKOFF = "fs.azure.oauth.token.fetch.retry.delta.backoff"; // Read and write buffer sizes defined by the user + public static final String AZURE_WRITE_MAX_CONCURRENT_REQUESTS = "fs.azure.write.max.concurrent.requests"; + public static final String AZURE_WRITE_MAX_REQUESTS_TO_QUEUE = "fs.azure.write.max.requests.to.queue"; public static final String AZURE_WRITE_BUFFER_SIZE = "fs.azure.write.request.size"; public static final String AZURE_READ_BUFFER_SIZE = "fs.azure.read.request.size"; public static final String AZURE_BLOCK_SIZE_PROPERTY_NAME = "fs.azure.block.size"; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java index 6c1e177da61..1991638a667 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java @@ -70,6 +70,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa private byte[] buffer; private int bufferIndex; private final int maxConcurrentRequestCount; + private final int maxRequestsThatCanBeQueued; private ConcurrentLinkedDeque writeOperations; private final ThreadPoolExecutor threadExecutor; @@ -119,8 +120,11 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa if (this.isAppendBlob) { this.maxConcurrentRequestCount = 1; } else { - this.maxConcurrentRequestCount = 4 * Runtime.getRuntime().availableProcessors(); + this.maxConcurrentRequestCount = abfsOutputStreamContext + .getWriteMaxConcurrentRequestCount(); } + this.maxRequestsThatCanBeQueued = abfsOutputStreamContext + .getMaxWriteRequestsToQueue(); this.threadExecutor = new ThreadPoolExecutor(maxConcurrentRequestCount, maxConcurrentRequestCount, @@ -371,7 +375,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa final long offset = position; position += bytesLength; - if (threadExecutor.getQueue().size() >= maxConcurrentRequestCount * 2) { + if (threadExecutor.getQueue().size() >= maxRequestsThatCanBeQueued) { long start = System.currentTimeMillis(); waitForTaskToComplete(); outputStreamStatistics.timeSpentTaskWait(start, System.currentTimeMillis()); @@ -543,6 +547,16 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa return writeOperations.size(); } + @VisibleForTesting + int getMaxConcurrentRequestCount() { + return this.maxConcurrentRequestCount; + } + + @VisibleForTesting + int getMaxRequestsThatCanBeQueued() { + return maxRequestsThatCanBeQueued; + } + /** * Appending AbfsOutputStream statistics to base toString(). * diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java index 03e4abaf4f6..2dce5dc2c77 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java @@ -33,6 +33,10 @@ public class AbfsOutputStreamContext extends AbfsStreamContext { private boolean isAppendBlob; + private int writeMaxConcurrentRequestCount; + + private int maxWriteRequestsToQueue; + public AbfsOutputStreamContext(final long sasTokenRenewPeriodForStreamsInSeconds) { super(sasTokenRenewPeriodForStreamsInSeconds); } @@ -71,6 +75,18 @@ public class AbfsOutputStreamContext extends AbfsStreamContext { return this; } + public AbfsOutputStreamContext withWriteMaxConcurrentRequestCount( + final int writeMaxConcurrentRequestCount) { + this.writeMaxConcurrentRequestCount = writeMaxConcurrentRequestCount; + return this; + } + + public AbfsOutputStreamContext withMaxWriteRequestsToQueue( + final int maxWriteRequestsToQueue) { + this.maxWriteRequestsToQueue = maxWriteRequestsToQueue; + return this; + } + public int getWriteBufferSize() { return writeBufferSize; } @@ -90,4 +106,12 @@ public class AbfsOutputStreamContext extends AbfsStreamContext { public boolean isAppendBlob() { return isAppendBlob; } + + public int getWriteMaxConcurrentRequestCount() { + return this.writeMaxConcurrentRequestCount; + } + + public int getMaxWriteRequestsToQueue() { + return this.maxWriteRequestsToQueue; + } } diff --git a/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md b/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md index 4640bab2c12..79b897b6bd2 100644 --- a/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md +++ b/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md @@ -796,6 +796,19 @@ will be -1. To disable readaheads, set this value to 0. If your workload is doing only random reads (non-sequential) or you are seeing throttling, you may try setting this value to 0. +To run under limited memory situations configure the following. Especially +when there are too many writes from the same process. + +`fs.azure.write.max.concurrent.requests`: To set the maximum concurrent + write requests from an AbfsOutputStream instance to server at any point of + time. Effectively this will be the threadpool size within the + AbfsOutputStream instance. Set the value in between 1 to 8 both inclusive. + +`fs.azure.write.max.requests.to.queue`: To set the maximum write requests + that can be queued. Memory consumption of AbfsOutputStream instance can be + tuned with this config considering each queued request holds a buffer. Set + the value 3 or 4 times the value set for s.azure.write.max.concurrent.requests. + ### Security Options `fs.azure.always.use.https`: Enforces to use HTTPS instead of HTTP when the flag is made true. Irrespective of the flag, AbfsClient will use HTTPS if the secure diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java new file mode 100644 index 00000000000..7f9111683d5 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.services; + +import org.assertj.core.api.Assertions; +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest; +import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem; +import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys; + +/** + * Test create operation. + */ +public class ITestAbfsOutputStream extends AbstractAbfsIntegrationTest { + private static final Path TEST_FILE_PATH = new Path("testfile"); + + public ITestAbfsOutputStream() throws Exception { + super(); + } + + @Test + public void testMaxRequestsAndQueueCapacityDefaults() throws Exception { + Configuration conf = getRawConfiguration(); + final AzureBlobFileSystem fs = getFileSystem(conf); + try (FSDataOutputStream out = fs.create(TEST_FILE_PATH)) { + AbfsOutputStream stream = (AbfsOutputStream) out.getWrappedStream(); + Assertions.assertThat(stream.getMaxConcurrentRequestCount()).describedAs( + "maxConcurrentRequests should be " + getConfiguration() + .getWriteMaxConcurrentRequestCount()) + .isEqualTo(getConfiguration().getWriteMaxConcurrentRequestCount()); + Assertions.assertThat(stream.getMaxRequestsThatCanBeQueued()).describedAs( + "maxRequestsToQueue should be " + getConfiguration() + .getMaxWriteRequestsToQueue()) + .isEqualTo(getConfiguration().getMaxWriteRequestsToQueue()); + } + } + + @Test + public void testMaxRequestsAndQueueCapacity() throws Exception { + Configuration conf = getRawConfiguration(); + int maxConcurrentRequests = 6; + int maxRequestsToQueue = 10; + conf.set(ConfigurationKeys.AZURE_WRITE_MAX_CONCURRENT_REQUESTS, + "" + maxConcurrentRequests); + conf.set(ConfigurationKeys.AZURE_WRITE_MAX_REQUESTS_TO_QUEUE, + "" + maxRequestsToQueue); + final AzureBlobFileSystem fs = getFileSystem(conf); + FSDataOutputStream out = fs.create(TEST_FILE_PATH); + AbfsOutputStream stream = (AbfsOutputStream) out.getWrappedStream(); + Assertions.assertThat(stream.getMaxConcurrentRequestCount()) + .describedAs("maxConcurrentRequests should be " + maxConcurrentRequests) + .isEqualTo(maxConcurrentRequests); + Assertions.assertThat(stream.getMaxRequestsThatCanBeQueued()) + .describedAs("maxRequestsToQueue should be " + maxRequestsToQueue) + .isEqualTo(maxRequestsToQueue); + } + +} diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java index 4105aa18f21..aab0248c407 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java @@ -18,6 +18,7 @@ package org.apache.hadoop.fs.azurebfs.services; +import java.io.IOException; import java.util.Arrays; import java.util.HashSet; import java.util.Random; @@ -54,13 +55,17 @@ public final class TestAbfsOutputStream { private AbfsOutputStreamContext populateAbfsOutputStreamContext(int writeBufferSize, boolean isFlushEnabled, boolean disableOutputStreamFlush, - boolean isAppendBlob) { + boolean isAppendBlob) throws IOException, IllegalAccessException { + AbfsConfiguration abfsConf = new AbfsConfiguration(new Configuration(), + accountName1); return new AbfsOutputStreamContext(2) .withWriteBufferSize(writeBufferSize) .enableFlush(isFlushEnabled) .disableOutputStreamFlush(disableOutputStreamFlush) .withStreamStatistics(new AbfsOutputStreamStatisticsImpl()) .withAppendBlob(isAppendBlob) + .withWriteMaxConcurrentRequestCount(abfsConf.getWriteMaxConcurrentRequestCount()) + .withMaxWriteRequestsToQueue(abfsConf.getMaxWriteRequestsToQueue()) .build(); }