fileStatuses = new ArrayList<>();
do {
try (AbfsPerfInfo perfInfo = startTracking("listStatus", "listPath")) {
AbfsRestOperation op = client.listPath(relativePath, false,
@@ -928,7 +940,8 @@ public class AzureBlobFileSystemStore implements Closeable {
perfInfo.registerSuccess(true);
countAggregate++;
- shouldContinue = continuation != null && !continuation.isEmpty();
+ shouldContinue =
+ fetchAll && continuation != null && !continuation.isEmpty();
if (!shouldContinue) {
perfInfo.registerAggregates(startAggregate, countAggregate);
@@ -936,7 +949,7 @@ public class AzureBlobFileSystemStore implements Closeable {
}
} while (shouldContinue);
- return fileStatuses.toArray(new FileStatus[fileStatuses.size()]);
+ return continuation;
}
// generate continuation token for xns account
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
index cdef9c9b7ac..8a9c63ddbe8 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
@@ -130,6 +130,8 @@ public final class ConfigurationKeys {
public static final String FS_AZURE_SKIP_SUPER_USER_REPLACEMENT = "fs.azure.identity.transformer.skip.superuser.replacement";
public static final String AZURE_KEY_ACCOUNT_KEYPROVIDER = "fs.azure.account.keyprovider";
public static final String AZURE_KEY_ACCOUNT_SHELLKEYPROVIDER_SCRIPT = "fs.azure.shellkeyprovider.script";
+ /** Setting this to true makes the driver use its own RemoteIterator implementation. */
+ public static final String FS_AZURE_ENABLE_ABFS_LIST_ITERATOR = "fs.azure.enable.abfslistiterator";
/** End point of ABFS account: {@value}. */
public static final String AZURE_ABFS_ENDPOINT = "fs.azure.abfs.endpoint";
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
index a23dfd5292b..9b760c472a9 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
@@ -101,5 +101,7 @@ public final class FileSystemConfigurations {
public static final boolean DEFAULT_DELETE_CONSIDERED_IDEMPOTENT = true;
public static final int DEFAULT_CLOCK_SKEW_WITH_SERVER_IN_MS = 5 * 60 * 1000; // 5 mins
+ public static final boolean DEFAULT_ENABLE_ABFS_LIST_ITERATOR = true;
+
private FileSystemConfigurations() {}
}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsListStatusRemoteIterator.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsListStatusRemoteIterator.java
new file mode 100644
index 00000000000..0c664fc2fbb
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsListStatusRemoteIterator.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import javax.activation.UnsupportedDataTypeException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.RemoteIterator;
+
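+/**
+ * RemoteIterator that asynchronously prefetches pages of listing results
+ * into a bounded queue while the caller iterates over the current page.
+ */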
+public class AbfsListStatusRemoteIterator
+ implements RemoteIterator<FileStatus> {
+
+ private static final Logger LOG = LoggerFactory
+ .getLogger(AbfsListStatusRemoteIterator.class);
+
+ private static final boolean FETCH_ALL_FALSE = false;
+ private static final int MAX_QUEUE_SIZE = 10;
+ private static final long POLL_WAIT_TIME_IN_MS = 250;
+
+ private final FileStatus fileStatus;
+ private final ListingSupport listingSupport;
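+ // Holds either an Iterator<FileStatus> over one listing page or an
+ // IOException to be rethrown to the caller.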
+ private final ArrayBlockingQueue<Object> iteratorsQueue;
+
+ private volatile boolean isAsyncInProgress = false;
+ private boolean isIterationComplete = false;
+ private String continuation;
+ private Iterator<FileStatus> currIterator;
+
+ public AbfsListStatusRemoteIterator(final FileStatus fileStatus,
+ final ListingSupport listingSupport) {
+ this.fileStatus = fileStatus;
+ this.listingSupport = listingSupport;
+ iteratorsQueue = new ArrayBlockingQueue<>(MAX_QUEUE_SIZE);
+ currIterator = Collections.emptyIterator();
+ fetchBatchesAsync();
+ }
+
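+ // Returns true while the current page has entries; otherwise fetches the
+ // next prefetched page, returning false once the listing is exhausted.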
+ @Override
+ public boolean hasNext() throws IOException {
+ if (currIterator.hasNext()) {
+ return true;
+ }
+ currIterator = getNextIterator();
+ return currIterator.hasNext();
+ }
+
+ @Override
+ public FileStatus next() throws IOException {
+ if (!this.hasNext()) {
+ throw new NoSuchElementException();
+ }
+ return currIterator.next();
+ }
+
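+ // Triggers another prefetch, then polls the queue for the next page; an
+ // IOException enqueued by the producer is rethrown here.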
+ private Iterator<FileStatus> getNextIterator() throws IOException {
+ fetchBatchesAsync();
+ try {
+ Object obj = null;
+ while (obj == null
+ && (!isIterationComplete || !iteratorsQueue.isEmpty())) {
+ obj = iteratorsQueue.poll(POLL_WAIT_TIME_IN_MS, TimeUnit.MILLISECONDS);
+ }
+ if (obj == null) {
+ return Collections.emptyIterator();
+ } else if (obj instanceof Iterator) {
+ return (Iterator<FileStatus>) obj;
+ } else if (obj instanceof IOException) {
+ throw (IOException) obj;
+ } else {
+ throw new UnsupportedDataTypeException();
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOG.error("Thread got interrupted: {}", e);
+ throw new IOException(e);
+ }
+ }
+
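+ // Schedules an asynchronous prefetch on the common ForkJoinPool unless one
+ // is already in progress or the listing is complete.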
+ private void fetchBatchesAsync() {
+ if (isAsyncInProgress || isIterationComplete) {
+ return;
+ }
+ synchronized (this) {
+ if (isAsyncInProgress || isIterationComplete) {
+ return;
+ }
+ isAsyncInProgress = true;
+ }
+ CompletableFuture.runAsync(() -> asyncOp());
+ }
+
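+ // Producer loop: keeps fetching pages until the listing is complete. An
+ // IOException is enqueued so the consumer can rethrow it, and the blocking
+ // put() on the bounded queue provides backpressure.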
+ private void asyncOp() {
+ try {
+ while (!isIterationComplete && iteratorsQueue.size() <= MAX_QUEUE_SIZE) {
+ addNextBatchIteratorToQueue();
+ }
+ } catch (IOException ioe) {
+ LOG.error("Fetching filestatuses failed", ioe);
+ try {
+ iteratorsQueue.put(ioe);
+ } catch (InterruptedException interruptedException) {
+ Thread.currentThread().interrupt();
+ LOG.error("Thread got interrupted: {}", interruptedException);
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOG.error("Thread got interrupted: {}", e);
+ } finally {
+ synchronized (this) {
+ isAsyncInProgress = false;
+ }
+ }
+ }
+
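+ // Fetches one page through ListingSupport, enqueues its iterator and marks
+ // the iteration complete once the continuation token is exhausted.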
+ private void addNextBatchIteratorToQueue()
+ throws IOException, InterruptedException {
+ List<FileStatus> fileStatuses = new ArrayList<>();
+ continuation = listingSupport
+ .listStatus(fileStatus.getPath(), null, fileStatuses, FETCH_ALL_FALSE,
+ continuation);
+ if (!fileStatuses.isEmpty()) {
+ iteratorsQueue.put(fileStatuses.iterator());
+ }
+ synchronized (this) {
+ if (continuation == null || continuation.isEmpty()) {
+ isIterationComplete = true;
+ }
+ }
+ }
+
+}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListingSupport.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListingSupport.java
new file mode 100644
index 00000000000..4c449409aaf
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListingSupport.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public interface ListingSupport {
+
+ /**
+ * @param path The list path.
+ * @return the entries in the path.
+ * @throws IOException in case of error
+ */
+ FileStatus[] listStatus(Path path) throws IOException;
+
+ /**
+ * @param path The list path.
+ * @param startFrom The entry name that list results should start with.
+ * For example, if folder "/folder" contains four
+ * files: "afile", "bfile", "hfile", "ifile". Then
+ * listStatus(Path("/folder"), "hfile") will return
+ * "/folder/hfile" and "folder/ifile" Notice that if
+ * startFrom is a non-existent entry name, then the
+ * list response contains all entries after this
+ * non-existent entry in lexical order: listStatus
+ * (Path("/folder"), "cfile") will return
+ * "/folder/hfile" and "/folder/ifile".
+ * @return the entries in the path start from "startFrom" in lexical order.
+ * @throws IOException in case of error
+ */
+ FileStatus[] listStatus(Path path, String startFrom) throws IOException;
+
+ /**
+ * @param path The list path
+ * @param startFrom The entry name that list results should start with.
+ * For example, if folder "/folder" contains four
+ * files: "afile", "bfile", "hfile", "ifile". Then
+ * listStatus(Path("/folder"), "hfile") will return
+ * "/folder/hfile" and "folder/ifile" Notice that if
+ * startFrom is a non-existent entry name, then the
+ * list response contains all entries after this
+ * non-existent entry in lexical order: listStatus
+ * (Path("/folder"), "cfile") will return
+ * "/folder/hfile" and "/folder/ifile".
+ * @param fileStatuses This list has to be filled with the FileStatus objects.
+ * @param fetchAll flag to indicate whether the above list needs to be
+ * filled with just one page of results or the entire
+ * result.
+ * @param continuation Continuation token. null means start from the beginning.
+ * @return Continuation token.
+ * @throws IOException in case of error
+ */
+ String listStatus(Path path, String startFrom, List<FileStatus> fileStatuses,
+ boolean fetchAll, String continuation) throws IOException;
+}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsListStatusRemoteIterator.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsListStatusRemoteIterator.java
new file mode 100644
index 00000000000..6d5e4cf3bce
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsListStatusRemoteIterator.java
@@ -0,0 +1,340 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.azurebfs.services.AbfsListStatusRemoteIterator;
+import org.apache.hadoop.fs.azurebfs.services.ListingSupport;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.ArgumentMatchers.nullable;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Test ListStatusRemoteIterator operation.
+ */
+public class ITestAbfsListStatusRemoteIterator extends AbstractAbfsIntegrationTest {
+
+ private static final int TEST_FILES_NUMBER = 1000;
+
+ public ITestAbfsListStatusRemoteIterator() throws Exception {
+ }
+
+ @Test
+ public void testAbfsIteratorWithHasNext() throws Exception {
+ Path testDir = createTestDirectory();
+ setPageSize(10);
+ final List<String> fileNames = createFilesUnderDirectory(TEST_FILES_NUMBER,
+ testDir, "testListPath");
+
+ ListingSupport listngSupport = Mockito.spy(getFileSystem().getAbfsStore());
+ RemoteIterator<FileStatus> fsItr = new AbfsListStatusRemoteIterator(
+ getFileSystem().getFileStatus(testDir), listngSupport);
+ Assertions.assertThat(fsItr)
+ .describedAs("RemoteIterator should be instance of "
+ + "AbfsListStatusRemoteIterator by default")
+ .isInstanceOf(AbfsListStatusRemoteIterator.class);
+ int itrCount = 0;
+ while (fsItr.hasNext()) {
+ FileStatus fileStatus = fsItr.next();
+ String pathStr = fileStatus.getPath().toString();
+ fileNames.remove(pathStr);
+ itrCount++;
+ }
+ Assertions.assertThat(itrCount)
+ .describedAs("Number of iterations should be equal to the files "
+ + "created")
+ .isEqualTo(TEST_FILES_NUMBER);
+ Assertions.assertThat(fileNames.size())
+ .describedAs("After removing every iterm found from the iterator, "
+ + "there should be no more elements in the fileNames")
+ .isEqualTo(0);
+ int minNumberOfInvokations = TEST_FILES_NUMBER / 10;
+ verify(listngSupport, Mockito.atLeast(minNumberOfInvokations))
+ .listStatus(any(Path.class), nullable(String.class),
+ anyList(), anyBoolean(),
+ nullable(String.class));
+ }
+
+ @Test
+ public void testAbfsIteratorWithoutHasNext() throws Exception {
+ Path testDir = createTestDirectory();
+ setPageSize(10);
+ final List<String> fileNames = createFilesUnderDirectory(TEST_FILES_NUMBER,
+ testDir, "testListPath");
+
+ ListingSupport listngSupport = Mockito.spy(getFileSystem().getAbfsStore());
+ RemoteIterator<FileStatus> fsItr = new AbfsListStatusRemoteIterator(
+ getFileSystem().getFileStatus(testDir), listngSupport);
+ Assertions.assertThat(fsItr)
+ .describedAs("RemoteIterator should be instance of "
+ + "AbfsListStatusRemoteIterator by default")
+ .isInstanceOf(AbfsListStatusRemoteIterator.class);
+ int itrCount = 0;
+ for (int i = 0; i < TEST_FILES_NUMBER; i++) {
+ FileStatus fileStatus = fsItr.next();
+ String pathStr = fileStatus.getPath().toString();
+ fileNames.remove(pathStr);
+ itrCount++;
+ }
+ Assertions.assertThatThrownBy(() -> fsItr.next())
+ .describedAs(
+ "next() should throw NoSuchElementException since next has been "
+ + "called " + TEST_FILES_NUMBER + " times")
+ .isInstanceOf(NoSuchElementException.class);
+ Assertions.assertThat(itrCount)
+ .describedAs("Number of iterations should be equal to the files "
+ + "created")
+ .isEqualTo(TEST_FILES_NUMBER);
+ Assertions.assertThat(fileNames.size())
+ .describedAs("After removing every iterm found from the iterator, "
+ + "there should be no more elements in the fileNames")
+ .isEqualTo(0);
+ int minNumberOfInvokations = TEST_FILES_NUMBER / 10;
+ verify(listngSupport, Mockito.atLeast(minNumberOfInvokations))
+ .listStatus(any(Path.class), nullable(String.class),
+ anyList(), anyBoolean(),
+ nullable(String.class));
+ }
+
+ @Test
+ public void testWithAbfsIteratorDisabled() throws Exception {
+ Path testDir = createTestDirectory();
+ setPageSize(10);
+ setEnableAbfsIterator(false);
+ final List<String> fileNames = createFilesUnderDirectory(TEST_FILES_NUMBER,
+ testDir, "testListPath");
+
+ RemoteIterator<FileStatus> fsItr =
+ getFileSystem().listStatusIterator(testDir);
+ Assertions.assertThat(fsItr)
+ .describedAs("RemoteIterator should not be instance of "
+ + "AbfsListStatusRemoteIterator when it is disabled")
+ .isNotInstanceOf(AbfsListStatusRemoteIterator.class);
+ int itrCount = 0;
+ while (fsItr.hasNext()) {
+ FileStatus fileStatus = fsItr.next();
+ String pathStr = fileStatus.getPath().toString();
+ fileNames.remove(pathStr);
+ itrCount++;
+ }
+ Assertions.assertThat(itrCount)
+ .describedAs("Number of iterations should be equal to the files "
+ + "created")
+ .isEqualTo(TEST_FILES_NUMBER);
+ Assertions.assertThat(fileNames.size())
+ .describedAs("After removing every iterm found from the iterator, "
+ + "there should be no more elements in the fileNames")
+ .isEqualTo(0);
+ }
+
+ @Test
+ public void testWithAbfsIteratorDisabledWithoutHasNext() throws Exception {
+ Path testDir = createTestDirectory();
+ setPageSize(10);
+ setEnableAbfsIterator(false);
+ final List<String> fileNames = createFilesUnderDirectory(TEST_FILES_NUMBER,
+ testDir, "testListPath");
+
+ RemoteIterator<FileStatus> fsItr =
+ getFileSystem().listStatusIterator(testDir);
+ Assertions.assertThat(fsItr)
+ .describedAs("RemoteIterator should not be instance of "
+ + "AbfsListStatusRemoteIterator when it is disabled")
+ .isNotInstanceOf(AbfsListStatusRemoteIterator.class);
+ int itrCount = 0;
+ for (int i = 0; i < TEST_FILES_NUMBER; i++) {
+ FileStatus fileStatus = fsItr.next();
+ String pathStr = fileStatus.getPath().toString();
+ fileNames.remove(pathStr);
+ itrCount++;
+ }
+ Assertions.assertThatThrownBy(() -> fsItr.next())
+ .describedAs(
+ "next() should throw NoSuchElementException since next has been "
+ + "called " + TEST_FILES_NUMBER + " times")
+ .isInstanceOf(NoSuchElementException.class);
+ Assertions.assertThat(itrCount)
+ .describedAs("Number of iterations should be equal to the files "
+ + "created")
+ .isEqualTo(TEST_FILES_NUMBER);
+ Assertions.assertThat(fileNames.size())
+ .describedAs("After removing every iterm found from the iterator, "
+ + "there should be no more elements in the fileNames")
+ .isEqualTo(0);
+ }
+
+ @Test
+ public void testNextWhenNoMoreElementsPresent() throws Exception {
+ Path testDir = createTestDirectory();
+ setPageSize(10);
+ RemoteIterator<FileStatus> fsItr =
+ new AbfsListStatusRemoteIterator(getFileSystem().getFileStatus(testDir),
+ getFileSystem().getAbfsStore());
+ fsItr = Mockito.spy(fsItr);
+ Mockito.doReturn(false).when(fsItr).hasNext();
+
+ RemoteIterator<FileStatus> finalFsItr = fsItr;
+ Assertions.assertThatThrownBy(() -> finalFsItr.next())
+ .describedAs(
+ "next() should throw NoSuchElementException if hasNext() return "
+ + "false")
+ .isInstanceOf(NoSuchElementException.class);
+ }
+
+ @Test
+ public void testHasNextForEmptyDir() throws Exception {
+ Path testDir = createTestDirectory();
+ setPageSize(10);
+ RemoteIterator<FileStatus> fsItr = getFileSystem()
+ .listStatusIterator(testDir);
+ Assertions.assertThat(fsItr.hasNext())
+ .describedAs("hasNext returns false for empty directory")
+ .isFalse();
+ }
+
+ @Test
+ public void testHasNextForFile() throws Exception {
+ final AzureBlobFileSystem fs = getFileSystem();
+ String testFileName = "testFile";
+ Path testFile = new Path(testFileName);
+ getFileSystem().create(testFile);
+ setPageSize(10);
+ RemoteIterator<FileStatus> fsItr = fs.listStatusIterator(testFile);
+ Assertions.assertThat(fsItr.hasNext())
+ .describedAs("hasNext returns true for file").isTrue();
+ Assertions.assertThat(fsItr.next().getPath().toString())
+ .describedAs("next returns the file itself")
+ .endsWith(testFileName);
+ }
+
+ @Test
+ public void testIOException() throws Exception {
+ Path testDir = createTestDirectory();
+ setPageSize(10);
+ getFileSystem().mkdirs(testDir);
+
+ String exceptionMessage = "test exception";
+ ListingSupport lsSupport = getMockListingSupport(exceptionMessage);
+ RemoteIterator<FileStatus> fsItr =
+ new AbfsListStatusRemoteIterator(getFileSystem().getFileStatus(testDir),
+ lsSupport);
+
+ Assertions.assertThatThrownBy(() -> fsItr.next())
+ .describedAs(
+ "When ioException is not null and queue is empty exception should be "
+ + "thrown")
+ .isInstanceOf(IOException.class)
+ .hasMessage(exceptionMessage);
+ }
+
+ @Test
+ public void testNonExistingPath() throws Throwable {
+ Path nonExistingDir = new Path("nonExistingPath");
+ Assertions.assertThatThrownBy(
+ () -> getFileSystem().listStatusIterator(nonExistingDir)).describedAs(
+ "test the listStatusIterator call on a path which is not "
+ + "present should result in FileNotFoundException")
+ .isInstanceOf(FileNotFoundException.class);
+ }
+
+ private ListingSupport getMockListingSupport(String exceptionMessage) {
+ return new ListingSupport() {
+ @Override
+ public FileStatus[] listStatus(Path path) throws IOException {
+ return null;
+ }
+
+ @Override
+ public FileStatus[] listStatus(Path path, String startFrom)
+ throws IOException {
+ return null;
+ }
+
+ @Override
+ public String listStatus(Path path, String startFrom,
+ List<FileStatus> fileStatuses, boolean fetchAll, String continuation)
+ throws IOException {
+ throw new IOException(exceptionMessage);
+ }
+ };
+ }
+
+ private Path createTestDirectory() throws IOException {
+ String testDirectoryName = "testDirectory" + System.currentTimeMillis();
+ Path testDirectory = new Path(testDirectoryName);
+ getFileSystem().mkdirs(testDirectory);
+ return testDirectory;
+ }
+
+ private void setEnableAbfsIterator(boolean shouldEnable) throws IOException {
+ AzureBlobFileSystemStore abfsStore = getAbfsStore(getFileSystem());
+ abfsStore.getAbfsConfiguration().setEnableAbfsListIterator(shouldEnable);
+ }
+
+ private void setPageSize(int pageSize) throws IOException {
+ AzureBlobFileSystemStore abfsStore = getAbfsStore(getFileSystem());
+ abfsStore.getAbfsConfiguration().setListMaxResults(pageSize);
+ }
+
+ private List<String> createFilesUnderDirectory(int numFiles, Path rootPath,
+ String filenamePrefix)
+ throws ExecutionException, InterruptedException, IOException {
+ final List<Future<Void>> tasks = new ArrayList<>();
+ final List<String> fileNames = new ArrayList<>();
+ ExecutorService es = Executors.newFixedThreadPool(10);
+ try {
+ for (int i = 0; i < numFiles; i++) {
+ final Path filePath = new Path(rootPath, filenamePrefix + i);
+ Callable<Void> callable = () -> {
+ getFileSystem().create(filePath);
+ fileNames.add(makeQualified(filePath).toString());
+ return null;
+ };
+ tasks.add(es.submit(callable));
+ }
+ for (Future<Void> task : tasks) {
+ task.get();
+ }
+ } finally {
+ es.shutdownNow();
+ }
+ return fileNames;
+ }
+
+}