HADOOP-17301. ABFS: read-ahead error reporting breaks buffer management (#2369)

Fixes read-ahead buffer management issues introduced by HADOOP-16852,
 "ABFS: Send error back to client for Read Ahead request failure".

Contributed by Sneha Vijayarajan
This commit is contained in:
Sneha Vijayarajan 2020-10-13 21:00:34 +05:30 committed by Thomas Marquardt
parent da5db6a5a6
commit d5b4d04b0d
No known key found for this signature in database
GPG Key ID: AEB30C9E78868287
2 changed files with 82 additions and 3 deletions

View File

@ -22,6 +22,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.Queue; import java.util.Queue;
@ -218,6 +219,8 @@ final class ReadBufferManager {
return false; // there are no evict-able buffers return false; // there are no evict-able buffers
} }
long currentTimeInMs = currentTimeMillis();
// first, try buffers where all bytes have been consumed (approximated as first and last bytes consumed) // first, try buffers where all bytes have been consumed (approximated as first and last bytes consumed)
for (ReadBuffer buf : completedReadList) { for (ReadBuffer buf : completedReadList) {
if (buf.isFirstByteConsumed() && buf.isLastByteConsumed()) { if (buf.isFirstByteConsumed() && buf.isLastByteConsumed()) {
@ -242,14 +245,30 @@ final class ReadBufferManager {
} }
// next, try any old nodes that have not been consumed // next, try any old nodes that have not been consumed
// Failed read buffers (with buffer index=-1) that are older than
// thresholdAge should be cleaned up, but at the same time should not
// report successful eviction.
// Queue logic expects that a buffer is freed up for read ahead when
// eviction is successful, whereas a failed ReadBuffer would have released
// its buffer when its status was set to READ_FAILED.
long earliestBirthday = Long.MAX_VALUE; long earliestBirthday = Long.MAX_VALUE;
ArrayList<ReadBuffer> oldFailedBuffers = new ArrayList<>();
for (ReadBuffer buf : completedReadList) { for (ReadBuffer buf : completedReadList) {
if (buf.getTimeStamp() < earliestBirthday) { if ((buf.getBufferindex() != -1)
&& (buf.getTimeStamp() < earliestBirthday)) {
nodeToEvict = buf; nodeToEvict = buf;
earliestBirthday = buf.getTimeStamp(); earliestBirthday = buf.getTimeStamp();
} else if ((buf.getBufferindex() == -1)
&& (currentTimeInMs - buf.getTimeStamp()) > thresholdAgeMilliseconds) {
oldFailedBuffers.add(buf);
} }
} }
if ((currentTimeMillis() - earliestBirthday > thresholdAgeMilliseconds) && (nodeToEvict != null)) {
for (ReadBuffer buf : oldFailedBuffers) {
evict(buf);
}
if ((currentTimeInMs - earliestBirthday > thresholdAgeMilliseconds) && (nodeToEvict != null)) {
return evict(nodeToEvict); return evict(nodeToEvict);
} }
@ -417,7 +436,6 @@ final class ReadBufferManager {
if (result == ReadBufferStatus.AVAILABLE && bytesActuallyRead > 0) { if (result == ReadBufferStatus.AVAILABLE && bytesActuallyRead > 0) {
buffer.setStatus(ReadBufferStatus.AVAILABLE); buffer.setStatus(ReadBufferStatus.AVAILABLE);
buffer.setLength(bytesActuallyRead); buffer.setLength(bytesActuallyRead);
completedReadList.add(buffer);
} else { } else {
freeList.push(buffer.getBufferindex()); freeList.push(buffer.getBufferindex());
// buffer will be deleted as per the eviction policy. // buffer will be deleted as per the eviction policy.
@ -464,4 +482,16 @@ final class ReadBufferManager {
void callTryEvict() { void callTryEvict() {
tryEvict(); tryEvict();
} }
/**
* Test method that can mimic no free buffers scenario and also add a ReadBuffer
* into completedReadList. This readBuffer will get picked up by TryEvict()
* next time a new queue request comes in.
* @param buf that needs to be added to completedReadlist
*/
@VisibleForTesting
void testMimicFullUseAndAddFailedBuffer(ReadBuffer buf) {
freeList.clear();
completedReadList.add(buf);
}
} }

View File

@ -23,9 +23,12 @@ import java.io.IOException;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import org.assertj.core.api.Assertions;
import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest; import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.TimeoutException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.TimeoutException;
import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;
import org.apache.hadoop.fs.azurebfs.utils.TestCachedSASToken; import org.apache.hadoop.fs.azurebfs.utils.TestCachedSASToken;
import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.any;
@ -49,6 +52,8 @@ public class TestAbfsInputStream extends
private static final int TWO_KB = 2 * 1024; private static final int TWO_KB = 2 * 1024;
private static final int THREE_KB = 3 * 1024; private static final int THREE_KB = 3 * 1024;
private static final int REDUCED_READ_BUFFER_AGE_THRESHOLD = 3000; // 3 sec private static final int REDUCED_READ_BUFFER_AGE_THRESHOLD = 3000; // 3 sec
private static final int INCREASED_READ_BUFFER_AGE_THRESHOLD =
REDUCED_READ_BUFFER_AGE_THRESHOLD * 10; // 30 sec
private AbfsRestOperation getMockRestOp() { private AbfsRestOperation getMockRestOp() {
AbfsRestOperation op = mock(AbfsRestOperation.class); AbfsRestOperation op = mock(AbfsRestOperation.class);
@ -182,7 +187,38 @@ public class TestAbfsInputStream extends
checkEvictedStatus(inputStream, 0, false); checkEvictedStatus(inputStream, 0, false);
} }
@Test
public void testFailedReadAheadEviction() throws Exception {
AbfsClient client = getMockAbfsClient();
AbfsRestOperation successOp = getMockRestOp();
ReadBufferManager.setThresholdAgeMilliseconds(INCREASED_READ_BUFFER_AGE_THRESHOLD);
// Stub :
// Read request leads to 3 readahead calls: Fail all 3 readahead-client.read()
// Actual read request fails with the failure in readahead thread
doThrow(new TimeoutException("Internal Server error"))
.when(client)
.read(any(String.class), any(Long.class), any(byte[].class),
any(Integer.class), any(Integer.class), any(String.class),
any(String.class));
AbfsInputStream inputStream = getAbfsInputStream(client, "testFailedReadAheadEviction.txt");
// Add a failed buffer to completed queue and set to no free buffers to read ahead.
ReadBuffer buff = new ReadBuffer();
buff.setStatus(ReadBufferStatus.READ_FAILED);
ReadBufferManager.getBufferManager().testMimicFullUseAndAddFailedBuffer(buff);
// if read failed buffer eviction is tagged as a valid eviction, it will lead to
// wrong assumption of queue logic that a buffer is freed up and can lead to :
// java.util.EmptyStackException
// at java.util.Stack.peek(Stack.java:102)
// at java.util.Stack.pop(Stack.java:84)
// at org.apache.hadoop.fs.azurebfs.services.ReadBufferManager.queueReadAhead
ReadBufferManager.getBufferManager().queueReadAhead(inputStream, 0, ONE_KB);
}
/** /**
*
* The test expects AbfsInputStream to initiate a remote read request for * The test expects AbfsInputStream to initiate a remote read request for
* the request offset and length when previous read ahead on the offset had failed. * the request offset and length when previous read ahead on the offset had failed.
* Also checks that the ReadBuffers are evicted as per the ReadBufferManager * Also checks that the ReadBuffers are evicted as per the ReadBufferManager
@ -264,12 +300,25 @@ public class TestAbfsInputStream extends
any(String.class)); any(String.class));
AbfsInputStream inputStream = getAbfsInputStream(client, "testSuccessfulReadAhead.txt"); AbfsInputStream inputStream = getAbfsInputStream(client, "testSuccessfulReadAhead.txt");
int beforeReadCompletedListSize = ReadBufferManager.getBufferManager().getCompletedReadListSize();
// First read request that triggers readAheads. // First read request that triggers readAheads.
inputStream.read(new byte[ONE_KB]); inputStream.read(new byte[ONE_KB]);
// Only the 3 readAhead threads should have triggered client.read // Only the 3 readAhead threads should have triggered client.read
verifyReadCallCount(client, 3); verifyReadCallCount(client, 3);
int newAdditionsToCompletedRead =
ReadBufferManager.getBufferManager().getCompletedReadListSize()
- beforeReadCompletedListSize;
// read buffer might be dumped if the ReadBufferManager getblock preceded
// the action of buffer being picked for reading from readaheadqueue, so that
// inputstream can proceed with read and not be blocked on readahead thread
// availability. So the count of buffers in completedReadQueue for the stream
// can be same or lesser than the requests triggered to queue readahead.
Assertions.assertThat(newAdditionsToCompletedRead)
.describedAs(
"New additions to completed reads should be same or less than as number of readaheads")
.isLessThanOrEqualTo(3);
// Another read request whose requested data is already read ahead. // Another read request whose requested data is already read ahead.
inputStream.read(ONE_KB, new byte[ONE_KB], 0, ONE_KB); inputStream.read(ONE_KB, new byte[ONE_KB], 0, ONE_KB);