HADOOP-18546. ABFS. disable purging list of in progress reads in abfs stream close() (#5176)

This addresses HADOOP-18521, "ABFS ReadBufferManager buffer sharing
across concurrent HTTP requests" by not trying to cancel
in progress reads.

It supercedes HADOOP-18528, which disables the prefetching.
If that patch is applied *after* this one, prefetching
will be disabled.

As well as changing the default value in the code,
core-default.xml is updated to set
fs.azure.enable.readahead = true

As a result, if Configuration.get("fs.azure.enable.readahead")
returns a non-null value, then it can be inferred that
it was set in or core-default.xml (the fix is present)
or in core-site.xml (someone asked for it).

Note: this commit contains the followup commit:
That is needed to avoid race conditions in the test.

Contributed by Pranav Saxena.
This commit is contained in:
Pranav Saxena 2022-12-07 12:15:45 -08:00 committed by Steve Loughran
parent 8b748c1cb8
commit 50a0f33cc9
No known key found for this signature in database
GPG Key ID: D22CF846DBB162A0
7 changed files with 87 additions and 25 deletions

View File

@ -2143,9 +2143,8 @@ The switch to turn S3A auditing on or off.
<property> <property>
<name>fs.azure.enable.readahead</name> <name>fs.azure.enable.readahead</name>
<value>false</value> <value>true</value>
<description>Disable readahead/prefetching in AbfsInputStream. <description>Enabled readahead/prefetching in AbfsInputStream.</description>
See HADOOP-18521</description>
</property> </property>
<property> <property>

View File

@ -109,7 +109,7 @@ public final class FileSystemConfigurations {
public static final boolean DEFAULT_ABFS_LATENCY_TRACK = false; public static final boolean DEFAULT_ABFS_LATENCY_TRACK = false;
public static final long DEFAULT_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS_IN_SECONDS = 120; public static final long DEFAULT_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS_IN_SECONDS = 120;
public static final boolean DEFAULT_ENABLE_READAHEAD = false; public static final boolean DEFAULT_ENABLE_READAHEAD = true;
public static final String DEFAULT_FS_AZURE_USER_AGENT_PREFIX = EMPTY_STRING; public static final String DEFAULT_FS_AZURE_USER_AGENT_PREFIX = EMPTY_STRING;
public static final String DEFAULT_VALUE_UNKNOWN = "UNKNOWN"; public static final String DEFAULT_VALUE_UNKNOWN = "UNKNOWN";

View File

@ -35,7 +35,7 @@ public class AbfsInputStreamContext extends AbfsStreamContext {
private boolean tolerateOobAppends; private boolean tolerateOobAppends;
private boolean isReadAheadEnabled = false; private boolean isReadAheadEnabled = true;
private boolean alwaysReadBufferSize; private boolean alwaysReadBufferSize;

View File

@ -544,7 +544,6 @@ final class ReadBufferManager {
LOGGER.debug("Purging stale buffers for AbfsInputStream {} ", stream); LOGGER.debug("Purging stale buffers for AbfsInputStream {} ", stream);
readAheadQueue.removeIf(readBuffer -> readBuffer.getStream() == stream); readAheadQueue.removeIf(readBuffer -> readBuffer.getStream() == stream);
purgeList(stream, completedReadList); purgeList(stream, completedReadList);
purgeList(stream, inProgressList);
} }
/** /**
@ -642,4 +641,9 @@ final class ReadBufferManager {
freeList.clear(); freeList.clear();
completedReadList.add(buf); completedReadList.add(buf);
} }
@VisibleForTesting
int getNumBuffers() {
return NUM_BUFFERS;
}
} }

View File

@ -34,7 +34,6 @@ import org.apache.hadoop.fs.contract.AbstractFSContract;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_READ_AHEAD_RANGE; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_READ_AHEAD_RANGE;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_READ_BUFFER_SIZE; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_READ_BUFFER_SIZE;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_READAHEAD;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MIN_BUFFER_SIZE; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MIN_BUFFER_SIZE;
import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile; import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset; import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
@ -69,7 +68,6 @@ public class ITestAbfsFileSystemContractSeek extends AbstractContractSeekTest{
protected AbstractFSContract createContract(final Configuration conf) { protected AbstractFSContract createContract(final Configuration conf) {
conf.setInt(AZURE_READ_AHEAD_RANGE, MIN_BUFFER_SIZE); conf.setInt(AZURE_READ_AHEAD_RANGE, MIN_BUFFER_SIZE);
conf.setInt(AZURE_READ_BUFFER_SIZE, MIN_BUFFER_SIZE); conf.setInt(AZURE_READ_BUFFER_SIZE, MIN_BUFFER_SIZE);
conf.setBoolean(FS_AZURE_ENABLE_READAHEAD, true);
return new AbfsFileSystemContract(conf, isSecure); return new AbfsFileSystemContract(conf, isSecure);
} }

View File

@ -25,6 +25,7 @@ import java.util.Random;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataInputStream;
@ -74,17 +75,14 @@ public class ITestReadBufferManager extends AbstractAbfsIntegrationTest {
} }
} finally { } finally {
executorService.shutdown(); executorService.shutdown();
// wait for all tasks to finish
executorService.awaitTermination(1, TimeUnit.MINUTES);
} }
ReadBufferManager bufferManager = ReadBufferManager.getBufferManager(); ReadBufferManager bufferManager = ReadBufferManager.getBufferManager();
assertListEmpty("CompletedList", bufferManager.getCompletedReadListCopy()); // verify there is no work in progress or the readahead queue.
assertListEmpty("InProgressList", bufferManager.getInProgressCopiedList()); assertListEmpty("InProgressList", bufferManager.getInProgressCopiedList());
assertListEmpty("ReadAheadQueue", bufferManager.getReadAheadQueueCopy()); assertListEmpty("ReadAheadQueue", bufferManager.getReadAheadQueueCopy());
Assertions.assertThat(bufferManager.getFreeListCopy())
.describedAs("After closing all streams free list contents should match with " + freeList)
.hasSize(numBuffers)
.containsExactlyInAnyOrderElementsOf(freeList);
} }
private void assertListEmpty(String listName, List<ReadBuffer> list) { private void assertListEmpty(String listName, List<ReadBuffer> list) {
@ -116,22 +114,18 @@ public class ITestReadBufferManager extends AbstractAbfsIntegrationTest {
try { try {
iStream2 = (AbfsInputStream) fs.open(testFilePath).getWrappedStream(); iStream2 = (AbfsInputStream) fs.open(testFilePath).getWrappedStream();
iStream2.read(); iStream2.read();
// After closing stream1, none of the buffers associated with stream1 should be present. // After closing stream1, no queued buffers of stream1 should be present
assertListDoesnotContainBuffersForIstream(bufferManager.getInProgressCopiedList(), iStream1); // assertions can't be made about the state of the other lists as it is
assertListDoesnotContainBuffersForIstream(bufferManager.getCompletedReadListCopy(), iStream1); // too prone to race conditions.
assertListDoesnotContainBuffersForIstream(bufferManager.getReadAheadQueueCopy(), iStream1); assertListDoesnotContainBuffersForIstream(bufferManager.getReadAheadQueueCopy(), iStream1);
} finally { } finally {
// closing the stream later. // closing the stream later.
IOUtils.closeStream(iStream2); IOUtils.closeStream(iStream2);
} }
// After closing stream2, none of the buffers associated with stream2 should be present. // After closing stream2, no queued buffers of stream2 should be present.
assertListDoesnotContainBuffersForIstream(bufferManager.getInProgressCopiedList(), iStream2);
assertListDoesnotContainBuffersForIstream(bufferManager.getCompletedReadListCopy(), iStream2);
assertListDoesnotContainBuffersForIstream(bufferManager.getReadAheadQueueCopy(), iStream2); assertListDoesnotContainBuffersForIstream(bufferManager.getReadAheadQueueCopy(), iStream2);
// After closing both the streams, all lists should be empty. // After closing both the streams, read queue should be empty.
assertListEmpty("CompletedList", bufferManager.getCompletedReadListCopy());
assertListEmpty("InProgressList", bufferManager.getInProgressCopiedList());
assertListEmpty("ReadAheadQueue", bufferManager.getReadAheadQueueCopy()); assertListEmpty("ReadAheadQueue", bufferManager.getReadAheadQueueCopy());
} }

View File

@ -82,6 +82,12 @@ public class TestAbfsInputStream extends
REDUCED_READ_BUFFER_AGE_THRESHOLD * 10; // 30 sec REDUCED_READ_BUFFER_AGE_THRESHOLD * 10; // 30 sec
private static final int ALWAYS_READ_BUFFER_SIZE_TEST_FILE_SIZE = 16 * ONE_MB; private static final int ALWAYS_READ_BUFFER_SIZE_TEST_FILE_SIZE = 16 * ONE_MB;
@Override
public void teardown() throws Exception {
super.teardown();
ReadBufferManager.getBufferManager().testResetReadBufferManager();
}
private AbfsRestOperation getMockRestOp() { private AbfsRestOperation getMockRestOp() {
AbfsRestOperation op = mock(AbfsRestOperation.class); AbfsRestOperation op = mock(AbfsRestOperation.class);
AbfsHttpOperation httpOp = mock(AbfsHttpOperation.class); AbfsHttpOperation httpOp = mock(AbfsHttpOperation.class);
@ -106,7 +112,6 @@ public class TestAbfsInputStream extends
private AbfsInputStream getAbfsInputStream(AbfsClient mockAbfsClient, private AbfsInputStream getAbfsInputStream(AbfsClient mockAbfsClient,
String fileName) throws IOException { String fileName) throws IOException {
AbfsInputStreamContext inputStreamContext = new AbfsInputStreamContext(-1); AbfsInputStreamContext inputStreamContext = new AbfsInputStreamContext(-1);
inputStreamContext.isReadAheadEnabled(true);
// Create AbfsInputStream with the client instance // Create AbfsInputStream with the client instance
AbfsInputStream inputStream = new AbfsInputStream( AbfsInputStream inputStream = new AbfsInputStream(
mockAbfsClient, mockAbfsClient,
@ -132,7 +137,6 @@ public class TestAbfsInputStream extends
boolean alwaysReadBufferSize, boolean alwaysReadBufferSize,
int readAheadBlockSize) throws IOException { int readAheadBlockSize) throws IOException {
AbfsInputStreamContext inputStreamContext = new AbfsInputStreamContext(-1); AbfsInputStreamContext inputStreamContext = new AbfsInputStreamContext(-1);
inputStreamContext.isReadAheadEnabled(true);
// Create AbfsInputStream with the client instance // Create AbfsInputStream with the client instance
AbfsInputStream inputStream = new AbfsInputStream( AbfsInputStream inputStream = new AbfsInputStream(
abfsClient, abfsClient,
@ -495,6 +499,69 @@ public class TestAbfsInputStream extends
checkEvictedStatus(inputStream, 0, true); checkEvictedStatus(inputStream, 0, true);
} }
/**
* This test expects InProgressList is not purged by the inputStream close.
*/
@Test
public void testStreamPurgeDuringReadAheadCallExecuting() throws Exception {
AbfsClient client = getMockAbfsClient();
AbfsRestOperation successOp = getMockRestOp();
final Long serverCommunicationMockLatency = 3_000L;
final Long readBufferTransferToInProgressProbableTime = 1_000L;
final Integer readBufferQueuedCount = 3;
Mockito.doAnswer(invocationOnMock -> {
//sleeping thread to mock the network latency from client to backend.
Thread.sleep(serverCommunicationMockLatency);
return successOp;
})
.when(client)
.read(any(String.class), any(Long.class), any(byte[].class),
any(Integer.class), any(Integer.class), any(String.class),
any(String.class), any(TracingContext.class));
final ReadBufferManager readBufferManager
= ReadBufferManager.getBufferManager();
final int readBufferTotal = readBufferManager.getNumBuffers();
final int expectedFreeListBufferCount = readBufferTotal
- readBufferQueuedCount;
try (AbfsInputStream inputStream = getAbfsInputStream(client,
"testSuccessfulReadAhead.txt")) {
// As this is try-with-resources block, the close() method of the created
// abfsInputStream object shall be called on the end of the block.
queueReadAheads(inputStream);
//Sleeping to give ReadBufferWorker to pick the readBuffers for processing.
Thread.sleep(readBufferTransferToInProgressProbableTime);
Assertions.assertThat(readBufferManager.getInProgressCopiedList())
.describedAs(String.format("InProgressList should have %d elements",
readBufferQueuedCount))
.hasSize(readBufferQueuedCount);
Assertions.assertThat(readBufferManager.getFreeListCopy())
.describedAs(String.format("FreeList should have %d elements",
expectedFreeListBufferCount))
.hasSize(expectedFreeListBufferCount);
Assertions.assertThat(readBufferManager.getCompletedReadListCopy())
.describedAs("CompletedList should have 0 elements")
.hasSize(0);
}
Assertions.assertThat(readBufferManager.getInProgressCopiedList())
.describedAs(String.format("InProgressList should have %d elements",
readBufferQueuedCount))
.hasSize(readBufferQueuedCount);
Assertions.assertThat(readBufferManager.getFreeListCopy())
.describedAs(String.format("FreeList should have %d elements",
expectedFreeListBufferCount))
.hasSize(expectedFreeListBufferCount);
Assertions.assertThat(readBufferManager.getCompletedReadListCopy())
.describedAs("CompletedList should have 0 elements")
.hasSize(0);
}
/** /**
* This test expects ReadAheadManager to throw exception if the read ahead * This test expects ReadAheadManager to throw exception if the read ahead
* thread had failed within the last thresholdAgeMilliseconds. * thread had failed within the last thresholdAgeMilliseconds.