diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java index 750306c4a98..86bd904f1e8 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java @@ -1185,7 +1185,7 @@ public class AzureBlobFileSystem extends FileSystem TracingContext tracingContext = new TracingContext(clientCorrelationId, fileSystemId, FSOperationType.LISTSTATUS, true, tracingHeaderFormat, listener); AbfsListStatusRemoteIterator abfsLsItr = - new AbfsListStatusRemoteIterator(getFileStatus(path, tracingContext), abfsStore, + new AbfsListStatusRemoteIterator(path, abfsStore, tracingContext); return RemoteIterators.typeCastingRemoteIterator(abfsLsItr); } else { @@ -1360,9 +1360,9 @@ public class AzureBlobFileSystem extends FileSystem * @throws IOException if the exception error code is not on the allowed list. */ @VisibleForTesting - static void checkException(final Path path, - final AzureBlobFileSystemException exception, - final AzureServiceErrorCode... allowedErrorCodesList) throws IOException { + public static void checkException(final Path path, + final AzureBlobFileSystemException exception, + final AzureServiceErrorCode... allowedErrorCodesList) throws IOException { if (exception instanceof AbfsRestOperationException) { AbfsRestOperationException ere = (AbfsRestOperationException) exception; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsListStatusRemoteIterator.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsListStatusRemoteIterator.java index ce6207bf5f2..3fecb1f0591 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsListStatusRemoteIterator.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsListStatusRemoteIterator.java @@ -32,7 +32,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; import org.apache.hadoop.fs.azurebfs.utils.TracingContext; public class AbfsListStatusRemoteIterator @@ -45,7 +48,7 @@ public class AbfsListStatusRemoteIterator private static final int MAX_QUEUE_SIZE = 10; private static final long POLL_WAIT_TIME_IN_MS = 250; - private final FileStatus fileStatus; + private final Path path; private final ListingSupport listingSupport; private final ArrayBlockingQueue listResultQueue; private final TracingContext tracingContext; @@ -55,13 +58,15 @@ public class AbfsListStatusRemoteIterator private String continuation; private Iterator currIterator; - public AbfsListStatusRemoteIterator(final FileStatus fileStatus, - final ListingSupport listingSupport, TracingContext tracingContext) { - this.fileStatus = fileStatus; + public AbfsListStatusRemoteIterator(final Path path, + final ListingSupport listingSupport, TracingContext tracingContext) + throws IOException { + this.path = path; this.listingSupport = listingSupport; this.tracingContext = tracingContext; listResultQueue = new ArrayBlockingQueue<>(MAX_QUEUE_SIZE); currIterator = Collections.emptyIterator(); + addNextBatchIteratorToQueue(); fetchBatchesAsync(); } @@ -130,9 +135,6 @@ public class AbfsListStatusRemoteIterator Thread.currentThread().interrupt(); LOG.error("Thread got interrupted: {}", interruptedException); } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - LOG.error("Thread got interrupted: {}", e); } finally { synchronized (this) { isAsyncInProgress = false; @@ -141,13 +143,21 @@ public class AbfsListStatusRemoteIterator } private synchronized void addNextBatchIteratorToQueue() - throws IOException, InterruptedException { + throws IOException { List fileStatuses = new ArrayList<>(); - continuation = listingSupport - .listStatus(fileStatus.getPath(), null, fileStatuses, FETCH_ALL_FALSE, - continuation, tracingContext); - if (!fileStatuses.isEmpty()) { - listResultQueue.put(new AbfsListResult(fileStatuses.iterator())); + try { + try { + continuation = listingSupport.listStatus(path, null, fileStatuses, + FETCH_ALL_FALSE, continuation, tracingContext); + } catch (AbfsRestOperationException ex) { + AzureBlobFileSystem.checkException(path, ex); + } + if (!fileStatuses.isEmpty()) { + listResultQueue.put(new AbfsListResult(fileStatuses.iterator())); + } + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + LOG.error("Thread interrupted", ie); } if (continuation == null || continuation.isEmpty()) { isIterationComplete = true; diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsListStatusRemoteIterator.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsListStatusRemoteIterator.java index 3f50aec6591..ea1d0e26fac 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsListStatusRemoteIterator.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsListStatusRemoteIterator.java @@ -68,10 +68,9 @@ public class ITestAbfsListStatusRemoteIterator extends AbstractAbfsIntegrationTe setPageSize(10); final List fileNames = createFilesUnderDirectory(testDir); - ListingSupport listngSupport = Mockito.spy(getFileSystem().getAbfsStore()); - RemoteIterator fsItr = new AbfsListStatusRemoteIterator( - getFileSystem().getFileStatus(testDir), listngSupport, - getTestTracingContext(getFileSystem(), true)); + ListingSupport listingSupport = Mockito.spy(getFileSystem().getAbfsStore()); + RemoteIterator fsItr = new AbfsListStatusRemoteIterator(testDir, + listingSupport, getTestTracingContext(getFileSystem(), true)); Assertions.assertThat(fsItr) .describedAs("RemoteIterator should be instance of " + "AbfsListStatusRemoteIterator by default") @@ -84,7 +83,7 @@ public class ITestAbfsListStatusRemoteIterator extends AbstractAbfsIntegrationTe } verifyIteratorResultCount(itrCount, fileNames); int minNumberOfInvocations = TEST_FILES_NUMBER / 10; - verify(listngSupport, Mockito.atLeast(minNumberOfInvocations)) + verify(listingSupport, Mockito.atLeast(minNumberOfInvocations)) .listStatus(any(Path.class), nullable(String.class), anyList(), anyBoolean(), nullable(String.class), @@ -97,10 +96,9 @@ public class ITestAbfsListStatusRemoteIterator extends AbstractAbfsIntegrationTe setPageSize(10); final List fileNames = createFilesUnderDirectory(testDir); - ListingSupport listngSupport = Mockito.spy(getFileSystem().getAbfsStore()); - RemoteIterator fsItr = new AbfsListStatusRemoteIterator( - getFileSystem().getFileStatus(testDir), listngSupport, - getTestTracingContext(getFileSystem(), true)); + ListingSupport listingSupport = Mockito.spy(getFileSystem().getAbfsStore()); + RemoteIterator fsItr = new AbfsListStatusRemoteIterator(testDir, + listingSupport, getTestTracingContext(getFileSystem(), true)); Assertions.assertThat(fsItr) .describedAs("RemoteIterator should be instance of " + "AbfsListStatusRemoteIterator by default") @@ -114,7 +112,7 @@ public class ITestAbfsListStatusRemoteIterator extends AbstractAbfsIntegrationTe LambdaTestUtils.intercept(NoSuchElementException.class, fsItr::next); verifyIteratorResultCount(itrCount, fileNames); int minNumberOfInvocations = TEST_FILES_NUMBER / 10; - verify(listngSupport, Mockito.atLeast(minNumberOfInvocations)) + verify(listingSupport, Mockito.atLeast(minNumberOfInvocations)) .listStatus(any(Path.class), nullable(String.class), anyList(), anyBoolean(), nullable(String.class), @@ -169,10 +167,9 @@ public class ITestAbfsListStatusRemoteIterator extends AbstractAbfsIntegrationTe public void testNextWhenNoMoreElementsPresent() throws Exception { Path testDir = createTestDirectory(); setPageSize(10); - RemoteIterator fsItr = - new AbfsListStatusRemoteIterator(getFileSystem().getFileStatus(testDir), - getFileSystem().getAbfsStore(), - getTestTracingContext(getFileSystem(), true)); + RemoteIterator fsItr = new AbfsListStatusRemoteIterator(testDir, + getFileSystem().getAbfsStore(), + getTestTracingContext(getFileSystem(), true)); fsItr = Mockito.spy(fsItr); Mockito.doReturn(false).when(fsItr).hasNext(); @@ -212,12 +209,11 @@ public class ITestAbfsListStatusRemoteIterator extends AbstractAbfsIntegrationTe getFileSystem().mkdirs(testDir); String exceptionMessage = "test exception"; - ListingSupport lsSupport =getMockListingSupport(exceptionMessage); - RemoteIterator fsItr = - new AbfsListStatusRemoteIterator(getFileSystem().getFileStatus(testDir), - lsSupport, getTestTracingContext(getFileSystem(), true)); + ListingSupport lsSupport = getMockListingSupport(exceptionMessage); - LambdaTestUtils.intercept(IOException.class, fsItr::next); + LambdaTestUtils.intercept(IOException.class, + () -> new AbfsListStatusRemoteIterator(testDir, lsSupport, + getTestTracingContext(getFileSystem(), true))); } @Test diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsRestOperationException.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsRestOperationException.java index a71e7bc815f..ce9415a8179 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsRestOperationException.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsRestOperationException.java @@ -74,8 +74,9 @@ public class ITestAbfsRestOperationException extends AbstractAbfsIntegrationTest // verify its format String errorMessage = ex.getLocalizedMessage(); String[] errorFields = errorMessage.split(","); - - Assert.assertEquals(6, errorFields.length); + Assertions.assertThat(errorFields) + .describedAs("fields in exception of %s", ex) + .hasSize(6); // Check status message, status code, HTTP Request Type and URL. Assert.assertEquals("Operation failed: \"The specified path does not exist.\"", errorFields[0].trim()); Assert.assertEquals("404", errorFields[1].trim());