HADOOP-16920 ABFS: Make list page size configurable.
Contributed by Bilahari T H.

The page limit is set in "fs.azure.list.max.results"; the default value is 500. There is currently a limit of 5000 in the store; there are no range checks in the client code, so that limit can be changed on the server without any need to update the ABFS connector.
parent 367833cf41
commit 6ce5f8734f
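As a usage sketch (not part of this commit's diff): once the key exists, a client can raise the page size before creating the filesystem, and listStatus pages through results transparently. The account, container, and path below are placeholders.

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ListPageSizeExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Ask the ABFS client to request up to 1000 entries per list call
    // (default is 500; the service currently caps a page at 5000).
    conf.setInt("fs.azure.list.max.results", 1000);

    // Placeholder account/container; substitute a real abfs:// URI.
    FileSystem fs = FileSystem.get(
        new URI("abfs://container@account.dfs.core.windows.net/"), conf);

    // listStatus pages through results transparently; the setting only
    // changes how many entries each underlying REST call asks for.
    for (FileStatus status : fs.listStatus(new Path("/data"))) {
      System.out.println(status.getPath());
    }
  }
}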
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java

@@ -125,6 +125,11 @@ public class AbfsConfiguration{
       DefaultValue = MAX_CONCURRENT_WRITE_THREADS)
   private int maxConcurrentWriteThreads;
 
+  @IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_LIST_MAX_RESULTS,
+      MinValue = 1,
+      DefaultValue = DEFAULT_AZURE_LIST_MAX_RESULTS)
+  private int listMaxResults;
+
   @IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_CONCURRENT_CONNECTION_VALUE_IN,
       MinValue = 1,
       DefaultValue = MAX_CONCURRENT_READ_THREADS)
@@ -432,6 +437,10 @@ public class AbfsConfiguration{
     return this.maxConcurrentReadThreads;
   }
 
+  public int getListMaxResults() {
+    return this.listMaxResults;
+  }
+
   public boolean getTolerateOobAppends() {
     return this.tolerateOobAppends;
   }
@@ -702,6 +711,11 @@ public class AbfsConfiguration{
     this.disableOutputStreamFlush = disableOutputStreamFlush;
   }
 
+  @VisibleForTesting
+  void setListMaxResults(int listMaxResults) {
+    this.listMaxResults = listMaxResults;
+  }
+
   private String getTrimmedPasswordString(String key, String defaultValue) throws IOException {
     String value = getPasswordString(key);
     if (StringUtils.isBlank(value)) {
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java

@@ -126,7 +126,6 @@ public class AzureBlobFileSystemStore implements Closeable {
   private static final String DATE_TIME_PATTERN = "E, dd MMM yyyy HH:mm:ss z";
   private static final String TOKEN_DATE_PATTERN = "yyyy-MM-dd'T'HH:mm:ss.SSSSSSS'Z'";
   private static final String XMS_PROPERTIES_ENCODING = "ISO-8859-1";
-  private static final int LIST_MAX_RESULTS = 500;
   private static final int GET_SET_AGGREGATE_COUNT = 2;
 
   private final AbfsConfiguration abfsConfiguration;
@@ -682,7 +681,8 @@ public class AzureBlobFileSystemStore implements Closeable {
     ArrayList<FileStatus> fileStatuses = new ArrayList<>();
     do {
       try (AbfsPerfInfo perfInfo = startTracking("listStatus", "listPath")) {
-        AbfsRestOperation op = client.listPath(relativePath, false, LIST_MAX_RESULTS, continuation);
+        AbfsRestOperation op = client.listPath(relativePath, false,
+            abfsConfiguration.getListMaxResults(), continuation);
         perfInfo.registerResult(op.getResult());
         continuation = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_CONTINUATION);
         ListResultSchema retrievedSchema = op.getResult().getListResultSchema();
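The listStatus hunk above is the only call site the commit changes: the configured value bounds one REST round trip, and the store keeps following x-ms-continuation tokens until the listing is exhausted. Below is a self-contained sketch of that loop against a hypothetical page-fetching interface; PageFetcher and Page are illustrative stand-ins, not ABFS types.

import java.util.ArrayList;
import java.util.List;

public class ContinuationPagingSketch {

  /** Hypothetical stand-in for one AbfsClient.listPath round trip. */
  interface PageFetcher {
    Page fetch(int maxResults, String continuation);
  }

  static final class Page {
    final List<String> entries;
    final String continuation; // null or empty once the listing is exhausted

    Page(List<String> entries, String continuation) {
      this.entries = entries;
      this.continuation = continuation;
    }
  }

  /** Collects every entry, following continuation tokens as the store does. */
  static List<String> listAll(PageFetcher fetcher, int listMaxResults) {
    List<String> all = new ArrayList<>();
    String continuation = null;
    do {
      // Each call returns at most listMaxResults entries; this is the value
      // the commit makes configurable via fs.azure.list.max.results.
      Page page = fetcher.fetch(listMaxResults, continuation);
      all.addAll(page.entries);
      continuation = page.continuation;
    } while (continuation != null && !continuation.isEmpty());
    return all;
  }
}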
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java

@@ -45,6 +45,7 @@ public final class ConfigurationKeys {
   public static final String AZURE_CONCURRENT_CONNECTION_VALUE_OUT = "fs.azure.concurrentRequestCount.out";
   public static final String AZURE_CONCURRENT_CONNECTION_VALUE_IN = "fs.azure.concurrentRequestCount.in";
   public static final String AZURE_TOLERATE_CONCURRENT_APPEND = "fs.azure.io.read.tolerate.concurrent.append";
+  public static final String AZURE_LIST_MAX_RESULTS = "fs.azure.list.max.results";
   public static final String AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION = "fs.azure.createRemoteFileSystemDuringInitialization";
   public static final String AZURE_SKIP_USER_GROUP_METADATA_DURING_INITIALIZATION = "fs.azure.skipUserGroupMetadataDuringInitialization";
   public static final String FS_AZURE_ENABLE_AUTOTHROTTLING = "fs.azure.enable.autothrottling";
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java

@@ -46,6 +46,7 @@ public final class FileSystemConfigurations {
   public static final int MAX_BUFFER_SIZE = 100 * ONE_MB;  // 100 MB
   public static final long MAX_AZURE_BLOCK_SIZE = 256 * 1024 * 1024L;  // changing default abfs blocksize to 256MB
   public static final String AZURE_BLOCK_LOCATION_HOST_DEFAULT = "localhost";
+  public static final int DEFAULT_AZURE_LIST_MAX_RESULTS = 500;
 
   public static final int MAX_CONCURRENT_READ_THREADS = 12;
   public static final int MAX_CONCURRENT_WRITE_THREADS = 8;
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsClient.java

@@ -18,13 +18,23 @@
 
 package org.apache.hadoop.fs.azurebfs;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
+import org.assertj.core.api.Assertions;
 import org.junit.Assert;
 import org.junit.Ignore;
 import org.junit.Test;
 
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultEntrySchema;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
 import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
 import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
@@ -38,6 +48,7 @@ import static org.apache.hadoop.test.LambdaTestUtils.intercept;
  */
 public final class ITestAbfsClient extends AbstractAbfsIntegrationTest {
   private static final int LIST_MAX_RESULTS = 500;
+  private static final int LIST_MAX_RESULTS_SERVER = 5000;
 
   public ITestAbfsClient() throws Exception {
     super();
@@ -75,4 +86,83 @@ public final class ITestAbfsClient extends AbstractAbfsIntegrationTest {
         "UnknownHostException: " + fakeAccountName,
         () -> FileSystem.get(conf.getRawConfiguration()));
   }
+
+  @Test
+  public void testListPathWithValidListMaxResultsValues()
+      throws IOException, ExecutionException, InterruptedException {
+    final int fileCount = 10;
+    final String directory = "testWithValidListMaxResultsValues";
+    createDirectoryWithNFiles(directory, fileCount);
+    final int[] testData = {fileCount + 100, fileCount + 1, fileCount,
+        fileCount - 1, 1};
+    for (int i = 0; i < testData.length; i++) {
+      int listMaxResults = testData[i];
+      setListMaxResults(listMaxResults);
+      int expectedListResultsSize =
+          listMaxResults > fileCount ? fileCount : listMaxResults;
+      Assertions.assertThat(listPath(directory)).describedAs(
+          "AbfsClient.listPath result should contain %d items when "
+              + "listMaxResults is %d and directory contains %d items",
+          expectedListResultsSize, listMaxResults, fileCount)
+          .hasSize(expectedListResultsSize);
+    }
+  }
+
+  @Test
+  public void testListPathWithValueGreaterThanServerMaximum()
+      throws IOException, ExecutionException, InterruptedException {
+    setListMaxResults(LIST_MAX_RESULTS_SERVER + 100);
+    final String directory = "testWithValueGreaterThanServerMaximum";
+    createDirectoryWithNFiles(directory, LIST_MAX_RESULTS_SERVER + 200);
+    Assertions.assertThat(listPath(directory)).describedAs(
+        "AbfsClient.listPath result will contain a maximum of %d items "
+            + "even if listMaxResults >= %d or directory "
+            + "contains more than %d items", LIST_MAX_RESULTS_SERVER,
+        LIST_MAX_RESULTS_SERVER, LIST_MAX_RESULTS_SERVER)
+        .hasSize(LIST_MAX_RESULTS_SERVER);
+  }
+
+  @Test
+  public void testListPathWithInvalidListMaxResultsValues() throws Exception {
+    for (int i = -1; i < 1; i++) {
+      setListMaxResults(i);
+      intercept(AbfsRestOperationException.class, "Operation failed: \"One of "
+          + "the query parameters specified in the request URI is outside "
+          + "the permissible range.", () -> listPath("directory"));
+    }
+  }
+
+  private List<ListResultEntrySchema> listPath(String directory)
+      throws IOException {
+    return getFileSystem().getAbfsClient()
+        .listPath(directory, false, getListMaxResults(), null).getResult()
+        .getListResultSchema().paths();
+  }
+
+  private int getListMaxResults() throws IOException {
+    return getFileSystem().getAbfsStore().getAbfsConfiguration()
+        .getListMaxResults();
+  }
+
+  private void setListMaxResults(int listMaxResults) throws IOException {
+    getFileSystem().getAbfsStore().getAbfsConfiguration()
+        .setListMaxResults(listMaxResults);
+  }
+
+  private void createDirectoryWithNFiles(String directory, int n)
+      throws ExecutionException, InterruptedException {
+    final List<Future<Void>> tasks = new ArrayList<>();
+    ExecutorService es = Executors.newFixedThreadPool(10);
+    for (int i = 0; i < n; i++) {
+      final Path fileName = new Path("/" + directory + "/test" + i);
+      tasks.add(es.submit(() -> {
+        touch(fileName);
+        return null;
+      }));
+    }
+    for (Future<Void> task : tasks) {
+      task.get();
+    }
+    es.shutdownNow();
+  }
 }
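A practical note from the tests above: values above the server-side cap are truncated to 5000 entries per page, and values below 1 are rejected by the service with an "outside the permissible range" error. Code that takes the page size from user input could clamp it first; a minimal sketch, where the 5000 cap mirrors LIST_MAX_RESULTS_SERVER and may change server-side without a connector update:

// Clamp a user-supplied page size into the range the service accepts today.
static int clampListMaxResults(int requested) {
  // 1 is the connector-side minimum (MinValue = 1 in the validator);
  // 5000 mirrors LIST_MAX_RESULTS_SERVER in the test above.
  return Math.max(1, Math.min(requested, 5000));
}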