diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index b2bd64f3c6c..222285fa23f 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -374,7 +374,8 @@ public AbfsInputStream openFileForRead(final Path path, final FileSystem.Statist // Add statistics for InputStream return new AbfsInputStream(client, statistics, AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), contentLength, - abfsConfiguration.getReadBufferSize(), abfsConfiguration.getReadAheadQueueDepth(), eTag); + abfsConfiguration.getReadBufferSize(), abfsConfiguration.getReadAheadQueueDepth(), + abfsConfiguration.getTolerateOobAppends(), eTag); } public OutputStream openFileForWrite(final Path path, final boolean overwrite) throws diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java index 95936795227..fe48cb93237 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java @@ -61,6 +61,7 @@ public AbfsInputStream( final long contentLength, final int bufferSize, final int readAheadQueueDepth, + final boolean tolerateOobAppends, final String eTag) { this.client = client; this.statistics = statistics; @@ -68,8 +69,8 @@ public AbfsInputStream( this.contentLength = contentLength; this.bufferSize = bufferSize; this.readAheadQueueDepth = (readAheadQueueDepth >= 0) ? 
readAheadQueueDepth : Runtime.getRuntime().availableProcessors(); + this.tolerateOobAppends = tolerateOobAppends; this.eTag = eTag; - this.tolerateOobAppends = false; this.readAheadEnabled = true; } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemE2E.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemE2E.java index 9e22790cd08..ebc9c07e53e 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemE2E.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemE2E.java @@ -25,12 +25,14 @@ import org.junit.Test; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_TOLERATE_CONCURRENT_APPEND; import static org.apache.hadoop.test.LambdaTestUtils.intercept; /** @@ -66,7 +68,9 @@ public void testReadWriteBytesToFile() throws Exception { } @Test (expected = IOException.class) - public void testOOBWrites() throws Exception { + public void testOOBWritesAndReadFail() throws Exception { + Configuration conf = this.getRawConfiguration(); + conf.setBoolean(AZURE_TOLERATE_CONCURRENT_APPEND, false); - final AzureBlobFileSystem fs = getFileSystem(); + final AzureBlobFileSystem fs = getFileSystem(conf); int readBufferSize = fs.getAbfsStore().getAbfsConfiguration().getReadBufferSize(); @@ -83,7 +87,6 @@ public void testOOBWrites() throws Exception { try (FSDataInputStream readStream = fs.open(testFilePath)) { assertEquals(readBufferSize, readStream.read(bytesToRead, 0, readBufferSize)); - try (FSDataOutputStream writeStream = fs.create(testFilePath)) { writeStream.write(b); writeStream.flush(); @@ -94,6 +97,36 @@ public void 
testOOBWrites() throws Exception { } } + @Test + public void testOOBWritesAndReadSucceed() throws Exception { + Configuration conf = this.getRawConfiguration(); + conf.setBoolean(AZURE_TOLERATE_CONCURRENT_APPEND, true); + final AzureBlobFileSystem fs = getFileSystem(conf); + int readBufferSize = fs.getAbfsStore().getAbfsConfiguration().getReadBufferSize(); + + byte[] bytesToRead = new byte[readBufferSize]; + final byte[] b = new byte[2 * readBufferSize]; + new Random().nextBytes(b); + final Path testFilePath = new Path(methodName.getMethodName()); + + try (FSDataOutputStream writeStream = fs.create(testFilePath)) { + writeStream.write(b); + writeStream.flush(); + } + + try (FSDataInputStream readStream = fs.open(testFilePath)) { + // Read + assertEquals(readBufferSize, readStream.read(bytesToRead, 0, readBufferSize)); + // Concurrent write + try (FSDataOutputStream writeStream = fs.create(testFilePath)) { + writeStream.write(b); + writeStream.flush(); + } + + assertEquals(readBufferSize, readStream.read(bytesToRead, 0, readBufferSize)); + } + } + @Test public void testWriteWithBufferOffset() throws Exception { final AzureBlobFileSystem fs = getFileSystem();