diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java index 960579dfaa3..95936795227 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java @@ -19,13 +19,16 @@ package org.apache.hadoop.fs.azurebfs.services; import java.io.EOFException; +import java.io.FileNotFoundException; import java.io.IOException; +import java.net.HttpURLConnection; import com.google.common.base.Preconditions; import org.apache.hadoop.fs.FSExceptionMessages; import org.apache.hadoop.fs.FSInputStream; import org.apache.hadoop.fs.FileSystem.Statistics; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; /** @@ -225,6 +228,12 @@ public class AbfsInputStream extends FSInputStream { try { op = client.read(path, position, b, offset, length, tolerateOobAppends ? "*" : eTag); } catch (AzureBlobFileSystemException ex) { + if (ex instanceof AbfsRestOperationException) { + AbfsRestOperationException ere = (AbfsRestOperationException) ex; + if (ere.getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) { + throw new FileNotFoundException(ere.getMessage()); + } + } throw new IOException(ex); } long bytesRead = op.getResult().getBytesReceived(); diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java index 7e43090a957..85db7740c55 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java @@ -18,9 +18,11 @@ package org.apache.hadoop.fs.azurebfs.services; +import java.io.FileNotFoundException; import java.io.IOException; import java.io.InterruptedIOException; import java.io.OutputStream; +import java.net.HttpURLConnection; import java.util.Locale; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.LinkedBlockingQueue; @@ -33,10 +35,11 @@ import java.util.concurrent.TimeUnit; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; import org.apache.hadoop.fs.FSExceptionMessages; import org.apache.hadoop.fs.StreamCapabilities; import org.apache.hadoop.fs.Syncable; -import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; /** * The BlobFsOutputStream for Rest AbfsClient. @@ -290,6 +293,12 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa try { writeOperation.task.get(); } catch (Exception ex) { + if (ex.getCause() instanceof AbfsRestOperationException) { + if (((AbfsRestOperationException) ex.getCause()).getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) { + throw new FileNotFoundException(ex.getMessage()); + } + } + if (ex.getCause() instanceof AzureBlobFileSystemException) { ex = (AzureBlobFileSystemException) ex.getCause(); } @@ -313,6 +322,11 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa try { client.flush(path, offset, retainUncommitedData); } catch (AzureBlobFileSystemException ex) { + if (ex instanceof AbfsRestOperationException) { + if (((AbfsRestOperationException) ex).getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) { + throw new FileNotFoundException(ex.getMessage()); + } + } throw new IOException(ex); } this.lastFlushOffset = offset; diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemE2E.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemE2E.java index 2e994911acc..9e22790cd08 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemE2E.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemE2E.java @@ -18,6 +18,7 @@ package org.apache.hadoop.fs.azurebfs; +import java.io.FileNotFoundException; import java.io.IOException; import java.util.Arrays; import java.util.Random; @@ -30,9 +31,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertArrayEquals; +import static org.apache.hadoop.test.LambdaTestUtils.intercept; /** * Test end to end between ABFS client and ABFS server. @@ -136,6 +135,52 @@ public class ITestAzureBlobFileSystemE2E extends AbstractAbfsIntegrationTest { inputStream.close(); } + @Test + public void testReadWithFileNotFoundException() throws Exception { + final AzureBlobFileSystem fs = getFileSystem(); + final Path testFilePath = new Path(methodName.getMethodName()); + testWriteOneByteToFile(testFilePath); + + FSDataInputStream inputStream = fs.open(testFilePath, TEST_DEFAULT_BUFFER_SIZE); + fs.delete(testFilePath, true); + assertFalse(fs.exists(testFilePath)); + + intercept(FileNotFoundException.class, + () -> inputStream.read(new byte[1])); + } + + @Test + public void testWriteWithFileNotFoundException() throws Exception { + final AzureBlobFileSystem fs = getFileSystem(); + final Path testFilePath = new Path(methodName.getMethodName()); + + FSDataOutputStream stream = fs.create(testFilePath); + assertTrue(fs.exists(testFilePath)); + stream.write(TEST_BYTE); + + fs.delete(testFilePath, true); + assertFalse(fs.exists(testFilePath)); + + // trigger append call + intercept(FileNotFoundException.class, + () -> stream.close()); + } + + @Test + public void testFlushWithFileNotFoundException() throws Exception { + final AzureBlobFileSystem fs = getFileSystem(); + final Path testFilePath = new Path(methodName.getMethodName()); + + FSDataOutputStream stream = fs.create(testFilePath); + assertTrue(fs.exists(testFilePath)); + + fs.delete(testFilePath, true); + assertFalse(fs.exists(testFilePath)); + + intercept(FileNotFoundException.class, + () -> stream.close()); + } + private void testWriteOneByteToFile(Path testFilePath) throws Exception { final AzureBlobFileSystem fs = getFileSystem(); try(FSDataOutputStream stream = fs.create(testFilePath)) {