HADOOP-16852: Report read-ahead error back

Contributed by Sneha Vijayarajan
Authored by Sneha Vijayarajan on 2020-05-27 13:51:42 -07:00; committed by Thomas Marquardt
parent 27b20f9689
commit 869a68b81e
6 changed files with 604 additions and 10 deletions
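In short: before this patch, a failure inside a read-ahead thread was silently dropped, so the data never showed up in the read-ahead buffers and AbfsInputStream quietly fell back to another remote read. With this patch, the worker records the exception on its ReadBuffer, failed buffers are kept in the completed list (with their pooled buffer index released), and a read that lands on such a buffer rethrows the stored IOException, but only if the failure happened within the last thresholdAgeMilliseconds; older failures are ignored and the read falls back to a fresh remote request. A minimal, self-contained sketch of that pattern (PrefetchResult and PrefetchCache are illustrative names, not the ABFS classes changed below):

import java.io.IOException;

// Illustrative sketch only; not the ABFS classes touched by this patch.
class PrefetchResult {
  volatile boolean failed;            // set by the worker thread when the remote read throws
  volatile IOException error;         // the exception recorded for that failure
  volatile long completedAtMillis;    // when the prefetch finished (successfully or not)
  volatile int bytesRead;             // valid only when failed == false
}

class PrefetchCache {
  private static final long THRESHOLD_AGE_MILLISECONDS = 3_000;

  // Returns bytes available from a completed prefetch, or 0 to tell the caller
  // to fall back to a fresh remote read.
  int bytesAvailable(PrefetchResult result) throws IOException {
    if (result.failed) {
      // Surface the read-ahead error only while it is recent; an old failure
      // should not poison new read requests.
      if (System.currentTimeMillis() - result.completedAtMillis < THRESHOLD_AGE_MILLISECONDS) {
        throw result.error;
      }
      return 0;
    }
    return result.bytesRead;
  }
}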

AbfsInputStream.java

@@ -24,6 +24,10 @@ import java.io.IOException;
 import java.net.HttpURLConnection;
 import com.google.common.base.Preconditions;
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.hadoop.fs.CanUnbuffer;
 import org.apache.hadoop.fs.FSExceptionMessages;
@@ -41,6 +45,7 @@ import static org.apache.hadoop.util.StringUtils.toLowerCase;
  */
 public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
     StreamCapabilities {
+  private static final Logger LOG = LoggerFactory.getLogger(AbfsInputStream.class);
   private final AbfsClient client;
   private final Statistics statistics;
@@ -239,6 +244,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
     final AbfsRestOperation op;
     AbfsPerfTracker tracker = client.getAbfsPerfTracker();
     try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker, "readRemote", "read")) {
+      LOG.trace("Trigger client.read for path={} position={} offset={} length={}", path, position, offset, length);
       op = client.read(path, position, b, offset, length, tolerateOobAppends ? "*" : eTag, cachedSasToken.get());
       cachedSasToken.update(op.getSasToken());
       perfInfo.registerResult(op.getResult()).registerSuccess(true);
@@ -431,4 +437,10 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
   byte[] getBuffer() {
     return buffer;
   }
+
+  @VisibleForTesting
+  protected void setCachedSasToken(final CachedSASToken cachedSasToken) {
+    this.cachedSasToken = cachedSasToken;
+  }
 }

ReadBuffer.java

@@ -18,10 +18,13 @@
 package org.apache.hadoop.fs.azurebfs.services;

+import java.io.IOException;
 import java.util.concurrent.CountDownLatch;

 import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;
+
+import static org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus.READ_FAILED;

 class ReadBuffer {

   private AbfsInputStream stream;
@@ -40,6 +43,8 @@ class ReadBuffer {
   private boolean isLastByteConsumed = false;
   private boolean isAnyByteConsumed = false;

+  private IOException errException = null;
+
   public AbfsInputStream getStream() {
     return stream;
   }
@@ -88,12 +93,23 @@ class ReadBuffer {
     this.bufferindex = bufferindex;
   }

+  public IOException getErrException() {
+    return errException;
+  }
+
+  public void setErrException(final IOException errException) {
+    this.errException = errException;
+  }
+
   public ReadBufferStatus getStatus() {
     return status;
   }

   public void setStatus(ReadBufferStatus status) {
     this.status = status;
+    if (status == READ_FAILED) {
+      bufferindex = -1;
+    }
   }

   public CountDownLatch getLatch() {

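Note: the READ_FAILED branch added to setStatus() above clears bufferindex to -1. That is what lets evict() in ReadBufferManager (next file) keep failed buffers in completedReadList for error reporting without pushing a stale index back onto the free list.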
ReadBufferManager.java

@@ -21,12 +21,15 @@ import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

+import java.io.IOException;
 import java.util.Collection;
 import java.util.LinkedList;
 import java.util.Queue;
 import java.util.Stack;
 import java.util.concurrent.CountDownLatch;

+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * The Read Buffer Manager for Rest AbfsClient.
  */
@@ -36,8 +39,9 @@ final class ReadBufferManager {
   private static final int NUM_BUFFERS = 16;
   private static final int BLOCK_SIZE = 4 * 1024 * 1024;
   private static final int NUM_THREADS = 8;
-  private static final int THRESHOLD_AGE_MILLISECONDS = 3000; // have to see if 3 seconds is a good threshold
+  private static final int DEFAULT_THRESHOLD_AGE_MILLISECONDS = 3000; // have to see if 3 seconds is a good threshold
+  private static int thresholdAgeMilliseconds = DEFAULT_THRESHOLD_AGE_MILLISECONDS;

   private Thread[] threads = new Thread[NUM_THREADS];
   private byte[][] buffers; // array of byte[] buffers, to hold the data that is read
   private Stack<Integer> freeList = new Stack<>(); // indices in buffers[] array that are available
@@ -141,7 +145,8 @@ final class ReadBufferManager {
    * @param buffer the buffer to read data into. Note that the buffer will be written into from offset 0.
    * @return the number of bytes read
    */
-  int getBlock(final AbfsInputStream stream, final long position, final int length, final byte[] buffer) {
+  int getBlock(final AbfsInputStream stream, final long position, final int length, final byte[] buffer)
+      throws IOException {
     // not synchronized, so have to be careful with locking
     if (LOGGER.isTraceEnabled()) {
       LOGGER.trace("getBlock for file {} position {} thread {}",
@@ -244,7 +249,7 @@ final class ReadBufferManager {
         earliestBirthday = buf.getTimeStamp();
       }
     }
-    if ((currentTimeMillis() - earliestBirthday > THRESHOLD_AGE_MILLISECONDS) && (nodeToEvict != null)) {
+    if ((currentTimeMillis() - earliestBirthday > thresholdAgeMilliseconds) && (nodeToEvict != null)) {
       return evict(nodeToEvict);
     }
@@ -253,7 +258,12 @@
   }

   private boolean evict(final ReadBuffer buf) {
-    freeList.push(buf.getBufferindex());
+    // As failed ReadBuffers (bufferIndex = -1) are saved in completedReadList,
+    // avoid adding them to freeList.
+    if (buf.getBufferindex() != -1) {
+      freeList.push(buf.getBufferindex());
+    }
     completedReadList.remove(buf);
     if (LOGGER.isTraceEnabled()) {
       LOGGER.trace("Evicting buffer idx {}; was used for file {} offset {} length {}",
@@ -289,6 +299,27 @@ final class ReadBufferManager {
     return null;
   }

+  /**
+   * Returns buffers that failed or passed from completed queue.
+   * @param stream the AbfsInputStream the buffer belongs to
+   * @param requestedOffset the offset of the requested read
+   * @return the matching completed ReadBuffer, or null if there is none
+   */
+  private ReadBuffer getBufferFromCompletedQueue(final AbfsInputStream stream, final long requestedOffset) {
+    for (ReadBuffer buffer : completedReadList) {
+      // Buffer is returned if the requestedOffset is at or above buffer's
+      // offset but less than buffer's length or the actual requestedLength
+      if ((buffer.getStream() == stream)
+          && (requestedOffset >= buffer.getOffset())
+          && ((requestedOffset < buffer.getOffset() + buffer.getLength())
+              || (requestedOffset < buffer.getOffset() + buffer.getRequestedLength()))) {
+        return buffer;
+      }
+    }
+    return null;
+  }
+
   private void clearFromReadAheadQueue(final AbfsInputStream stream, final long requestedOffset) {
     ReadBuffer buffer = getFromList(readAheadQueue, stream, requestedOffset);
     if (buffer != null) {
@@ -299,11 +330,28 @@ final class ReadBufferManager {
   }

   private int getBlockFromCompletedQueue(final AbfsInputStream stream, final long position, final int length,
-      final byte[] buffer) {
-    ReadBuffer buf = getFromList(completedReadList, stream, position);
-    if (buf == null || position >= buf.getOffset() + buf.getLength()) {
+      final byte[] buffer) throws IOException {
+    ReadBuffer buf = getBufferFromCompletedQueue(stream, position);
+
+    if (buf == null) {
       return 0;
     }
+
+    if (buf.getStatus() == ReadBufferStatus.READ_FAILED) {
+      // To prevent new read requests from failing due to old read-ahead attempts,
+      // return the exception only from buffers that failed within the last thresholdAgeMilliseconds
+      if ((currentTimeMillis() - (buf.getTimeStamp()) < thresholdAgeMilliseconds)) {
+        throw buf.getErrException();
+      } else {
+        return 0;
+      }
+    }
+
+    if ((buf.getStatus() != ReadBufferStatus.AVAILABLE)
+        || (position >= buf.getOffset() + buf.getLength())) {
+      return 0;
+    }
+
     int cursor = (int) (position - buf.getOffset());
     int availableLengthInBuffer = buf.getLength() - cursor;
     int lengthToCopy = Math.min(length, availableLengthInBuffer);
@@ -368,14 +416,18 @@ final class ReadBufferManager {
       inProgressList.remove(buffer);
       if (result == ReadBufferStatus.AVAILABLE && bytesActuallyRead > 0) {
         buffer.setStatus(ReadBufferStatus.AVAILABLE);
-        buffer.setTimeStamp(currentTimeMillis());
         buffer.setLength(bytesActuallyRead);
-        completedReadList.add(buffer);
       } else {
         freeList.push(buffer.getBufferindex());
-        // buffer should go out of scope after the end of the calling method in ReadBufferWorker, and eligible for GC
+        // buffer will be deleted as per the eviction policy.
       }
+
+      buffer.setStatus(result);
+      buffer.setTimeStamp(currentTimeMillis());
+      completedReadList.add(buffer);
     }

     //outside the synchronized, since anyone receiving a wake-up from the latch must see safe-published results
     buffer.getLatch().countDown(); // wake up waiting threads (if any)
   }
@@ -392,4 +444,24 @@ final class ReadBufferManager {
   private long currentTimeMillis() {
     return System.nanoTime() / 1000 / 1000;
   }
+
+  @VisibleForTesting
+  int getThresholdAgeMilliseconds() {
+    return thresholdAgeMilliseconds;
+  }
+
+  @VisibleForTesting
+  static void setThresholdAgeMilliseconds(int thresholdAgeMs) {
+    thresholdAgeMilliseconds = thresholdAgeMs;
+  }
+
+  @VisibleForTesting
+  int getCompletedReadListSize() {
+    return completedReadList.size();
+  }
+
+  @VisibleForTesting
+  void callTryEvict() {
+    tryEvict();
+  }
 }

ReadBufferWorker.java

@@ -18,6 +18,7 @@
 package org.apache.hadoop.fs.azurebfs.services;

+import java.io.IOException;
 import java.util.concurrent.CountDownLatch;

 import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;
@@ -61,9 +62,18 @@ class ReadBufferWorker implements Runnable {
       if (buffer != null) {
         try {
           // do the actual read, from the file.
-          int bytesRead = buffer.getStream().readRemote(buffer.getOffset(), buffer.getBuffer(), 0, buffer.getRequestedLength());
+          int bytesRead = buffer.getStream().readRemote(
+              buffer.getOffset(),
+              buffer.getBuffer(),
+              0,
+              // If AbfsInputStream was created with a bigger buffer size than the
+              // read-ahead buffer size, make sure a valid length is passed for
+              // the remote read
+              Math.min(buffer.getRequestedLength(), buffer.getBuffer().length));
+
           bufferManager.doneReading(buffer, ReadBufferStatus.AVAILABLE, bytesRead); // post result back to ReadBufferManager
         } catch (Exception ex) {
+          buffer.setErrException(new IOException(ex));
           bufferManager.doneReading(buffer, ReadBufferStatus.READ_FAILED, 0);
         }
       }
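The Math.min above matters because ReadBufferManager hands the worker a pooled 4 MB buffer (BLOCK_SIZE in ReadBufferManager.java above), while the requested length comes from the AbfsInputStream that queued the read-ahead and follows the stream's configured read buffer size, which may be larger. A hypothetical illustration of the values involved (the 16 MB figure is an assumption, not taken from this patch):

// Hypothetical sizes, for illustration only:
int requestedLength = 16 * 1024 * 1024;   // length queued by an AbfsInputStream with a 16 MB read buffer (assumed)
int pooledBufferLen = 4 * 1024 * 1024;    // buffer.getBuffer().length, i.e. ReadBufferManager.BLOCK_SIZE
int lengthToRead = Math.min(requestedLength, pooledBufferLen);  // 4 MB: never ask readRemote to overfill the pooled buffer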

TestAbfsInputStream.java (new file)

@@ -0,0 +1,450 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.services;
import java.io.IOException;
import org.junit.Assert;
import org.junit.Test;
import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.TimeoutException;
import org.apache.hadoop.fs.azurebfs.utils.TestCachedSASToken;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.FORWARD_SLASH;
/**
* Unit test AbfsInputStream.
*/
public class TestAbfsInputStream extends
AbstractAbfsIntegrationTest {
private static final int ONE_KB = 1 * 1024;
private static final int TWO_KB = 2 * 1024;
private static final int THREE_KB = 3 * 1024;
private static final int REDUCED_READ_BUFFER_AGE_THRESHOLD = 3000; // 3 sec
private AbfsRestOperation getMockRestOp() {
AbfsRestOperation op = mock(AbfsRestOperation.class);
AbfsHttpOperation httpOp = mock(AbfsHttpOperation.class);
when(httpOp.getBytesReceived()).thenReturn(1024L);
when(op.getResult()).thenReturn(httpOp);
when(op.getSasToken()).thenReturn(TestCachedSASToken.getTestCachedSASTokenInstance().get());
return op;
}
private AbfsClient getMockAbfsClient() {
// Mock failure for client.read()
AbfsClient client = mock(AbfsClient.class);
AbfsPerfTracker tracker = new AbfsPerfTracker(
"test",
this.getAccountName(),
this.getConfiguration());
when(client.getAbfsPerfTracker()).thenReturn(tracker);
return client;
}
private AbfsInputStream getAbfsInputStream(AbfsClient mockAbfsClient, String fileName) {
AbfsInputStreamContext inputStreamContext = new AbfsInputStreamContext(-1);
// Create AbfsInputStream with the client instance
AbfsInputStream inputStream = new AbfsInputStream(
mockAbfsClient,
null,
FORWARD_SLASH + fileName,
THREE_KB,
inputStreamContext.withReadBufferSize(ONE_KB).withReadAheadQueueDepth(10),
"eTag");
inputStream.setCachedSasToken(
TestCachedSASToken.getTestCachedSASTokenInstance());
return inputStream;
}
private void queueReadAheads(AbfsInputStream inputStream) {
// Mimic AbfsInputStream readAhead queue requests
ReadBufferManager.getBufferManager()
.queueReadAhead(inputStream, 0, ONE_KB);
ReadBufferManager.getBufferManager()
.queueReadAhead(inputStream, ONE_KB, ONE_KB);
ReadBufferManager.getBufferManager()
.queueReadAhead(inputStream, TWO_KB, TWO_KB);
}
private void verifyReadCallCount(AbfsClient client, int count) throws
AzureBlobFileSystemException, InterruptedException {
// ReadAhead threads are triggered asynchronously.
// Wait a second before verifying the number of total calls.
Thread.sleep(1000);
verify(client, times(count)).read(any(String.class), any(Long.class),
any(byte[].class), any(Integer.class), any(Integer.class),
any(String.class), any(String.class));
}
private void checkEvictedStatus(AbfsInputStream inputStream, int position, boolean expectedToThrowException)
throws Exception {
// Sleep for the eviction threshold time
Thread.sleep(ReadBufferManager.getBufferManager().getThresholdAgeMilliseconds() + 1000);
// Eviction is done only when AbfsInputStream tries to queue new items.
// 1 tryEvict will remove 1 eligible item. To ensure that the current test buffer
// will get evicted (considering there could be other tests running in parallel),
// call tryEvict for the number of items that are there in completedReadList.
int numOfCompletedReadListItems = ReadBufferManager.getBufferManager().getCompletedReadListSize();
while (numOfCompletedReadListItems > 0) {
ReadBufferManager.getBufferManager().callTryEvict();
numOfCompletedReadListItems--;
}
if (expectedToThrowException) {
intercept(IOException.class,
() -> inputStream.read(position, new byte[ONE_KB], 0, ONE_KB));
} else {
inputStream.read(position, new byte[ONE_KB], 0, ONE_KB);
}
}
public TestAbfsInputStream() throws Exception {
super();
// Reduce thresholdAgeMilliseconds to 3 sec for the tests
ReadBufferManager.getBufferManager().setThresholdAgeMilliseconds(REDUCED_READ_BUFFER_AGE_THRESHOLD);
}
/**
* This test expects AbfsInputStream to throw the exception that readAhead
* thread received on read. The readAhead thread must be initiated from the
* active read request itself.
* Also checks that the ReadBuffers are evicted as per the ReadBufferManager
* threshold criteria.
* @throws Exception
*/
@Test
public void testFailedReadAhead() throws Exception {
AbfsClient client = getMockAbfsClient();
AbfsRestOperation successOp = getMockRestOp();
// Stub :
// Read request leads to 3 readahead calls: Fail all 3 readahead-client.read()
// Actual read request fails with the failure in readahead thread
doThrow(new TimeoutException("Internal Server error for RAH-Thread-X"))
.doThrow(new TimeoutException("Internal Server error for RAH-Thread-Y"))
.doThrow(new TimeoutException("Internal Server error RAH-Thread-Z"))
.doReturn(successOp) // Any extra calls to read, pass it.
.when(client)
.read(any(String.class), any(Long.class), any(byte[].class),
any(Integer.class), any(Integer.class), any(String.class),
any(String.class));
AbfsInputStream inputStream = getAbfsInputStream(client, "testFailedReadAhead.txt");
// Scenario: ReadAhead triggered from current active read call failed
// Before the change to return exception from readahead buffer,
// AbfsInputStream would have triggered an extra readremote on noticing
// data absent in readahead buffers
// In this test, a read should trigger 3 client.read() calls as file is 3 KB
// and readahead buffer size set in AbfsInputStream is 1 KB
// There should only be a total of 3 client.read() in this test.
intercept(IOException.class,
() -> inputStream.read(new byte[ONE_KB]));
// Only the 3 readAhead threads should have triggered client.read
verifyReadCallCount(client, 3);
// Stub returns success for the 4th read request, if ReadBuffers still
// persisted, ReadAheadManager getBlock would have returned exception.
checkEvictedStatus(inputStream, 0, false);
}
/**
* The test expects AbfsInputStream to initiate a remote read request for
* the request offset and length when previous read ahead on the offset had failed.
* Also checks that the ReadBuffers are evicted as per the ReadBufferManager
* threshold criteria.
* @throws Exception
*/
@Test
public void testOlderReadAheadFailure() throws Exception {
AbfsClient client = getMockAbfsClient();
AbfsRestOperation successOp = getMockRestOp();
// Stub :
// First Read request leads to 3 readahead calls: Fail all 3 readahead-client.read()
// A second read request will see that readahead had failed for data in
the requested offset range and also that it is an older readahead request.
// So attempt a new read only for the requested range.
doThrow(new TimeoutException("Internal Server error for RAH-X"))
.doThrow(new TimeoutException("Internal Server error for RAH-Y"))
.doThrow(new TimeoutException("Internal Server error for RAH-Z"))
.doReturn(successOp) // pass the read for second read request
.doReturn(successOp) // pass success for post eviction test
.when(client)
.read(any(String.class), any(Long.class), any(byte[].class),
any(Integer.class), any(Integer.class), any(String.class),
any(String.class));
AbfsInputStream inputStream = getAbfsInputStream(client, "testOlderReadAheadFailure.txt");
// First read request that fails as the readahead triggered from this request failed.
intercept(IOException.class,
() -> inputStream.read(new byte[ONE_KB]));
// Only the 3 readAhead threads should have triggered client.read
verifyReadCallCount(client, 3);
// Sleep for thresholdAgeMs so that the read ahead buffer qualifies for being old.
Thread.sleep(ReadBufferManager.getBufferManager().getThresholdAgeMilliseconds());
// Second read request should retry the read (and not issue any new readaheads)
inputStream.read(ONE_KB, new byte[ONE_KB], 0, ONE_KB);
// Once created, mock will remember all interactions. So total number of read
// calls will be one more from earlier (there is a reset mock which will reset the
count, but the mock stub is erased as well, which needs AbfsInputStream to be recreated,
// which beats the purpose)
verifyReadCallCount(client, 4);
// Stub returns success for the 5th read request, if ReadBuffers still
// persisted request would have failed for position 0.
checkEvictedStatus(inputStream, 0, false);
}
/**
* The test expects AbfsInputStream to utilize any data read ahead for
* requested offset and length.
* @throws Exception
*/
@Test
public void testSuccessfulReadAhead() throws Exception {
// Mock failure for client.read()
AbfsClient client = getMockAbfsClient();
// Success operation mock
AbfsRestOperation op = getMockRestOp();
// Stub :
// Pass all readAheads and fail the post eviction request to
// prove ReadAhead buffer is used
// for post eviction check, fail all read aheads
doReturn(op)
.doReturn(op)
.doReturn(op)
.doThrow(new TimeoutException("Internal Server error for RAH-X"))
.doThrow(new TimeoutException("Internal Server error for RAH-Y"))
.doThrow(new TimeoutException("Internal Server error for RAH-Z"))
.when(client)
.read(any(String.class), any(Long.class), any(byte[].class),
any(Integer.class), any(Integer.class), any(String.class),
any(String.class));
AbfsInputStream inputStream = getAbfsInputStream(client, "testSuccessfulReadAhead.txt");
// First read request that triggers readAheads.
inputStream.read(new byte[ONE_KB]);
// Only the 3 readAhead threads should have triggered client.read
verifyReadCallCount(client, 3);
// Another read request whose requested data is already read ahead.
inputStream.read(ONE_KB, new byte[ONE_KB], 0, ONE_KB);
// Once created, mock will remember all interactions.
// As the above read should not have triggered any server calls, total
// number of read calls made at this point will be same as last.
verifyReadCallCount(client, 3);
// Stub will throw exception for client.read() for 4th and later calls
// if not using the read-ahead buffer exception will be thrown on read
checkEvictedStatus(inputStream, 0, true);
}
/**
* This test expects ReadAheadManager to throw exception if the read ahead
* thread had failed within the last thresholdAgeMilliseconds.
* Also checks that the ReadBuffers are evicted as per the ReadBufferManager
* threshold criteria.
* @throws Exception
*/
@Test
public void testReadAheadManagerForFailedReadAhead() throws Exception {
AbfsClient client = getMockAbfsClient();
AbfsRestOperation successOp = getMockRestOp();
// Stub :
// Read request leads to 3 readahead calls: Fail all 3 readahead-client.read()
// Actual read request fails with the failure in readahead thread
doThrow(new TimeoutException("Internal Server error for RAH-Thread-X"))
.doThrow(new TimeoutException("Internal Server error for RAH-Thread-Y"))
.doThrow(new TimeoutException("Internal Server error RAH-Thread-Z"))
.doReturn(successOp) // Any extra calls to read, pass it.
.when(client)
.read(any(String.class), any(Long.class), any(byte[].class),
any(Integer.class), any(Integer.class), any(String.class),
any(String.class));
AbfsInputStream inputStream = getAbfsInputStream(client, "testReadAheadManagerForFailedReadAhead.txt");
queueReadAheads(inputStream);
// AbfsInputStream Read would have waited for the read-ahead for the requested offset
// as we are testing from ReadAheadManager directly, sleep for a sec to
// get the read ahead threads to complete
Thread.sleep(1000);
// if readAhead failed for specific offset, getBlock should
// throw exception from the ReadBuffer that failed within last thresholdAgeMilliseconds sec
intercept(IOException.class,
() -> ReadBufferManager.getBufferManager().getBlock(
inputStream,
0,
ONE_KB,
new byte[ONE_KB]));
// Only the 3 readAhead threads should have triggered client.read
verifyReadCallCount(client, 3);
// Stub returns success for the 4th read request, if ReadBuffers still
// persisted, ReadAheadManager getBlock would have returned exception.
checkEvictedStatus(inputStream, 0, false);
}
/**
* The test expects ReadAheadManager to return 0 receivedBytes when previous
* read ahead on the offset had failed, rather than throwing the exception received at that time.
* Also checks that the ReadBuffers are evicted as per the ReadBufferManager
* threshold criteria.
* @throws Exception
*/
@Test
public void testReadAheadManagerForOlderReadAheadFailure() throws Exception {
AbfsClient client = getMockAbfsClient();
AbfsRestOperation successOp = getMockRestOp();
// Stub :
// First Read request leads to 3 readahead calls: Fail all 3 readahead-client.read()
// A second read request will see that readahead had failed for data in
the requested offset range but also that it is an older readahead request.
// The system issue could have been resolved by now, so attempt a new read only for the requested range.
doThrow(new TimeoutException("Internal Server error for RAH-X"))
.doThrow(new TimeoutException("Internal Server error for RAH-X"))
.doThrow(new TimeoutException("Internal Server error for RAH-X"))
.doReturn(successOp) // pass the read for second read request
.doReturn(successOp) // pass success for post eviction test
.when(client)
.read(any(String.class), any(Long.class), any(byte[].class),
any(Integer.class), any(Integer.class), any(String.class),
any(String.class));
AbfsInputStream inputStream = getAbfsInputStream(client, "testReadAheadManagerForOlderReadAheadFailure.txt");
queueReadAheads(inputStream);
// AbfsInputStream Read would have waited for the read-ahead for the requested offset
// as we are testing from ReadAheadManager directly, sleep for thresholdAgeMilliseconds so that
// read buffer qualifies to be an old buffer
Thread.sleep(ReadBufferManager.getBufferManager().getThresholdAgeMilliseconds());
// Only the 3 readAhead threads should have triggered client.read
verifyReadCallCount(client, 3);
// getBlock from a new read request should return 0 if the read-ahead buffer
// for the respective offset holds an old failure (past the age threshold).
int bytesRead = ReadBufferManager.getBufferManager().getBlock(
inputStream,
ONE_KB,
ONE_KB,
new byte[ONE_KB]);
Assert.assertEquals("bytesRead should be zero when previously read "
+ "ahead buffer had failed", 0, bytesRead);
// Stub returns success for the 5th read request, if ReadBuffers still
// persisted request would have failed for position 0.
checkEvictedStatus(inputStream, 0, false);
}
/**
* The test expects ReadAheadManager to return data from previously read
* ahead data of same offset.
* @throws Exception
*/
@Test
public void testReadAheadManagerForSuccessfulReadAhead() throws Exception {
// Mock failure for client.read()
AbfsClient client = getMockAbfsClient();
// Success operation mock
AbfsRestOperation op = getMockRestOp();
// Stub :
// Pass all readAheads and fail the post eviction request to
// prove ReadAhead buffer is used
doReturn(op)
.doReturn(op)
.doReturn(op)
.doThrow(new TimeoutException("Internal Server error for RAH-X")) // for post eviction request
.doThrow(new TimeoutException("Internal Server error for RAH-Y"))
.doThrow(new TimeoutException("Internal Server error for RAH-Z"))
.when(client)
.read(any(String.class), any(Long.class), any(byte[].class),
any(Integer.class), any(Integer.class), any(String.class),
any(String.class));
AbfsInputStream inputStream = getAbfsInputStream(client, "testSuccessfulReadAhead.txt");
queueReadAheads(inputStream);
// AbfsInputStream Read would have waited for the read-ahead for the requested offset
// as we are testing from ReadAheadManager directly, sleep for a sec to
// get the read ahead threads to complete
Thread.sleep(1000);
// Only the 3 readAhead threads should have triggered client.read
verifyReadCallCount(client, 3);
// getBlock for a new read should return the buffer read-ahead
int bytesRead = ReadBufferManager.getBufferManager().getBlock(
inputStream,
ONE_KB,
ONE_KB,
new byte[ONE_KB]);
Assert.assertTrue("bytesRead should be non-zero from the "
+ "buffer that was read-ahead", bytesRead > 0);
// Once created, mock will remember all interactions.
// As the above read should not have triggered any server calls, total
// number of read calls made at this point will be same as last.
verifyReadCallCount(client, 3);
// Stub will throw exception for client.read() for 4th and later calls
// if not using the read-ahead buffer exception will be thrown on read
checkEvictedStatus(inputStream, 0, true);
}
}

TestCachedSASToken.java

@@ -22,12 +22,14 @@ import java.io.IOException;
 import java.time.OffsetDateTime;
 import java.time.ZoneOffset;
 import java.time.format.DateTimeFormatter;
+import java.util.UUID;

 import org.junit.Assert;
 import org.junit.Test;

 import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS_IN_SECONDS;
 import static java.time.temporal.ChronoUnit.SECONDS;
+import static java.time.temporal.ChronoUnit.DAYS;

 /**
  * Test CachedSASToken.
@@ -159,4 +161,36 @@ public final class TestCachedSASToken {
     cachedToken = cachedSasToken.get();
     Assert.assertNull(cachedToken);
   }
+
+  public static CachedSASToken getTestCachedSASTokenInstance() {
+    String expiryPostADay = OffsetDateTime.now(ZoneOffset.UTC)
+        .plus(1, DAYS)
+        .format(DateTimeFormatter.ISO_DATE_TIME);
+    String version = "2020-20-20";
+
+    StringBuilder sb = new StringBuilder();
+    sb.append("skoid=");
+    sb.append(UUID.randomUUID().toString());
+    sb.append("&sktid=");
+    sb.append(UUID.randomUUID().toString());
+    sb.append("&skt=");
+    sb.append(OffsetDateTime.now(ZoneOffset.UTC)
+        .minus(1, DAYS)
+        .format(DateTimeFormatter.ISO_DATE_TIME));
+    sb.append("&ske=");
+    sb.append(expiryPostADay);
+    sb.append("&sks=b");
+    sb.append("&skv=");
+    sb.append(version);
+    sb.append("&sp=rw");
+    sb.append("&sr=b");
+    sb.append("&se=");
+    sb.append(expiryPostADay);
+    sb.append("&sv=2");
+    sb.append(version);
+
+    CachedSASToken cachedSASToken = new CachedSASToken();
+    cachedSASToken.update(sb.toString());
+    return cachedSASToken;
+  }
 }