diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
index 9e6bdc24332..9f71f27c86a 100644
--- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
+++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
@@ -2143,9 +2143,8 @@ The switch to turn S3A auditing on or off.
fs.azure.enable.readahead
- false
- Disable readahead/prefetching in AbfsInputStream.
- See HADOOP-18521
+ true
+ Enabled readahead/prefetching in AbfsInputStream.
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
index 0ea2c929800..9994d9f5207 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
@@ -109,7 +109,7 @@ public final class FileSystemConfigurations {
public static final boolean DEFAULT_ABFS_LATENCY_TRACK = false;
public static final long DEFAULT_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS_IN_SECONDS = 120;
- public static final boolean DEFAULT_ENABLE_READAHEAD = false;
+ public static final boolean DEFAULT_ENABLE_READAHEAD = true;
public static final String DEFAULT_FS_AZURE_USER_AGENT_PREFIX = EMPTY_STRING;
public static final String DEFAULT_VALUE_UNKNOWN = "UNKNOWN";
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java
index b479c22fcee..05afc7b9858 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java
@@ -35,7 +35,7 @@ public class AbfsInputStreamContext extends AbfsStreamContext {
private boolean tolerateOobAppends;
- private boolean isReadAheadEnabled = false;
+ private boolean isReadAheadEnabled = true;
private boolean alwaysReadBufferSize;
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java
index 4eefb9fdf2c..62d050d0fc3 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java
@@ -544,7 +544,6 @@ final class ReadBufferManager {
LOGGER.debug("Purging stale buffers for AbfsInputStream {} ", stream);
readAheadQueue.removeIf(readBuffer -> readBuffer.getStream() == stream);
purgeList(stream, completedReadList);
- purgeList(stream, inProgressList);
}
/**
@@ -642,4 +641,9 @@ final class ReadBufferManager {
freeList.clear();
completedReadList.add(buf);
}
+
+ @VisibleForTesting
+ int getNumBuffers() {
+ return NUM_BUFFERS;
+ }
}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractSeek.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractSeek.java
index aaf47f7a9c8..f7fe5039799 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractSeek.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractSeek.java
@@ -34,7 +34,6 @@ import org.apache.hadoop.fs.contract.AbstractFSContract;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_READ_AHEAD_RANGE;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_READ_BUFFER_SIZE;
-import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_READAHEAD;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MIN_BUFFER_SIZE;
import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
@@ -69,7 +68,6 @@ public class ITestAbfsFileSystemContractSeek extends AbstractContractSeekTest{
protected AbstractFSContract createContract(final Configuration conf) {
conf.setInt(AZURE_READ_AHEAD_RANGE, MIN_BUFFER_SIZE);
conf.setInt(AZURE_READ_BUFFER_SIZE, MIN_BUFFER_SIZE);
- conf.setBoolean(FS_AZURE_ENABLE_READAHEAD, true);
return new AbfsFileSystemContract(conf, isSecure);
}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java
index 705cc2530d3..eca670fba90 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java
@@ -25,6 +25,7 @@ import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
@@ -74,17 +75,14 @@ public class ITestReadBufferManager extends AbstractAbfsIntegrationTest {
}
} finally {
executorService.shutdown();
+ // wait for all tasks to finish
+ executorService.awaitTermination(1, TimeUnit.MINUTES);
}
ReadBufferManager bufferManager = ReadBufferManager.getBufferManager();
- assertListEmpty("CompletedList", bufferManager.getCompletedReadListCopy());
+ // verify there is no work in progress or the readahead queue.
assertListEmpty("InProgressList", bufferManager.getInProgressCopiedList());
assertListEmpty("ReadAheadQueue", bufferManager.getReadAheadQueueCopy());
- Assertions.assertThat(bufferManager.getFreeListCopy())
- .describedAs("After closing all streams free list contents should match with " + freeList)
- .hasSize(numBuffers)
- .containsExactlyInAnyOrderElementsOf(freeList);
-
}
private void assertListEmpty(String listName, List list) {
@@ -116,22 +114,18 @@ public class ITestReadBufferManager extends AbstractAbfsIntegrationTest {
try {
iStream2 = (AbfsInputStream) fs.open(testFilePath).getWrappedStream();
iStream2.read();
- // After closing stream1, none of the buffers associated with stream1 should be present.
- assertListDoesnotContainBuffersForIstream(bufferManager.getInProgressCopiedList(), iStream1);
- assertListDoesnotContainBuffersForIstream(bufferManager.getCompletedReadListCopy(), iStream1);
+ // After closing stream1, no queued buffers of stream1 should be present
+ // assertions can't be made about the state of the other lists as it is
+ // too prone to race conditions.
assertListDoesnotContainBuffersForIstream(bufferManager.getReadAheadQueueCopy(), iStream1);
} finally {
// closing the stream later.
IOUtils.closeStream(iStream2);
}
- // After closing stream2, none of the buffers associated with stream2 should be present.
- assertListDoesnotContainBuffersForIstream(bufferManager.getInProgressCopiedList(), iStream2);
- assertListDoesnotContainBuffersForIstream(bufferManager.getCompletedReadListCopy(), iStream2);
+ // After closing stream2, no queued buffers of stream2 should be present.
assertListDoesnotContainBuffersForIstream(bufferManager.getReadAheadQueueCopy(), iStream2);
- // After closing both the streams, all lists should be empty.
- assertListEmpty("CompletedList", bufferManager.getCompletedReadListCopy());
- assertListEmpty("InProgressList", bufferManager.getInProgressCopiedList());
+ // After closing both the streams, read queue should be empty.
assertListEmpty("ReadAheadQueue", bufferManager.getReadAheadQueueCopy());
}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java
index 69795ee5bd8..0395c4183b9 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java
@@ -82,6 +82,12 @@ public class TestAbfsInputStream extends
REDUCED_READ_BUFFER_AGE_THRESHOLD * 10; // 30 sec
private static final int ALWAYS_READ_BUFFER_SIZE_TEST_FILE_SIZE = 16 * ONE_MB;
+ @Override
+ public void teardown() throws Exception {
+ super.teardown();
+ ReadBufferManager.getBufferManager().testResetReadBufferManager();
+ }
+
private AbfsRestOperation getMockRestOp() {
AbfsRestOperation op = mock(AbfsRestOperation.class);
AbfsHttpOperation httpOp = mock(AbfsHttpOperation.class);
@@ -106,7 +112,6 @@ public class TestAbfsInputStream extends
private AbfsInputStream getAbfsInputStream(AbfsClient mockAbfsClient,
String fileName) throws IOException {
AbfsInputStreamContext inputStreamContext = new AbfsInputStreamContext(-1);
- inputStreamContext.isReadAheadEnabled(true);
// Create AbfsInputStream with the client instance
AbfsInputStream inputStream = new AbfsInputStream(
mockAbfsClient,
@@ -132,7 +137,6 @@ public class TestAbfsInputStream extends
boolean alwaysReadBufferSize,
int readAheadBlockSize) throws IOException {
AbfsInputStreamContext inputStreamContext = new AbfsInputStreamContext(-1);
- inputStreamContext.isReadAheadEnabled(true);
// Create AbfsInputStream with the client instance
AbfsInputStream inputStream = new AbfsInputStream(
abfsClient,
@@ -495,6 +499,69 @@ public class TestAbfsInputStream extends
checkEvictedStatus(inputStream, 0, true);
}
+ /**
+ * This test expects InProgressList is not purged by the inputStream close.
+ */
+ @Test
+ public void testStreamPurgeDuringReadAheadCallExecuting() throws Exception {
+ AbfsClient client = getMockAbfsClient();
+ AbfsRestOperation successOp = getMockRestOp();
+ final Long serverCommunicationMockLatency = 3_000L;
+ final Long readBufferTransferToInProgressProbableTime = 1_000L;
+ final Integer readBufferQueuedCount = 3;
+
+ Mockito.doAnswer(invocationOnMock -> {
+ //sleeping thread to mock the network latency from client to backend.
+ Thread.sleep(serverCommunicationMockLatency);
+ return successOp;
+ })
+ .when(client)
+ .read(any(String.class), any(Long.class), any(byte[].class),
+ any(Integer.class), any(Integer.class), any(String.class),
+ any(String.class), any(TracingContext.class));
+
+ final ReadBufferManager readBufferManager
+ = ReadBufferManager.getBufferManager();
+
+ final int readBufferTotal = readBufferManager.getNumBuffers();
+ final int expectedFreeListBufferCount = readBufferTotal
+ - readBufferQueuedCount;
+
+ try (AbfsInputStream inputStream = getAbfsInputStream(client,
+ "testSuccessfulReadAhead.txt")) {
+ // As this is try-with-resources block, the close() method of the created
+ // abfsInputStream object shall be called on the end of the block.
+ queueReadAheads(inputStream);
+
+ //Sleeping to give ReadBufferWorker to pick the readBuffers for processing.
+ Thread.sleep(readBufferTransferToInProgressProbableTime);
+
+ Assertions.assertThat(readBufferManager.getInProgressCopiedList())
+ .describedAs(String.format("InProgressList should have %d elements",
+ readBufferQueuedCount))
+ .hasSize(readBufferQueuedCount);
+ Assertions.assertThat(readBufferManager.getFreeListCopy())
+ .describedAs(String.format("FreeList should have %d elements",
+ expectedFreeListBufferCount))
+ .hasSize(expectedFreeListBufferCount);
+ Assertions.assertThat(readBufferManager.getCompletedReadListCopy())
+ .describedAs("CompletedList should have 0 elements")
+ .hasSize(0);
+ }
+
+ Assertions.assertThat(readBufferManager.getInProgressCopiedList())
+ .describedAs(String.format("InProgressList should have %d elements",
+ readBufferQueuedCount))
+ .hasSize(readBufferQueuedCount);
+ Assertions.assertThat(readBufferManager.getFreeListCopy())
+ .describedAs(String.format("FreeList should have %d elements",
+ expectedFreeListBufferCount))
+ .hasSize(expectedFreeListBufferCount);
+ Assertions.assertThat(readBufferManager.getCompletedReadListCopy())
+ .describedAs("CompletedList should have 0 elements")
+ .hasSize(0);
+ }
+
/**
* This test expects ReadAheadManager to throw exception if the read ahead
* thread had failed within the last thresholdAgeMilliseconds.