From 53b993e6048ffaaf98e460690211fc08efb20cf2 Mon Sep 17 00:00:00 2001
From: Sneha Vijayarajan
Date: Wed, 27 May 2020 13:51:42 -0700
Subject: [PATCH] HADOOP-16852: Report read-ahead error back

Contributed by Sneha Vijayarajan
---
 .../fs/azurebfs/services/AbfsInputStream.java |  12 +
 .../fs/azurebfs/services/ReadBuffer.java      |  16 +
 .../azurebfs/services/ReadBufferManager.java  |  90 +++-
 .../azurebfs/services/ReadBufferWorker.java   |  12 +-
 .../services/TestAbfsInputStream.java         | 450 ++++++++++++++++++
 .../fs/azurebfs/utils/TestCachedSASToken.java |  34 ++
 6 files changed, 604 insertions(+), 10 deletions(-)
 create mode 100644 hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java

diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
index 422fa3b2f70..50380c9bb9f 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
@@ -24,6 +24,10 @@ import java.io.IOException;
 import java.net.HttpURLConnection;
 
 import com.google.common.base.Preconditions;
+import com.google.common.annotations.VisibleForTesting;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.fs.CanUnbuffer;
 import org.apache.hadoop.fs.FSExceptionMessages;
@@ -41,6 +45,7 @@ import static org.apache.hadoop.util.StringUtils.toLowerCase;
  */
 public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
     StreamCapabilities {
+  private static final Logger LOG = LoggerFactory.getLogger(AbfsInputStream.class);
 
   private final AbfsClient client;
   private final Statistics statistics;
@@ -239,6 +244,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
     final AbfsRestOperation op;
     AbfsPerfTracker tracker = client.getAbfsPerfTracker();
     try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker, "readRemote", "read")) {
+      LOG.trace("Trigger client.read for path={} position={} offset={} length={}", path, position, offset, length);
       op = client.read(path, position, b, offset, length, tolerateOobAppends ?
"*" : eTag, cachedSasToken.get()); cachedSasToken.update(op.getSasToken()); perfInfo.registerResult(op.getResult()).registerSuccess(true); @@ -431,4 +437,10 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer, byte[] getBuffer() { return buffer; } + + @VisibleForTesting + protected void setCachedSasToken(final CachedSASToken cachedSasToken) { + this.cachedSasToken = cachedSasToken; + } + } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBuffer.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBuffer.java index 00e4f008ad0..5d55726222d 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBuffer.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBuffer.java @@ -18,10 +18,13 @@ package org.apache.hadoop.fs.azurebfs.services; +import java.io.IOException; import java.util.concurrent.CountDownLatch; import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus; +import static org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus.READ_FAILED; + class ReadBuffer { private AbfsInputStream stream; @@ -40,6 +43,8 @@ class ReadBuffer { private boolean isLastByteConsumed = false; private boolean isAnyByteConsumed = false; + private IOException errException = null; + public AbfsInputStream getStream() { return stream; } @@ -88,12 +93,23 @@ class ReadBuffer { this.bufferindex = bufferindex; } + public IOException getErrException() { + return errException; + } + + public void setErrException(final IOException errException) { + this.errException = errException; + } + public ReadBufferStatus getStatus() { return status; } public void setStatus(ReadBufferStatus status) { this.status = status; + if (status == READ_FAILED) { + bufferindex = -1; + } } public CountDownLatch getLatch() { diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java index 5b71cf05225..73c23b01551 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java @@ -21,12 +21,15 @@ import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.util.Collection; import java.util.LinkedList; import java.util.Queue; import java.util.Stack; import java.util.concurrent.CountDownLatch; +import com.google.common.annotations.VisibleForTesting; + /** * The Read Buffer Manager for Rest AbfsClient. 
  */
@@ -36,8 +39,9 @@
   private static final int NUM_BUFFERS = 16;
   private static final int BLOCK_SIZE = 4 * 1024 * 1024;
   private static final int NUM_THREADS = 8;
-  private static final int THRESHOLD_AGE_MILLISECONDS = 3000; // have to see if 3 seconds is a good threshold
+  private static final int DEFAULT_THRESHOLD_AGE_MILLISECONDS = 3000; // have to see if 3 seconds is a good threshold
+  private static int thresholdAgeMilliseconds = DEFAULT_THRESHOLD_AGE_MILLISECONDS;
 
   private Thread[] threads = new Thread[NUM_THREADS];
   private byte[][] buffers;  // array of byte[] buffers, to hold the data that is read
   private Stack<Integer> freeList = new Stack<>();  // indices in buffers[] array that are available
@@ -141,7 +145,8 @@
    * @param buffer the buffer to read data into. Note that the buffer will be written into from offset 0.
    * @return the number of bytes read
    */
-  int getBlock(final AbfsInputStream stream, final long position, final int length, final byte[] buffer) {
+  int getBlock(final AbfsInputStream stream, final long position, final int length, final byte[] buffer)
+      throws IOException {
     // not synchronized, so have to be careful with locking
     if (LOGGER.isTraceEnabled()) {
       LOGGER.trace("getBlock for file {} position {} thread {}",
@@ -244,7 +249,7 @@
         earliestBirthday = buf.getTimeStamp();
       }
     }
-    if ((currentTimeMillis() - earliestBirthday > THRESHOLD_AGE_MILLISECONDS) && (nodeToEvict != null)) {
+    if ((currentTimeMillis() - earliestBirthday > thresholdAgeMilliseconds) && (nodeToEvict != null)) {
       return evict(nodeToEvict);
     }
 
@@ -253,7 +258,12 @@
   }
 
   private boolean evict(final ReadBuffer buf) {
-    freeList.push(buf.getBufferindex());
+    // As failed ReadBuffers (bufferIndex = -1) are saved in completedReadList,
+    // avoid adding them to freeList.
+    if (buf.getBufferindex() != -1) {
+      freeList.push(buf.getBufferindex());
+    }
+
     completedReadList.remove(buf);
     if (LOGGER.isTraceEnabled()) {
       LOGGER.trace("Evicting buffer idx {}; was used for file {} offset {} length {}",
@@ -289,6 +299,27 @@
     return null;
   }
 
+  /**
+   * Returns buffers that failed or passed from the completed queue.
+   * @param stream the AbfsInputStream whose completed buffers are sought
+   * @param requestedOffset the offset the read request starts at
+   * @return the matching completed ReadBuffer, or null if none matches
+   */
+  private ReadBuffer getBufferFromCompletedQueue(final AbfsInputStream stream, final long requestedOffset) {
+    for (ReadBuffer buffer : completedReadList) {
+      // Buffer is returned if the requestedOffset is at or above buffer's
+      // offset but less than buffer's length or the actual requestedLength
+      if ((buffer.getStream() == stream)
+          && (requestedOffset >= buffer.getOffset())
+          && ((requestedOffset < buffer.getOffset() + buffer.getLength())
+              || (requestedOffset < buffer.getOffset() + buffer.getRequestedLength()))) {
+        return buffer;
+      }
+    }
+
+    return null;
+  }
+
   private void clearFromReadAheadQueue(final AbfsInputStream stream, final long requestedOffset) {
     ReadBuffer buffer = getFromList(readAheadQueue, stream, requestedOffset);
     if (buffer != null) {
@@ -299,11 +330,28 @@
   }
 
   private int getBlockFromCompletedQueue(final AbfsInputStream stream, final long position, final int length,
-                                         final byte[] buffer) {
-    ReadBuffer buf = getFromList(completedReadList, stream, position);
-    if (buf == null || position >= buf.getOffset() + buf.getLength()) {
+                                         final byte[] buffer) throws IOException {
+    ReadBuffer buf = getBufferFromCompletedQueue(stream, position);
+
+    if (buf == null) {
       return 0;
     }
+
+    if (buf.getStatus() == ReadBufferStatus.READ_FAILED) {
+      // To prevent new read requests from failing due to old read-ahead attempts,
+      // return the exception only from buffers that failed within the last thresholdAgeMilliseconds
+      if ((currentTimeMillis() - (buf.getTimeStamp()) < thresholdAgeMilliseconds)) {
+        throw buf.getErrException();
+      } else {
+        return 0;
+      }
+    }
+
+    if ((buf.getStatus() != ReadBufferStatus.AVAILABLE)
+        || (position >= buf.getOffset() + buf.getLength())) {
+      return 0;
+    }
+
     int cursor = (int) (position - buf.getOffset());
     int availableLengthInBuffer = buf.getLength() - cursor;
     int lengthToCopy = Math.min(length, availableLengthInBuffer);
@@ -368,14 +416,18 @@
     inProgressList.remove(buffer);
     if (result == ReadBufferStatus.AVAILABLE && bytesActuallyRead > 0) {
       buffer.setStatus(ReadBufferStatus.AVAILABLE);
-      buffer.setTimeStamp(currentTimeMillis());
       buffer.setLength(bytesActuallyRead);
       completedReadList.add(buffer);
     } else {
       freeList.push(buffer.getBufferindex());
-      // buffer should go out of scope after the end of the calling method in ReadBufferWorker, and eligible for GC
+      // buffer will be deleted as per the eviction policy.
} + + buffer.setStatus(result); + buffer.setTimeStamp(currentTimeMillis()); + completedReadList.add(buffer); } + //outside the synchronized, since anyone receiving a wake-up from the latch must see safe-published results buffer.getLatch().countDown(); // wake up waiting threads (if any) } @@ -392,4 +444,24 @@ final class ReadBufferManager { private long currentTimeMillis() { return System.nanoTime() / 1000 / 1000; } + + @VisibleForTesting + int getThresholdAgeMilliseconds() { + return thresholdAgeMilliseconds; + } + + @VisibleForTesting + static void setThresholdAgeMilliseconds(int thresholdAgeMs) { + thresholdAgeMilliseconds = thresholdAgeMs; + } + + @VisibleForTesting + int getCompletedReadListSize() { + return completedReadList.size(); + } + + @VisibleForTesting + void callTryEvict() { + tryEvict(); + } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java index af69de0f089..41acd7e06f1 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java @@ -18,6 +18,7 @@ package org.apache.hadoop.fs.azurebfs.services; +import java.io.IOException; import java.util.concurrent.CountDownLatch; import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus; @@ -61,9 +62,18 @@ class ReadBufferWorker implements Runnable { if (buffer != null) { try { // do the actual read, from the file. - int bytesRead = buffer.getStream().readRemote(buffer.getOffset(), buffer.getBuffer(), 0, buffer.getRequestedLength()); + int bytesRead = buffer.getStream().readRemote( + buffer.getOffset(), + buffer.getBuffer(), + 0, + // If AbfsInputStream was created with bigger buffer size than + // read-ahead buffer size, make sure a valid length is passed + // for remote read + Math.min(buffer.getRequestedLength(), buffer.getBuffer().length)); + bufferManager.doneReading(buffer, ReadBufferStatus.AVAILABLE, bytesRead); // post result back to ReadBufferManager } catch (Exception ex) { + buffer.setErrException(new IOException(ex)); bufferManager.doneReading(buffer, ReadBufferStatus.READ_FAILED, 0); } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java new file mode 100644 index 00000000000..c9dacd6eb06 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java @@ -0,0 +1,450 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.TimeoutException;
+import org.apache.hadoop.fs.azurebfs.utils.TestCachedSASToken;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.FORWARD_SLASH;
+
+/**
+ * Unit test AbfsInputStream.
+ */
+public class TestAbfsInputStream extends
+    AbstractAbfsIntegrationTest {
+
+  private static final int ONE_KB = 1 * 1024;
+  private static final int TWO_KB = 2 * 1024;
+  private static final int THREE_KB = 3 * 1024;
+  private static final int REDUCED_READ_BUFFER_AGE_THRESHOLD = 3000; // 3 sec
+
+  private AbfsRestOperation getMockRestOp() {
+    AbfsRestOperation op = mock(AbfsRestOperation.class);
+    AbfsHttpOperation httpOp = mock(AbfsHttpOperation.class);
+    when(httpOp.getBytesReceived()).thenReturn(1024L);
+    when(op.getResult()).thenReturn(httpOp);
+    when(op.getSasToken()).thenReturn(TestCachedSASToken.getTestCachedSASTokenInstance().get());
+    return op;
+  }
+
+  private AbfsClient getMockAbfsClient() {
+    // Mock AbfsClient; tests stub the client.read() behavior as needed
+    AbfsClient client = mock(AbfsClient.class);
+    AbfsPerfTracker tracker = new AbfsPerfTracker(
+        "test",
+        this.getAccountName(),
+        this.getConfiguration());
+    when(client.getAbfsPerfTracker()).thenReturn(tracker);
+
+    return client;
+  }
+
+  private AbfsInputStream getAbfsInputStream(AbfsClient mockAbfsClient, String fileName) {
+    AbfsInputStreamContext inputStreamContext = new AbfsInputStreamContext(-1);
+    // Create AbfsInputStream with the client instance
+    AbfsInputStream inputStream = new AbfsInputStream(
+        mockAbfsClient,
+        null,
+        FORWARD_SLASH + fileName,
+        THREE_KB,
+        inputStreamContext.withReadBufferSize(ONE_KB).withReadAheadQueueDepth(10),
+        "eTag");
+
+    inputStream.setCachedSasToken(
+        TestCachedSASToken.getTestCachedSASTokenInstance());
+
+    return inputStream;
+  }
+
+  private void queueReadAheads(AbfsInputStream inputStream) {
+    // Mimic AbfsInputStream readAhead queue requests
+    ReadBufferManager.getBufferManager()
+        .queueReadAhead(inputStream, 0, ONE_KB);
+    ReadBufferManager.getBufferManager()
+        .queueReadAhead(inputStream, ONE_KB, ONE_KB);
+    ReadBufferManager.getBufferManager()
+        .queueReadAhead(inputStream, TWO_KB, TWO_KB);
+  }
+
+  private void verifyReadCallCount(AbfsClient client, int count) throws
+      AzureBlobFileSystemException, InterruptedException {
+    // ReadAhead threads are triggered asynchronously.
+    // Wait a second before verifying the number of total calls.
+    Thread.sleep(1000);
+    verify(client, times(count)).read(any(String.class), any(Long.class),
+        any(byte[].class), any(Integer.class), any(Integer.class),
+        any(String.class), any(String.class));
+  }
+
+  private void checkEvictedStatus(AbfsInputStream inputStream, int position, boolean expectedToThrowException)
+      throws Exception {
+    // Sleep for the eviction threshold time
+    Thread.sleep(ReadBufferManager.getBufferManager().getThresholdAgeMilliseconds() + 1000);
+
+    // Eviction is done only when AbfsInputStream tries to queue new items.
+    // 1 tryEvict will remove 1 eligible item. To ensure that the current test buffer
+    // will get evicted (considering there could be other tests running in parallel),
+    // call tryEvict for the number of items that are there in completedReadList.
+    int numOfCompletedReadListItems = ReadBufferManager.getBufferManager().getCompletedReadListSize();
+    while (numOfCompletedReadListItems > 0) {
+      ReadBufferManager.getBufferManager().callTryEvict();
+      numOfCompletedReadListItems--;
+    }
+
+    if (expectedToThrowException) {
+      intercept(IOException.class,
+          () -> inputStream.read(position, new byte[ONE_KB], 0, ONE_KB));
+    } else {
+      inputStream.read(position, new byte[ONE_KB], 0, ONE_KB);
+    }
+  }
+
+  public TestAbfsInputStream() throws Exception {
+    super();
+    // Reduce thresholdAgeMilliseconds to 3 sec for the tests
+    ReadBufferManager.getBufferManager().setThresholdAgeMilliseconds(REDUCED_READ_BUFFER_AGE_THRESHOLD);
+  }
+
+  /**
+   * This test expects AbfsInputStream to throw the exception that the readAhead
+   * thread received on read. The readAhead thread must be initiated from the
+   * active read request itself.
+   * Also checks that the ReadBuffers are evicted as per the ReadBufferManager
+   * threshold criteria.
+   * @throws Exception
+   */
+  @Test
+  public void testFailedReadAhead() throws Exception {
+    AbfsClient client = getMockAbfsClient();
+    AbfsRestOperation successOp = getMockRestOp();
+
+    // Stub:
+    // Read request leads to 3 readahead calls: fail all 3 readahead-client.read()
+    // Actual read request fails with the failure in the readahead thread
+    doThrow(new TimeoutException("Internal Server error for RAH-Thread-X"))
+        .doThrow(new TimeoutException("Internal Server error for RAH-Thread-Y"))
+        .doThrow(new TimeoutException("Internal Server error for RAH-Thread-Z"))
+        .doReturn(successOp) // Any extra calls to read, pass it.
+        .when(client)
+        .read(any(String.class), any(Long.class), any(byte[].class),
+            any(Integer.class), any(Integer.class), any(String.class),
+            any(String.class));
+
+    AbfsInputStream inputStream = getAbfsInputStream(client, "testFailedReadAhead.txt");
+
+    // Scenario: the readAhead triggered from the current active read call fails.
+    // Before the change to return the exception from the readahead buffer,
+    // AbfsInputStream would have triggered an extra readRemote on noticing
+    // data absent in the readahead buffers.
+    // In this test, a read should trigger 3 client.read() calls as the file is 3 KB
+    // and the readahead buffer size set in AbfsInputStream is 1 KB.
+    // There should only be a total of 3 client.read() calls in this test.
+    intercept(IOException.class,
+        () -> inputStream.read(new byte[ONE_KB]));
+
+    // Only the 3 readAhead threads should have triggered client.read
+    verifyReadCallCount(client, 3);
+
+    // Stub returns success for the 4th read request; if ReadBuffers still
+    // persisted, ReadAheadManager getBlock would have returned the exception.
+    checkEvictedStatus(inputStream, 0, false);
+  }
+
+  /**
+   * The test expects AbfsInputStream to initiate a remote read request for
+   * the requested offset and length when the previous read ahead on the offset had failed.
+   * Also checks that the ReadBuffers are evicted as per the ReadBufferManager
+   * threshold criteria.
+   * @throws Exception
+   */
+  @Test
+  public void testOlderReadAheadFailure() throws Exception {
+    AbfsClient client = getMockAbfsClient();
+    AbfsRestOperation successOp = getMockRestOp();
+
+    // Stub:
+    // First read request leads to 3 readahead calls: fail all 3 readahead-client.read()
+    // A second read request will see that readahead had failed for data in
+    // the requested offset range and also that it is an older readahead request.
+    // So attempt a new read only for the requested range.
+    doThrow(new TimeoutException("Internal Server error for RAH-X"))
+        .doThrow(new TimeoutException("Internal Server error for RAH-Y"))
+        .doThrow(new TimeoutException("Internal Server error for RAH-Z"))
+        .doReturn(successOp) // pass the read for the second read request
+        .doReturn(successOp) // pass success for the post-eviction test
+        .when(client)
+        .read(any(String.class), any(Long.class), any(byte[].class),
+            any(Integer.class), any(Integer.class), any(String.class),
+            any(String.class));
+
+    AbfsInputStream inputStream = getAbfsInputStream(client, "testOlderReadAheadFailure.txt");
+
+    // First read request fails as the readahead triggered from this request failed.
+    intercept(IOException.class,
+        () -> inputStream.read(new byte[ONE_KB]));
+
+    // Only the 3 readAhead threads should have triggered client.read
+    verifyReadCallCount(client, 3);
+
+    // Sleep for thresholdAgeMs so that the read ahead buffer qualifies as old.
+    Thread.sleep(ReadBufferManager.getBufferManager().getThresholdAgeMilliseconds());
+
+    // Second read request should retry the read (and not issue any new readaheads)
+    inputStream.read(ONE_KB, new byte[ONE_KB], 0, ONE_KB);
+
+    // Once created, the mock remembers all interactions, so the total number of
+    // read calls will be one more than earlier. (Mockito can reset the call
+    // count, but that erases the stubs as well and needs the AbfsInputStream
+    // to be recreated, which defeats the purpose.)
+    verifyReadCallCount(client, 4);
+
+    // Stub returns success for the 5th read request; if ReadBuffers still
+    // persisted, the request would have failed for position 0.
+    checkEvictedStatus(inputStream, 0, false);
+  }
+
+  /**
+   * The test expects AbfsInputStream to utilize any data read ahead for the
+   * requested offset and length.
+   * @throws Exception
+   */
+  @Test
+  public void testSuccessfulReadAhead() throws Exception {
+    // Mock client; all readaheads are stubbed to succeed below
+    AbfsClient client = getMockAbfsClient();
+
+    // Success operation mock
+    AbfsRestOperation op = getMockRestOp();
+
+    // Stub:
+    // Pass all readAheads and fail the post-eviction request to
+    // prove that the ReadAhead buffer is used;
+    // for the post-eviction check, fail all read aheads
+    doReturn(op)
+        .doReturn(op)
+        .doReturn(op)
+        .doThrow(new TimeoutException("Internal Server error for RAH-X"))
+        .doThrow(new TimeoutException("Internal Server error for RAH-Y"))
+        .doThrow(new TimeoutException("Internal Server error for RAH-Z"))
+        .when(client)
+        .read(any(String.class), any(Long.class), any(byte[].class),
+            any(Integer.class), any(Integer.class), any(String.class),
+            any(String.class));
+
+    AbfsInputStream inputStream = getAbfsInputStream(client, "testSuccessfulReadAhead.txt");
+
+    // First read request that triggers readAheads.
+    inputStream.read(new byte[ONE_KB]);
+
+    // Only the 3 readAhead threads should have triggered client.read
+    verifyReadCallCount(client, 3);
+
+    // Another read request whose requested data is already read ahead.
+    inputStream.read(ONE_KB, new byte[ONE_KB], 0, ONE_KB);
+
+    // Once created, the mock remembers all interactions.
+    // As the above read should not have triggered any server calls, the total
+    // number of read calls made at this point will be the same as before.
+    verifyReadCallCount(client, 3);
+
+    // Stub will throw an exception for the 4th and later client.read() calls;
+    // if the read-ahead buffer is not used, an exception will be thrown on read
+    checkEvictedStatus(inputStream, 0, true);
+  }
+
+  /**
+   * This test expects ReadAheadManager to throw the exception if the read ahead
+   * thread had failed within the last thresholdAgeMilliseconds.
+   * Also checks that the ReadBuffers are evicted as per the ReadBufferManager
+   * threshold criteria.
+   * @throws Exception
+   */
+  @Test
+  public void testReadAheadManagerForFailedReadAhead() throws Exception {
+    AbfsClient client = getMockAbfsClient();
+    AbfsRestOperation successOp = getMockRestOp();
+
+    // Stub:
+    // Read request leads to 3 readahead calls: fail all 3 readahead-client.read()
+    // Actual read request fails with the failure in the readahead thread
+    doThrow(new TimeoutException("Internal Server error for RAH-Thread-X"))
+        .doThrow(new TimeoutException("Internal Server error for RAH-Thread-Y"))
+        .doThrow(new TimeoutException("Internal Server error for RAH-Thread-Z"))
+        .doReturn(successOp) // Any extra calls to read, pass it.
+        .when(client)
+        .read(any(String.class), any(Long.class), any(byte[].class),
+            any(Integer.class), any(Integer.class), any(String.class),
+            any(String.class));
+
+    AbfsInputStream inputStream = getAbfsInputStream(client, "testReadAheadManagerForFailedReadAhead.txt");
+
+    queueReadAheads(inputStream);
+
+    // AbfsInputStream read would have waited for the read-ahead of the requested offset;
+    // as we are testing the ReadAheadManager directly, sleep for a second to
+    // let the read ahead threads complete
+    Thread.sleep(1000);
+
+    // If readAhead failed for the specific offset, getBlock should throw the
+    // exception from the ReadBuffer that failed within the last thresholdAgeMilliseconds
+    intercept(IOException.class,
+        () -> ReadBufferManager.getBufferManager().getBlock(
+            inputStream,
+            0,
+            ONE_KB,
+            new byte[ONE_KB]));
+
+    // Only the 3 readAhead threads should have triggered client.read
+    verifyReadCallCount(client, 3);
+
+    // Stub returns success for the 4th read request; if ReadBuffers still
+    // persisted, ReadAheadManager getBlock would have returned the exception.
+    checkEvictedStatus(inputStream, 0, false);
+  }
+
+  /**
+   * The test expects ReadAheadManager to return 0 receivedBytes when the previous
+   * read ahead on the offset had failed, and not throw the exception received then.
+   * Also checks that the ReadBuffers are evicted as per the ReadBufferManager
+   * threshold criteria.
+   * @throws Exception
+   */
+  @Test
+  public void testReadAheadManagerForOlderReadAheadFailure() throws Exception {
+    AbfsClient client = getMockAbfsClient();
+    AbfsRestOperation successOp = getMockRestOp();
+
+    // Stub:
+    // First read request leads to 3 readahead calls: fail all 3 readahead-client.read()
+    // A second read request will see that readahead had failed for data in
+    // the requested offset range but also that it is an older readahead request.
+    // The system issue could have been resolved by now, so attempt a new read
+    // only for the requested range.
+    doThrow(new TimeoutException("Internal Server error for RAH-X"))
+        .doThrow(new TimeoutException("Internal Server error for RAH-X"))
+        .doThrow(new TimeoutException("Internal Server error for RAH-X"))
+        .doReturn(successOp) // pass the read for the second read request
+        .doReturn(successOp) // pass success for the post-eviction test
+        .when(client)
+        .read(any(String.class), any(Long.class), any(byte[].class),
+            any(Integer.class), any(Integer.class), any(String.class),
+            any(String.class));
+
+    AbfsInputStream inputStream = getAbfsInputStream(client, "testReadAheadManagerForOlderReadAheadFailure.txt");
+
+    queueReadAheads(inputStream);
+
+    // AbfsInputStream read would have waited for the read-ahead of the requested offset;
+    // as we are testing the ReadAheadManager directly, sleep for thresholdAgeMilliseconds
+    // so that the read buffer qualifies as an old buffer
+    Thread.sleep(ReadBufferManager.getBufferManager().getThresholdAgeMilliseconds());
+
+    // Only the 3 readAhead threads should have triggered client.read
+    verifyReadCallCount(client, 3);
+
+    // getBlock from a new read request should return 0 if the read ahead for
+    // the respective offset failed longer than thresholdAgeMilliseconds ago.
+    int bytesRead = ReadBufferManager.getBufferManager().getBlock(
+        inputStream,
+        ONE_KB,
+        ONE_KB,
+        new byte[ONE_KB]);
+    Assert.assertEquals("bytesRead should be zero when previously read "
+        + "ahead buffer had failed", 0, bytesRead);
+
+    // Stub returns success for the 5th read request; if ReadBuffers still
+    // persisted, the request would have failed for position 0.
+    checkEvictedStatus(inputStream, 0, false);
+  }
+
+  /**
+   * The test expects ReadAheadManager to return data from the previously
+   * read-ahead data of the same offset.
+   * @throws Exception
+   */
+  @Test
+  public void testReadAheadManagerForSuccessfulReadAhead() throws Exception {
+    // Mock client; all readaheads are stubbed to succeed below
+    AbfsClient client = getMockAbfsClient();
+
+    // Success operation mock
+    AbfsRestOperation op = getMockRestOp();
+
+    // Stub:
+    // Pass all readAheads and fail the post-eviction request to
+    // prove that the ReadAhead buffer is used
+    doReturn(op)
+        .doReturn(op)
+        .doReturn(op)
+        .doThrow(new TimeoutException("Internal Server error for RAH-X")) // for the post-eviction request
+        .doThrow(new TimeoutException("Internal Server error for RAH-Y"))
+        .doThrow(new TimeoutException("Internal Server error for RAH-Z"))
+        .when(client)
+        .read(any(String.class), any(Long.class), any(byte[].class),
+            any(Integer.class), any(Integer.class), any(String.class),
+            any(String.class));
+
+    AbfsInputStream inputStream = getAbfsInputStream(client, "testSuccessfulReadAhead.txt");
+
+    queueReadAheads(inputStream);
+
+    // AbfsInputStream read would have waited for the read-ahead of the requested offset;
+    // as we are testing the ReadAheadManager directly, sleep for a second to
+    // let the read ahead threads complete
+    Thread.sleep(1000);
+
+    // Only the 3 readAhead threads should have triggered client.read
+    verifyReadCallCount(client, 3);
+
+    // getBlock for a new read should return the buffer read ahead
+    int bytesRead = ReadBufferManager.getBufferManager().getBlock(
+        inputStream,
+        ONE_KB,
+        ONE_KB,
+        new byte[ONE_KB]);
+
+    Assert.assertTrue("bytesRead should be non-zero from the "
+        + "buffer that was read-ahead", bytesRead > 0);
+
+    // Once created, the mock remembers all interactions.
+    // As the above read should not have triggered any server calls, the total
+    // number of read calls made at this point will be the same as before.
+    verifyReadCallCount(client, 3);
+
+    // Stub will throw an exception for the 4th and later client.read() calls;
+    // if the read-ahead buffer is not used, an exception will be thrown on read
+    checkEvictedStatus(inputStream, 0, true);
+  }
+
+}
\ No newline at end of file
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/TestCachedSASToken.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/TestCachedSASToken.java
index 1016d4bbbb4..cbba8087720 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/TestCachedSASToken.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/TestCachedSASToken.java
@@ -22,12 +22,14 @@ import java.io.IOException;
 import java.time.OffsetDateTime;
 import java.time.ZoneOffset;
 import java.time.format.DateTimeFormatter;
+import java.util.UUID;
 
 import org.junit.Assert;
 import org.junit.Test;
 
 import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS_IN_SECONDS;
 import static java.time.temporal.ChronoUnit.SECONDS;
+import static java.time.temporal.ChronoUnit.DAYS;
 
 /**
  * Test CachedSASToken.
@@ -159,4 +161,36 @@
     cachedToken = cachedSasToken.get();
     Assert.assertNull(cachedToken);
   }
+
+  public static CachedSASToken getTestCachedSASTokenInstance() {
+    String expiryPostADay = OffsetDateTime.now(ZoneOffset.UTC)
+        .plus(1, DAYS)
+        .format(DateTimeFormatter.ISO_DATE_TIME);
+    String version = "2020-20-20";
+
+    StringBuilder sb = new StringBuilder();
+    sb.append("skoid=");
+    sb.append(UUID.randomUUID().toString());
+    sb.append("&sktid=");
+    sb.append(UUID.randomUUID().toString());
+    sb.append("&skt=");
+    sb.append(OffsetDateTime.now(ZoneOffset.UTC)
+        .minus(1, DAYS)
+        .format(DateTimeFormatter.ISO_DATE_TIME));
+    sb.append("&ske=");
+    sb.append(expiryPostADay);
+    sb.append("&sks=b");
+    sb.append("&skv=");
+    sb.append(version);
+    sb.append("&sp=rw");
+    sb.append("&sr=b");
+    sb.append("&se=");
+    sb.append(expiryPostADay);
+    sb.append("&sv=2");
+    sb.append(version);
+
+    CachedSASToken cachedSASToken = new CachedSASToken();
+    cachedSASToken.update(sb.toString());
+    return cachedSASToken;
+  }
 }
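
Reviewer note (not part of the patch): the contract this change establishes between AbfsInputStream and ReadBufferManager.getBlock() can be summarized outside the diff. The method readWithReadAhead below is an illustrative sketch only, a simplified stand-in for the real AbfsInputStream read path; it assumes the destination buffer is filled from offset 0, as getBlock's javadoc states.

  // Illustrative sketch, NOT part of the patch. getBlock() now has three outcomes:
  //   > 0 bytes   -> data was served from a completed read-ahead buffer
  //   0 bytes     -> no usable buffer; this includes read-ahead failures
  //                  older than thresholdAgeMilliseconds
  //   IOException -> a read-ahead for this range failed within the last
  //                  thresholdAgeMilliseconds; the stored error is rethrown
  int readWithReadAhead(AbfsInputStream stream, long position, int length,
      byte[] b) throws IOException {
    int receivedBytes = ReadBufferManager.getBufferManager()
        .getBlock(stream, position, length, b); // may throw the stored error
    if (receivedBytes > 0) {
      return receivedBytes; // served entirely from a read-ahead buffer
    }
    // Read-ahead miss, or only a stale failure was found: fall back to a
    // fresh remote read of exactly the requested range.
    return stream.readRemote(position, b, 0, length);
  }

This is why the tests above expect exactly 3 client.read() calls on a failed read: the stored exception is surfaced instead of silently retrying, and stale failures merely fall through to a single remote read.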