HADOOP-16852: Report read-ahead error back
Contributed by Sneha Vijayarajan
This commit is contained in:
parent
2148a8fe64
commit
53b993e604
|
@ -24,6 +24,10 @@ import java.io.IOException;
|
|||
import java.net.HttpURLConnection;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hadoop.fs.CanUnbuffer;
|
||||
import org.apache.hadoop.fs.FSExceptionMessages;
|
||||
|
@ -41,6 +45,7 @@ import static org.apache.hadoop.util.StringUtils.toLowerCase;
|
|||
*/
|
||||
public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
|
||||
StreamCapabilities {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(AbfsInputStream.class);
|
||||
|
||||
private final AbfsClient client;
|
||||
private final Statistics statistics;
|
||||
|
@ -239,6 +244,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
|
|||
final AbfsRestOperation op;
|
||||
AbfsPerfTracker tracker = client.getAbfsPerfTracker();
|
||||
try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker, "readRemote", "read")) {
|
||||
LOG.trace("Trigger client.read for path={} position={} offset={} length={}", path, position, offset, length);
|
||||
op = client.read(path, position, b, offset, length, tolerateOobAppends ? "*" : eTag, cachedSasToken.get());
|
||||
cachedSasToken.update(op.getSasToken());
|
||||
perfInfo.registerResult(op.getResult()).registerSuccess(true);
|
||||
|
@ -431,4 +437,10 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
|
|||
byte[] getBuffer() {
|
||||
return buffer;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
protected void setCachedSasToken(final CachedSASToken cachedSasToken) {
|
||||
this.cachedSasToken = cachedSasToken;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -18,10 +18,13 @@
|
|||
|
||||
package org.apache.hadoop.fs.azurebfs.services;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;
|
||||
|
||||
import static org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus.READ_FAILED;
|
||||
|
||||
class ReadBuffer {
|
||||
|
||||
private AbfsInputStream stream;
|
||||
|
@ -40,6 +43,8 @@ class ReadBuffer {
|
|||
private boolean isLastByteConsumed = false;
|
||||
private boolean isAnyByteConsumed = false;
|
||||
|
||||
private IOException errException = null;
|
||||
|
||||
public AbfsInputStream getStream() {
|
||||
return stream;
|
||||
}
|
||||
|
@ -88,12 +93,23 @@ class ReadBuffer {
|
|||
this.bufferindex = bufferindex;
|
||||
}
|
||||
|
||||
public IOException getErrException() {
|
||||
return errException;
|
||||
}
|
||||
|
||||
public void setErrException(final IOException errException) {
|
||||
this.errException = errException;
|
||||
}
|
||||
|
||||
public ReadBufferStatus getStatus() {
|
||||
return status;
|
||||
}
|
||||
|
||||
public void setStatus(ReadBufferStatus status) {
|
||||
this.status = status;
|
||||
if (status == READ_FAILED) {
|
||||
bufferindex = -1;
|
||||
}
|
||||
}
|
||||
|
||||
public CountDownLatch getLatch() {
|
||||
|
|
|
@ -21,12 +21,15 @@ import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
import java.util.LinkedList;
|
||||
import java.util.Queue;
|
||||
import java.util.Stack;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
/**
|
||||
* The Read Buffer Manager for Rest AbfsClient.
|
||||
*/
|
||||
|
@ -36,8 +39,9 @@ final class ReadBufferManager {
|
|||
private static final int NUM_BUFFERS = 16;
|
||||
private static final int BLOCK_SIZE = 4 * 1024 * 1024;
|
||||
private static final int NUM_THREADS = 8;
|
||||
private static final int THRESHOLD_AGE_MILLISECONDS = 3000; // have to see if 3 seconds is a good threshold
|
||||
private static final int DEFAULT_THRESHOLD_AGE_MILLISECONDS = 3000; // have to see if 3 seconds is a good threshold
|
||||
|
||||
private static int thresholdAgeMilliseconds = DEFAULT_THRESHOLD_AGE_MILLISECONDS;
|
||||
private Thread[] threads = new Thread[NUM_THREADS];
|
||||
private byte[][] buffers; // array of byte[] buffers, to hold the data that is read
|
||||
private Stack<Integer> freeList = new Stack<>(); // indices in buffers[] array that are available
|
||||
|
@ -141,7 +145,8 @@ final class ReadBufferManager {
|
|||
* @param buffer the buffer to read data into. Note that the buffer will be written into from offset 0.
|
||||
* @return the number of bytes read
|
||||
*/
|
||||
int getBlock(final AbfsInputStream stream, final long position, final int length, final byte[] buffer) {
|
||||
int getBlock(final AbfsInputStream stream, final long position, final int length, final byte[] buffer)
|
||||
throws IOException {
|
||||
// not synchronized, so have to be careful with locking
|
||||
if (LOGGER.isTraceEnabled()) {
|
||||
LOGGER.trace("getBlock for file {} position {} thread {}",
|
||||
|
@ -244,7 +249,7 @@ final class ReadBufferManager {
|
|||
earliestBirthday = buf.getTimeStamp();
|
||||
}
|
||||
}
|
||||
if ((currentTimeMillis() - earliestBirthday > THRESHOLD_AGE_MILLISECONDS) && (nodeToEvict != null)) {
|
||||
if ((currentTimeMillis() - earliestBirthday > thresholdAgeMilliseconds) && (nodeToEvict != null)) {
|
||||
return evict(nodeToEvict);
|
||||
}
|
||||
|
||||
|
@ -253,7 +258,12 @@ final class ReadBufferManager {
|
|||
}
|
||||
|
||||
private boolean evict(final ReadBuffer buf) {
|
||||
// As failed ReadBuffers (bufferIndx = -1) are saved in completedReadList,
|
||||
// avoid adding it to freeList.
|
||||
if (buf.getBufferindex() != -1) {
|
||||
freeList.push(buf.getBufferindex());
|
||||
}
|
||||
|
||||
completedReadList.remove(buf);
|
||||
if (LOGGER.isTraceEnabled()) {
|
||||
LOGGER.trace("Evicting buffer idx {}; was used for file {} offset {} length {}",
|
||||
|
@ -289,6 +299,27 @@ final class ReadBufferManager {
|
|||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns buffers that failed or passed from completed queue.
|
||||
* @param stream
|
||||
* @param requestedOffset
|
||||
* @return
|
||||
*/
|
||||
private ReadBuffer getBufferFromCompletedQueue(final AbfsInputStream stream, final long requestedOffset) {
|
||||
for (ReadBuffer buffer : completedReadList) {
|
||||
// Buffer is returned if the requestedOffset is at or above buffer's
|
||||
// offset but less than buffer's length or the actual requestedLength
|
||||
if ((buffer.getStream() == stream)
|
||||
&& (requestedOffset >= buffer.getOffset())
|
||||
&& ((requestedOffset < buffer.getOffset() + buffer.getLength())
|
||||
|| (requestedOffset < buffer.getOffset() + buffer.getRequestedLength()))) {
|
||||
return buffer;
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
private void clearFromReadAheadQueue(final AbfsInputStream stream, final long requestedOffset) {
|
||||
ReadBuffer buffer = getFromList(readAheadQueue, stream, requestedOffset);
|
||||
if (buffer != null) {
|
||||
|
@ -299,11 +330,28 @@ final class ReadBufferManager {
|
|||
}
|
||||
|
||||
private int getBlockFromCompletedQueue(final AbfsInputStream stream, final long position, final int length,
|
||||
final byte[] buffer) {
|
||||
ReadBuffer buf = getFromList(completedReadList, stream, position);
|
||||
if (buf == null || position >= buf.getOffset() + buf.getLength()) {
|
||||
final byte[] buffer) throws IOException {
|
||||
ReadBuffer buf = getBufferFromCompletedQueue(stream, position);
|
||||
|
||||
if (buf == null) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (buf.getStatus() == ReadBufferStatus.READ_FAILED) {
|
||||
// To prevent new read requests to fail due to old read-ahead attempts,
|
||||
// return exception only from buffers that failed within last thresholdAgeMilliseconds
|
||||
if ((currentTimeMillis() - (buf.getTimeStamp()) < thresholdAgeMilliseconds)) {
|
||||
throw buf.getErrException();
|
||||
} else {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
if ((buf.getStatus() != ReadBufferStatus.AVAILABLE)
|
||||
|| (position >= buf.getOffset() + buf.getLength())) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
int cursor = (int) (position - buf.getOffset());
|
||||
int availableLengthInBuffer = buf.getLength() - cursor;
|
||||
int lengthToCopy = Math.min(length, availableLengthInBuffer);
|
||||
|
@ -368,14 +416,18 @@ final class ReadBufferManager {
|
|||
inProgressList.remove(buffer);
|
||||
if (result == ReadBufferStatus.AVAILABLE && bytesActuallyRead > 0) {
|
||||
buffer.setStatus(ReadBufferStatus.AVAILABLE);
|
||||
buffer.setTimeStamp(currentTimeMillis());
|
||||
buffer.setLength(bytesActuallyRead);
|
||||
completedReadList.add(buffer);
|
||||
} else {
|
||||
freeList.push(buffer.getBufferindex());
|
||||
// buffer should go out of scope after the end of the calling method in ReadBufferWorker, and eligible for GC
|
||||
// buffer will be deleted as per the eviction policy.
|
||||
}
|
||||
|
||||
buffer.setStatus(result);
|
||||
buffer.setTimeStamp(currentTimeMillis());
|
||||
completedReadList.add(buffer);
|
||||
}
|
||||
|
||||
//outside the synchronized, since anyone receiving a wake-up from the latch must see safe-published results
|
||||
buffer.getLatch().countDown(); // wake up waiting threads (if any)
|
||||
}
|
||||
|
@ -392,4 +444,24 @@ final class ReadBufferManager {
|
|||
private long currentTimeMillis() {
|
||||
return System.nanoTime() / 1000 / 1000;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
int getThresholdAgeMilliseconds() {
|
||||
return thresholdAgeMilliseconds;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
static void setThresholdAgeMilliseconds(int thresholdAgeMs) {
|
||||
thresholdAgeMilliseconds = thresholdAgeMs;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
int getCompletedReadListSize() {
|
||||
return completedReadList.size();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
void callTryEvict() {
|
||||
tryEvict();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
|
||||
package org.apache.hadoop.fs.azurebfs.services;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;
|
||||
|
@ -61,9 +62,18 @@ class ReadBufferWorker implements Runnable {
|
|||
if (buffer != null) {
|
||||
try {
|
||||
// do the actual read, from the file.
|
||||
int bytesRead = buffer.getStream().readRemote(buffer.getOffset(), buffer.getBuffer(), 0, buffer.getRequestedLength());
|
||||
int bytesRead = buffer.getStream().readRemote(
|
||||
buffer.getOffset(),
|
||||
buffer.getBuffer(),
|
||||
0,
|
||||
// If AbfsInputStream was created with bigger buffer size than
|
||||
// read-ahead buffer size, make sure a valid length is passed
|
||||
// for remote read
|
||||
Math.min(buffer.getRequestedLength(), buffer.getBuffer().length));
|
||||
|
||||
bufferManager.doneReading(buffer, ReadBufferStatus.AVAILABLE, bytesRead); // post result back to ReadBufferManager
|
||||
} catch (Exception ex) {
|
||||
buffer.setErrException(new IOException(ex));
|
||||
bufferManager.doneReading(buffer, ReadBufferStatus.READ_FAILED, 0);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,450 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.azurebfs.services;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
|
||||
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
|
||||
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.TimeoutException;
|
||||
import org.apache.hadoop.fs.azurebfs.utils.TestCachedSASToken;
|
||||
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.Mockito.doReturn;
|
||||
import static org.mockito.Mockito.doThrow;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.FORWARD_SLASH;
|
||||
|
||||
/**
|
||||
* Unit test AbfsInputStream.
|
||||
*/
|
||||
public class TestAbfsInputStream extends
|
||||
AbstractAbfsIntegrationTest {
|
||||
|
||||
private static final int ONE_KB = 1 * 1024;
|
||||
private static final int TWO_KB = 2 * 1024;
|
||||
private static final int THREE_KB = 3 * 1024;
|
||||
private static final int REDUCED_READ_BUFFER_AGE_THRESHOLD = 3000; // 3 sec
|
||||
|
||||
private AbfsRestOperation getMockRestOp() {
|
||||
AbfsRestOperation op = mock(AbfsRestOperation.class);
|
||||
AbfsHttpOperation httpOp = mock(AbfsHttpOperation.class);
|
||||
when(httpOp.getBytesReceived()).thenReturn(1024L);
|
||||
when(op.getResult()).thenReturn(httpOp);
|
||||
when(op.getSasToken()).thenReturn(TestCachedSASToken.getTestCachedSASTokenInstance().get());
|
||||
return op;
|
||||
}
|
||||
|
||||
private AbfsClient getMockAbfsClient() {
|
||||
// Mock failure for client.read()
|
||||
AbfsClient client = mock(AbfsClient.class);
|
||||
AbfsPerfTracker tracker = new AbfsPerfTracker(
|
||||
"test",
|
||||
this.getAccountName(),
|
||||
this.getConfiguration());
|
||||
when(client.getAbfsPerfTracker()).thenReturn(tracker);
|
||||
|
||||
return client;
|
||||
}
|
||||
|
||||
private AbfsInputStream getAbfsInputStream(AbfsClient mockAbfsClient, String fileName) {
|
||||
AbfsInputStreamContext inputStreamContext = new AbfsInputStreamContext(-1);
|
||||
// Create AbfsInputStream with the client instance
|
||||
AbfsInputStream inputStream = new AbfsInputStream(
|
||||
mockAbfsClient,
|
||||
null,
|
||||
FORWARD_SLASH + fileName,
|
||||
THREE_KB,
|
||||
inputStreamContext.withReadBufferSize(ONE_KB).withReadAheadQueueDepth(10),
|
||||
"eTag");
|
||||
|
||||
inputStream.setCachedSasToken(
|
||||
TestCachedSASToken.getTestCachedSASTokenInstance());
|
||||
|
||||
return inputStream;
|
||||
}
|
||||
|
||||
private void queueReadAheads(AbfsInputStream inputStream) {
|
||||
// Mimic AbfsInputStream readAhead queue requests
|
||||
ReadBufferManager.getBufferManager()
|
||||
.queueReadAhead(inputStream, 0, ONE_KB);
|
||||
ReadBufferManager.getBufferManager()
|
||||
.queueReadAhead(inputStream, ONE_KB, ONE_KB);
|
||||
ReadBufferManager.getBufferManager()
|
||||
.queueReadAhead(inputStream, TWO_KB, TWO_KB);
|
||||
}
|
||||
|
||||
private void verifyReadCallCount(AbfsClient client, int count) throws
|
||||
AzureBlobFileSystemException, InterruptedException {
|
||||
// ReadAhead threads are triggered asynchronously.
|
||||
// Wait a second before verifying the number of total calls.
|
||||
Thread.sleep(1000);
|
||||
verify(client, times(count)).read(any(String.class), any(Long.class),
|
||||
any(byte[].class), any(Integer.class), any(Integer.class),
|
||||
any(String.class), any(String.class));
|
||||
}
|
||||
|
||||
private void checkEvictedStatus(AbfsInputStream inputStream, int position, boolean expectedToThrowException)
|
||||
throws Exception {
|
||||
// Sleep for the eviction threshold time
|
||||
Thread.sleep(ReadBufferManager.getBufferManager().getThresholdAgeMilliseconds() + 1000);
|
||||
|
||||
// Eviction is done only when AbfsInputStream tries to queue new items.
|
||||
// 1 tryEvict will remove 1 eligible item. To ensure that the current test buffer
|
||||
// will get evicted (considering there could be other tests running in parallel),
|
||||
// call tryEvict for the number of items that are there in completedReadList.
|
||||
int numOfCompletedReadListItems = ReadBufferManager.getBufferManager().getCompletedReadListSize();
|
||||
while (numOfCompletedReadListItems > 0) {
|
||||
ReadBufferManager.getBufferManager().callTryEvict();
|
||||
numOfCompletedReadListItems--;
|
||||
}
|
||||
|
||||
if (expectedToThrowException) {
|
||||
intercept(IOException.class,
|
||||
() -> inputStream.read(position, new byte[ONE_KB], 0, ONE_KB));
|
||||
} else {
|
||||
inputStream.read(position, new byte[ONE_KB], 0, ONE_KB);
|
||||
}
|
||||
}
|
||||
|
||||
public TestAbfsInputStream() throws Exception {
|
||||
super();
|
||||
// Reduce thresholdAgeMilliseconds to 3 sec for the tests
|
||||
ReadBufferManager.getBufferManager().setThresholdAgeMilliseconds(REDUCED_READ_BUFFER_AGE_THRESHOLD);
|
||||
}
|
||||
|
||||
/**
|
||||
* This test expects AbfsInputStream to throw the exception that readAhead
|
||||
* thread received on read. The readAhead thread must be initiated from the
|
||||
* active read request itself.
|
||||
* Also checks that the ReadBuffers are evicted as per the ReadBufferManager
|
||||
* threshold criteria.
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testFailedReadAhead() throws Exception {
|
||||
AbfsClient client = getMockAbfsClient();
|
||||
AbfsRestOperation successOp = getMockRestOp();
|
||||
|
||||
// Stub :
|
||||
// Read request leads to 3 readahead calls: Fail all 3 readahead-client.read()
|
||||
// Actual read request fails with the failure in readahead thread
|
||||
doThrow(new TimeoutException("Internal Server error for RAH-Thread-X"))
|
||||
.doThrow(new TimeoutException("Internal Server error for RAH-Thread-Y"))
|
||||
.doThrow(new TimeoutException("Internal Server error RAH-Thread-Z"))
|
||||
.doReturn(successOp) // Any extra calls to read, pass it.
|
||||
.when(client)
|
||||
.read(any(String.class), any(Long.class), any(byte[].class),
|
||||
any(Integer.class), any(Integer.class), any(String.class),
|
||||
any(String.class));
|
||||
|
||||
AbfsInputStream inputStream = getAbfsInputStream(client, "testFailedReadAhead.txt");
|
||||
|
||||
// Scenario: ReadAhead triggered from current active read call failed
|
||||
// Before the change to return exception from readahead buffer,
|
||||
// AbfsInputStream would have triggered an extra readremote on noticing
|
||||
// data absent in readahead buffers
|
||||
// In this test, a read should trigger 3 client.read() calls as file is 3 KB
|
||||
// and readahead buffer size set in AbfsInputStream is 1 KB
|
||||
// There should only be a total of 3 client.read() in this test.
|
||||
intercept(IOException.class,
|
||||
() -> inputStream.read(new byte[ONE_KB]));
|
||||
|
||||
// Only the 3 readAhead threads should have triggered client.read
|
||||
verifyReadCallCount(client, 3);
|
||||
|
||||
// Stub returns success for the 4th read request, if ReadBuffers still
|
||||
// persisted, ReadAheadManager getBlock would have returned exception.
|
||||
checkEvictedStatus(inputStream, 0, false);
|
||||
}
|
||||
|
||||
/**
|
||||
* The test expects AbfsInputStream to initiate a remote read request for
|
||||
* the request offset and length when previous read ahead on the offset had failed.
|
||||
* Also checks that the ReadBuffers are evicted as per the ReadBufferManager
|
||||
* threshold criteria.
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testOlderReadAheadFailure() throws Exception {
|
||||
AbfsClient client = getMockAbfsClient();
|
||||
AbfsRestOperation successOp = getMockRestOp();
|
||||
|
||||
// Stub :
|
||||
// First Read request leads to 3 readahead calls: Fail all 3 readahead-client.read()
|
||||
// A second read request will see that readahead had failed for data in
|
||||
// the requested offset range and also that its is an older readahead request.
|
||||
// So attempt a new read only for the requested range.
|
||||
doThrow(new TimeoutException("Internal Server error for RAH-X"))
|
||||
.doThrow(new TimeoutException("Internal Server error for RAH-Y"))
|
||||
.doThrow(new TimeoutException("Internal Server error for RAH-Z"))
|
||||
.doReturn(successOp) // pass the read for second read request
|
||||
.doReturn(successOp) // pass success for post eviction test
|
||||
.when(client)
|
||||
.read(any(String.class), any(Long.class), any(byte[].class),
|
||||
any(Integer.class), any(Integer.class), any(String.class),
|
||||
any(String.class));
|
||||
|
||||
AbfsInputStream inputStream = getAbfsInputStream(client, "testOlderReadAheadFailure.txt");
|
||||
|
||||
// First read request that fails as the readahead triggered from this request failed.
|
||||
intercept(IOException.class,
|
||||
() -> inputStream.read(new byte[ONE_KB]));
|
||||
|
||||
// Only the 3 readAhead threads should have triggered client.read
|
||||
verifyReadCallCount(client, 3);
|
||||
|
||||
// Sleep for thresholdAgeMs so that the read ahead buffer qualifies for being old.
|
||||
Thread.sleep(ReadBufferManager.getBufferManager().getThresholdAgeMilliseconds());
|
||||
|
||||
// Second read request should retry the read (and not issue any new readaheads)
|
||||
inputStream.read(ONE_KB, new byte[ONE_KB], 0, ONE_KB);
|
||||
|
||||
// Once created, mock will remember all interactions. So total number of read
|
||||
// calls will be one more from earlier (there is a reset mock which will reset the
|
||||
// count, but the mock stub is erased as well which needs AbsInputStream to be recreated,
|
||||
// which beats the purpose)
|
||||
verifyReadCallCount(client, 4);
|
||||
|
||||
// Stub returns success for the 5th read request, if ReadBuffers still
|
||||
// persisted request would have failed for position 0.
|
||||
checkEvictedStatus(inputStream, 0, false);
|
||||
}
|
||||
|
||||
/**
|
||||
* The test expects AbfsInputStream to utilize any data read ahead for
|
||||
* requested offset and length.
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testSuccessfulReadAhead() throws Exception {
|
||||
// Mock failure for client.read()
|
||||
AbfsClient client = getMockAbfsClient();
|
||||
|
||||
// Success operation mock
|
||||
AbfsRestOperation op = getMockRestOp();
|
||||
|
||||
// Stub :
|
||||
// Pass all readAheads and fail the post eviction request to
|
||||
// prove ReadAhead buffer is used
|
||||
// for post eviction check, fail all read aheads
|
||||
doReturn(op)
|
||||
.doReturn(op)
|
||||
.doReturn(op)
|
||||
.doThrow(new TimeoutException("Internal Server error for RAH-X"))
|
||||
.doThrow(new TimeoutException("Internal Server error for RAH-Y"))
|
||||
.doThrow(new TimeoutException("Internal Server error for RAH-Z"))
|
||||
.when(client)
|
||||
.read(any(String.class), any(Long.class), any(byte[].class),
|
||||
any(Integer.class), any(Integer.class), any(String.class),
|
||||
any(String.class));
|
||||
|
||||
AbfsInputStream inputStream = getAbfsInputStream(client, "testSuccessfulReadAhead.txt");
|
||||
|
||||
// First read request that triggers readAheads.
|
||||
inputStream.read(new byte[ONE_KB]);
|
||||
|
||||
// Only the 3 readAhead threads should have triggered client.read
|
||||
verifyReadCallCount(client, 3);
|
||||
|
||||
// Another read request whose requested data is already read ahead.
|
||||
inputStream.read(ONE_KB, new byte[ONE_KB], 0, ONE_KB);
|
||||
|
||||
// Once created, mock will remember all interactions.
|
||||
// As the above read should not have triggered any server calls, total
|
||||
// number of read calls made at this point will be same as last.
|
||||
verifyReadCallCount(client, 3);
|
||||
|
||||
// Stub will throw exception for client.read() for 4th and later calls
|
||||
// if not using the read-ahead buffer exception will be thrown on read
|
||||
checkEvictedStatus(inputStream, 0, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* This test expects ReadAheadManager to throw exception if the read ahead
|
||||
* thread had failed within the last thresholdAgeMilliseconds.
|
||||
* Also checks that the ReadBuffers are evicted as per the ReadBufferManager
|
||||
* threshold criteria.
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testReadAheadManagerForFailedReadAhead() throws Exception {
|
||||
AbfsClient client = getMockAbfsClient();
|
||||
AbfsRestOperation successOp = getMockRestOp();
|
||||
|
||||
// Stub :
|
||||
// Read request leads to 3 readahead calls: Fail all 3 readahead-client.read()
|
||||
// Actual read request fails with the failure in readahead thread
|
||||
doThrow(new TimeoutException("Internal Server error for RAH-Thread-X"))
|
||||
.doThrow(new TimeoutException("Internal Server error for RAH-Thread-Y"))
|
||||
.doThrow(new TimeoutException("Internal Server error RAH-Thread-Z"))
|
||||
.doReturn(successOp) // Any extra calls to read, pass it.
|
||||
.when(client)
|
||||
.read(any(String.class), any(Long.class), any(byte[].class),
|
||||
any(Integer.class), any(Integer.class), any(String.class),
|
||||
any(String.class));
|
||||
|
||||
AbfsInputStream inputStream = getAbfsInputStream(client, "testReadAheadManagerForFailedReadAhead.txt");
|
||||
|
||||
queueReadAheads(inputStream);
|
||||
|
||||
// AbfsInputStream Read would have waited for the read-ahead for the requested offset
|
||||
// as we are testing from ReadAheadManager directly, sleep for a sec to
|
||||
// get the read ahead threads to complete
|
||||
Thread.sleep(1000);
|
||||
|
||||
// if readAhead failed for specific offset, getBlock should
|
||||
// throw exception from the ReadBuffer that failed within last thresholdAgeMilliseconds sec
|
||||
intercept(IOException.class,
|
||||
() -> ReadBufferManager.getBufferManager().getBlock(
|
||||
inputStream,
|
||||
0,
|
||||
ONE_KB,
|
||||
new byte[ONE_KB]));
|
||||
|
||||
// Only the 3 readAhead threads should have triggered client.read
|
||||
verifyReadCallCount(client, 3);
|
||||
|
||||
// Stub returns success for the 4th read request, if ReadBuffers still
|
||||
// persisted, ReadAheadManager getBlock would have returned exception.
|
||||
checkEvictedStatus(inputStream, 0, false);
|
||||
}
|
||||
|
||||
/**
|
||||
* The test expects ReadAheadManager to return 0 receivedBytes when previous
|
||||
* read ahead on the offset had failed and not throw exception received then.
|
||||
* Also checks that the ReadBuffers are evicted as per the ReadBufferManager
|
||||
* threshold criteria.
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testReadAheadManagerForOlderReadAheadFailure() throws Exception {
|
||||
AbfsClient client = getMockAbfsClient();
|
||||
AbfsRestOperation successOp = getMockRestOp();
|
||||
|
||||
// Stub :
|
||||
// First Read request leads to 3 readahead calls: Fail all 3 readahead-client.read()
|
||||
// A second read request will see that readahead had failed for data in
|
||||
// the requested offset range but also that its is an older readahead request.
|
||||
// System issue could have resolved by now, so attempt a new read only for the requested range.
|
||||
doThrow(new TimeoutException("Internal Server error for RAH-X"))
|
||||
.doThrow(new TimeoutException("Internal Server error for RAH-X"))
|
||||
.doThrow(new TimeoutException("Internal Server error for RAH-X"))
|
||||
.doReturn(successOp) // pass the read for second read request
|
||||
.doReturn(successOp) // pass success for post eviction test
|
||||
.when(client)
|
||||
.read(any(String.class), any(Long.class), any(byte[].class),
|
||||
any(Integer.class), any(Integer.class), any(String.class),
|
||||
any(String.class));
|
||||
|
||||
AbfsInputStream inputStream = getAbfsInputStream(client, "testReadAheadManagerForOlderReadAheadFailure.txt");
|
||||
|
||||
queueReadAheads(inputStream);
|
||||
|
||||
// AbfsInputStream Read would have waited for the read-ahead for the requested offset
|
||||
// as we are testing from ReadAheadManager directly, sleep for thresholdAgeMilliseconds so that
|
||||
// read buffer qualifies for to be an old buffer
|
||||
Thread.sleep(ReadBufferManager.getBufferManager().getThresholdAgeMilliseconds());
|
||||
|
||||
// Only the 3 readAhead threads should have triggered client.read
|
||||
verifyReadCallCount(client, 3);
|
||||
|
||||
// getBlock from a new read request should return 0 if there is a failure
|
||||
// 30 sec before in read ahead buffer for respective offset.
|
||||
int bytesRead = ReadBufferManager.getBufferManager().getBlock(
|
||||
inputStream,
|
||||
ONE_KB,
|
||||
ONE_KB,
|
||||
new byte[ONE_KB]);
|
||||
Assert.assertEquals("bytesRead should be zero when previously read "
|
||||
+ "ahead buffer had failed", 0, bytesRead);
|
||||
|
||||
// Stub returns success for the 5th read request, if ReadBuffers still
|
||||
// persisted request would have failed for position 0.
|
||||
checkEvictedStatus(inputStream, 0, false);
|
||||
}
|
||||
|
||||
/**
|
||||
* The test expects ReadAheadManager to return data from previously read
|
||||
* ahead data of same offset.
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testReadAheadManagerForSuccessfulReadAhead() throws Exception {
|
||||
// Mock failure for client.read()
|
||||
AbfsClient client = getMockAbfsClient();
|
||||
|
||||
// Success operation mock
|
||||
AbfsRestOperation op = getMockRestOp();
|
||||
|
||||
// Stub :
|
||||
// Pass all readAheads and fail the post eviction request to
|
||||
// prove ReadAhead buffer is used
|
||||
doReturn(op)
|
||||
.doReturn(op)
|
||||
.doReturn(op)
|
||||
.doThrow(new TimeoutException("Internal Server error for RAH-X")) // for post eviction request
|
||||
.doThrow(new TimeoutException("Internal Server error for RAH-Y"))
|
||||
.doThrow(new TimeoutException("Internal Server error for RAH-Z"))
|
||||
.when(client)
|
||||
.read(any(String.class), any(Long.class), any(byte[].class),
|
||||
any(Integer.class), any(Integer.class), any(String.class),
|
||||
any(String.class));
|
||||
|
||||
AbfsInputStream inputStream = getAbfsInputStream(client, "testSuccessfulReadAhead.txt");
|
||||
|
||||
queueReadAheads(inputStream);
|
||||
|
||||
// AbfsInputStream Read would have waited for the read-ahead for the requested offset
|
||||
// as we are testing from ReadAheadManager directly, sleep for a sec to
|
||||
// get the read ahead threads to complete
|
||||
Thread.sleep(1000);
|
||||
|
||||
// Only the 3 readAhead threads should have triggered client.read
|
||||
verifyReadCallCount(client, 3);
|
||||
|
||||
// getBlock for a new read should return the buffer read-ahead
|
||||
int bytesRead = ReadBufferManager.getBufferManager().getBlock(
|
||||
inputStream,
|
||||
ONE_KB,
|
||||
ONE_KB,
|
||||
new byte[ONE_KB]);
|
||||
|
||||
Assert.assertTrue("bytesRead should be non-zero from the "
|
||||
+ "buffer that was read-ahead", bytesRead > 0);
|
||||
|
||||
// Once created, mock will remember all interactions.
|
||||
// As the above read should not have triggered any server calls, total
|
||||
// number of read calls made at this point will be same as last.
|
||||
verifyReadCallCount(client, 3);
|
||||
|
||||
// Stub will throw exception for client.read() for 4th and later calls
|
||||
// if not using the read-ahead buffer exception will be thrown on read
|
||||
checkEvictedStatus(inputStream, 0, true);
|
||||
}
|
||||
|
||||
}
|
|
@ -22,12 +22,14 @@ import java.io.IOException;
|
|||
import java.time.OffsetDateTime;
|
||||
import java.time.ZoneOffset;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
import java.util.UUID;
|
||||
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS_IN_SECONDS;
|
||||
import static java.time.temporal.ChronoUnit.SECONDS;
|
||||
import static java.time.temporal.ChronoUnit.DAYS;
|
||||
|
||||
/**
|
||||
* Test CachedSASToken.
|
||||
|
@ -159,4 +161,36 @@ public final class TestCachedSASToken {
|
|||
cachedToken = cachedSasToken.get();
|
||||
Assert.assertNull(cachedToken);
|
||||
}
|
||||
|
||||
public static CachedSASToken getTestCachedSASTokenInstance() {
|
||||
String expiryPostADay = OffsetDateTime.now(ZoneOffset.UTC)
|
||||
.plus(1, DAYS)
|
||||
.format(DateTimeFormatter.ISO_DATE_TIME);
|
||||
String version = "2020-20-20";
|
||||
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append("skoid=");
|
||||
sb.append(UUID.randomUUID().toString());
|
||||
sb.append("&sktid=");
|
||||
sb.append(UUID.randomUUID().toString());
|
||||
sb.append("&skt=");
|
||||
sb.append(OffsetDateTime.now(ZoneOffset.UTC)
|
||||
.minus(1, DAYS)
|
||||
.format(DateTimeFormatter.ISO_DATE_TIME));
|
||||
sb.append("&ske=");
|
||||
sb.append(expiryPostADay);
|
||||
sb.append("&sks=b");
|
||||
sb.append("&skv=");
|
||||
sb.append(version);
|
||||
sb.append("&sp=rw");
|
||||
sb.append("&sr=b");
|
||||
sb.append("&se=");
|
||||
sb.append(expiryPostADay);
|
||||
sb.append("&sv=2");
|
||||
sb.append(version);
|
||||
|
||||
CachedSASToken cachedSASToken = new CachedSASToken();
|
||||
cachedSASToken.update(sb.toString());
|
||||
return cachedSASToken;
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue