From cb6729224e15b89bd7fa7877fe045d28b3582f7b Mon Sep 17 00:00:00 2001 From: bilaharith <52483117+bilaharith@users.noreply.github.com> Date: Sun, 3 Jan 2021 00:07:10 +0530 Subject: [PATCH] HADOOP-17347. ABFS: Read optimizations - Contributed by Bilahari T H (cherry picked from commit 1448add08fcd4a23e59eab5f75ef46fca6b1c3d1) --- .../hadoop/fs/azurebfs/AbfsConfiguration.java | 28 ++ .../fs/azurebfs/AzureBlobFileSystemStore.java | 2 + .../azurebfs/constants/ConfigurationKeys.java | 2 + .../constants/FileSystemConfigurations.java | 6 +- .../fs/azurebfs/services/AbfsInputStream.java | 196 +++++++++- .../services/AbfsInputStreamContext.java | 24 ++ .../services/ITestAbfsInputStream.java | 256 +++++++++++++ .../ITestAbfsInputStreamReadFooter.java | 358 ++++++++++++++++++ .../ITestAbfsInputStreamSmallFileReads.java | 326 ++++++++++++++++ 9 files changed, 1176 insertions(+), 22 deletions(-) create mode 100644 hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStream.java create mode 100644 hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStreamReadFooter.java create mode 100644 hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStreamSmallFileReads.java diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java index 3d09a806fdd..b1c95d2e82b 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java @@ -100,6 +100,16 @@ public class AbfsConfiguration{ DefaultValue = DEFAULT_WRITE_BUFFER_SIZE) private int writeBufferSize; + @BooleanConfigurationValidatorAnnotation( + ConfigurationKey = AZURE_READ_SMALL_FILES_COMPLETELY, + DefaultValue = DEFAULT_READ_SMALL_FILES_COMPLETELY) + private boolean readSmallFilesCompletely; + + @BooleanConfigurationValidatorAnnotation( + ConfigurationKey = AZURE_READ_OPTIMIZE_FOOTER_READ, + DefaultValue = DEFAULT_OPTIMIZE_FOOTER_READ) + private boolean optimizeFooterRead; + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_READ_BUFFER_SIZE, MinValue = MIN_BUFFER_SIZE, MaxValue = MAX_BUFFER_SIZE, @@ -527,6 +537,14 @@ public class AbfsConfiguration{ return this.writeBufferSize; } + public boolean readSmallFilesCompletely() { + return this.readSmallFilesCompletely; + } + + public boolean optimizeFooterRead() { + return this.optimizeFooterRead; + } + public int getReadBufferSize() { return this.readBufferSize; } @@ -925,4 +943,14 @@ public class AbfsConfiguration{ return authority; } + @VisibleForTesting + public void setReadSmallFilesCompletely(boolean readSmallFilesCompletely) { + this.readSmallFilesCompletely = readSmallFilesCompletely; + } + + @VisibleForTesting + public void setOptimizeFooterRead(boolean optimizeFooterRead) { + this.optimizeFooterRead = optimizeFooterRead; + } + } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index a766c621536..869a6f9907f 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -643,6 +643,8 @@ public class AzureBlobFileSystemStore implements Closeable { .withReadBufferSize(abfsConfiguration.getReadBufferSize()) .withReadAheadQueueDepth(abfsConfiguration.getReadAheadQueueDepth()) .withTolerateOobAppends(abfsConfiguration.getTolerateOobAppends()) + .withReadSmallFilesCompletely(abfsConfiguration.readSmallFilesCompletely()) + .withOptimizeFooterRead(abfsConfiguration.optimizeFooterRead()) .withStreamStatistics(new AbfsInputStreamStatisticsImpl()) .withShouldReadBufferSizeAlways( abfsConfiguration.shouldReadBufferSizeAlways()) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java index cb9c0de59f8..3e1ff80e7ef 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java @@ -56,6 +56,8 @@ public final class ConfigurationKeys { public static final String AZURE_WRITE_MAX_REQUESTS_TO_QUEUE = "fs.azure.write.max.requests.to.queue"; public static final String AZURE_WRITE_BUFFER_SIZE = "fs.azure.write.request.size"; public static final String AZURE_READ_BUFFER_SIZE = "fs.azure.read.request.size"; + public static final String AZURE_READ_SMALL_FILES_COMPLETELY = "fs.azure.read.smallfilescompletely"; + public static final String AZURE_READ_OPTIMIZE_FOOTER_READ = "fs.azure.read.optimizefooterread"; public static final String AZURE_BLOCK_SIZE_PROPERTY_NAME = "fs.azure.block.size"; public static final String AZURE_BLOCK_LOCATION_HOST_PROPERTY_NAME = "fs.azure.block.location.impersonatedhost"; public static final String AZURE_CONCURRENT_CONNECTION_VALUE_OUT = "fs.azure.concurrentRequestCount.out"; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java index 27dafd0b1f9..80082063f6e 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java @@ -50,13 +50,15 @@ public final class FileSystemConfigurations { public static final int DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_MAX_BACKOFF_INTERVAL = SIXTY_SECONDS; public static final int DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_DELTA_BACKOFF = 2; - private static final int ONE_KB = 1024; - private static final int ONE_MB = ONE_KB * ONE_KB; + public static final int ONE_KB = 1024; + public static final int ONE_MB = ONE_KB * ONE_KB; // Default upload and download buffer size public static final int DEFAULT_WRITE_BUFFER_SIZE = 8 * ONE_MB; // 8 MB public static final int APPENDBLOB_MAX_WRITE_BUFFER_SIZE = 4 * ONE_MB; // 4 MB public static final int DEFAULT_READ_BUFFER_SIZE = 4 * ONE_MB; // 4 MB + public static final boolean DEFAULT_READ_SMALL_FILES_COMPLETELY = false; + public static final boolean DEFAULT_OPTIMIZE_FOOTER_READ = false; public static final boolean DEFAULT_ALWAYS_READ_BUFFER_SIZE = false; public static final int DEFAULT_READ_AHEAD_BLOCK_SIZE = 4 * ONE_MB; public static final int MIN_BUFFER_SIZE = 16 * ONE_KB; // 16 KB diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java index 3682bcbc3aa..1d109f493ce 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java @@ -38,6 +38,10 @@ import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationExcep import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; import org.apache.hadoop.fs.azurebfs.utils.CachedSASToken; +import static java.lang.Math.max; +import static java.lang.Math.min; + +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_KB; import static org.apache.hadoop.util.StringUtils.toLowerCase; /** @@ -46,6 +50,9 @@ import static org.apache.hadoop.util.StringUtils.toLowerCase; public class AbfsInputStream extends FSInputStream implements CanUnbuffer, StreamCapabilities { private static final Logger LOG = LoggerFactory.getLogger(AbfsInputStream.class); + // Footer size is set to qualify for both ORC and parquet files + public static final int FOOTER_SIZE = 16 * ONE_KB; + public static final int MAX_OPTIMIZED_READ_ATTEMPTS = 2; private int readAheadBlockSize; private final AbfsClient client; @@ -59,6 +66,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer, private final boolean readAheadEnabled; // whether enable readAhead; private final boolean alwaysReadBufferSize; + private boolean firstRead = true; // SAS tokens can be re-used until they expire private CachedSASToken cachedSasToken; private byte[] buffer = null; // will be initialized on first use @@ -70,11 +78,21 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer, // of valid bytes in buffer) private boolean closed = false; + // Optimisations modify the pointer fields. + // For better resilience the following fields are used to save the + // existing state before optimization flows. + private int limitBkp; + private int bCursorBkp; + private long fCursorBkp; + private long fCursorAfterLastReadBkp; + /** Stream statistics. */ private final AbfsInputStreamStatistics streamStatistics; private long bytesFromReadAhead; // bytes read from readAhead; for testing private long bytesFromRemoteRead; // bytes read remotely; for testing + private final AbfsInputStreamContext context; + public AbfsInputStream( final AbfsClient client, final Statistics statistics, @@ -96,6 +114,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer, this.cachedSasToken = new CachedSASToken( abfsInputStreamContext.getSasTokenRenewPeriodForStreamsInSeconds()); this.streamStatistics = abfsInputStreamContext.getStreamStatistics(); + this.context = abfsInputStreamContext; readAheadBlockSize = abfsInputStreamContext.getReadAheadBlockSize(); // Propagate the config values to ReadBufferManager so that the first instance @@ -137,7 +156,13 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer, } incrementReadOps(); do { - lastReadBytes = readOneBlock(b, currentOff, currentLen); + if (shouldReadFully()) { + lastReadBytes = readFileCompletely(b, currentOff, currentLen); + } else if (shouldReadLastBlock()) { + lastReadBytes = readLastBlock(b, currentOff, currentLen); + } else { + lastReadBytes = readOneBlock(b, currentOff, currentLen); + } if (lastReadBytes > 0) { currentOff += lastReadBytes; currentLen -= lastReadBytes; @@ -150,27 +175,24 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer, return totalReadBytes > 0 ? totalReadBytes : lastReadBytes; } + private boolean shouldReadFully() { + return this.firstRead && this.context.readSmallFilesCompletely() + && this.contentLength <= this.bufferSize; + } + + private boolean shouldReadLastBlock() { + long footerStart = max(0, this.contentLength - FOOTER_SIZE); + return this.firstRead && this.context.optimizeFooterRead() + && this.fCursor >= footerStart; + } + private int readOneBlock(final byte[] b, final int off, final int len) throws IOException { - if (closed) { - throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED); - } - - Preconditions.checkNotNull(b); - LOG.debug("read one block requested b.length = {} off {} len {}", b.length, - off, len); - if (len == 0) { return 0; } - - if (this.available() == 0) { + if (!validate(b, off, len)) { return -1; } - - if (off < 0 || len < 0 || len > b.length - off) { - throw new IndexOutOfBoundsException(); - } - //If buffer is empty, then fill the buffer. if (bCursor == limit) { //If EOF, then return -1 @@ -197,6 +219,9 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer, bytesRead = readInternal(fCursor, buffer, 0, b.length, true); } } + if (firstRead) { + firstRead = false; + } if (bytesRead == -1) { return -1; @@ -206,11 +231,123 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer, fCursor += bytesRead; fCursorAfterLastRead = fCursor; } + return copyToUserBuffer(b, off, len); + } + private int readFileCompletely(final byte[] b, final int off, final int len) + throws IOException { + if (len == 0) { + return 0; + } + if (!validate(b, off, len)) { + return -1; + } + savePointerState(); + // data need to be copied to user buffer from index bCursor, bCursor has + // to be the current fCusor + bCursor = (int) fCursor; + return optimisedRead(b, off, len, 0, contentLength); + } + + private int readLastBlock(final byte[] b, final int off, final int len) + throws IOException { + if (len == 0) { + return 0; + } + if (!validate(b, off, len)) { + return -1; + } + savePointerState(); + // data need to be copied to user buffer from index bCursor, + // AbfsInutStream buffer is going to contain data from last block start. In + // that case bCursor will be set to fCursor - lastBlockStart + long lastBlockStart = max(0, contentLength - bufferSize); + bCursor = (int) (fCursor - lastBlockStart); + // 0 if contentlength is < buffersize + long actualLenToRead = min(bufferSize, contentLength); + return optimisedRead(b, off, len, lastBlockStart, actualLenToRead); + } + + private int optimisedRead(final byte[] b, final int off, final int len, + final long readFrom, final long actualLen) throws IOException { + fCursor = readFrom; + int totalBytesRead = 0; + int lastBytesRead = 0; + try { + buffer = new byte[bufferSize]; + for (int i = 0; + i < MAX_OPTIMIZED_READ_ATTEMPTS && fCursor < contentLength; i++) { + lastBytesRead = readInternal(fCursor, buffer, limit, + (int) actualLen - limit, true); + if (lastBytesRead > 0) { + totalBytesRead += lastBytesRead; + limit += lastBytesRead; + fCursor += lastBytesRead; + fCursorAfterLastRead = fCursor; + } + } + } catch (IOException e) { + LOG.debug("Optimized read failed. Defaulting to readOneBlock {}", e); + restorePointerState(); + return readOneBlock(b, off, len); + } finally { + firstRead = false; + } + if (totalBytesRead < 1) { + restorePointerState(); + return -1; + } + // If the read was partial and the user requested part of data has + // not read then fallback to readoneblock. When limit is smaller than + // bCursor that means the user requested data has not been read. + if (fCursor < contentLength && bCursor > limit) { + restorePointerState(); + return readOneBlock(b, off, len); + } + return copyToUserBuffer(b, off, len); + } + + private void savePointerState() { + // Saving the current state for fall back ifn case optimization fails + this.limitBkp = this.limit; + this.fCursorBkp = this.fCursor; + this.fCursorAfterLastReadBkp = this.fCursorAfterLastRead; + this.bCursorBkp = this.bCursor; + } + + private void restorePointerState() { + // Saving the current state for fall back ifn case optimization fails + this.limit = this.limitBkp; + this.fCursor = this.fCursorBkp; + this.fCursorAfterLastRead = this.fCursorAfterLastReadBkp; + this.bCursor = this.bCursorBkp; + } + + private boolean validate(final byte[] b, final int off, final int len) + throws IOException { + if (closed) { + throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED); + } + + Preconditions.checkNotNull(b); + LOG.debug("read one block requested b.length = {} off {} len {}", b.length, + off, len); + + if (this.available() == 0) { + return false; + } + + if (off < 0 || len < 0 || len > b.length - off) { + throw new IndexOutOfBoundsException(); + } + return true; + } + + private int copyToUserBuffer(byte[] b, int off, int len){ //If there is anything in the buffer, then return lesser of (requested bytes) and (bytes in buffer) //(bytes returned may be less than requested) int bytesRemaining = limit - bCursor; - int bytesToRead = Math.min(len, bytesRemaining); + int bytesToRead = min(len, bytesRemaining); System.arraycopy(buffer, bCursor, b, off, bytesToRead); bCursor += bytesToRead; if (statistics != null) { @@ -224,7 +361,6 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer, return bytesToRead; } - private int readInternal(final long position, final byte[] b, final int offset, final int length, final boolean bypassReadAhead) throws IOException { if (readAheadEnabled && !bypassReadAhead) { @@ -239,7 +375,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer, long nextOffset = position; // First read to queue needs to be of readBufferSize and later // of readAhead Block size - long nextSize = Math.min((long) bufferSize, contentLength - nextOffset); + long nextSize = min((long) bufferSize, contentLength - nextOffset); LOG.debug("read ahead enabled issuing readheads num = {}", numReadAheads); while (numReadAheads > 0 && nextOffset < contentLength) { LOG.debug("issuing read ahead requestedOffset = {} requested size {}", @@ -248,7 +384,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer, nextOffset = nextOffset + nextSize; numReadAheads--; // From next round onwards should be of readahead block size. - nextSize = Math.min((long) readAheadBlockSize, contentLength - nextOffset); + nextSize = min((long) readAheadBlockSize, contentLength - nextOffset); } // try reading from buffers first @@ -572,4 +708,24 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer, } return sb.toString(); } + + @VisibleForTesting + int getBCursor() { + return this.bCursor; + } + + @VisibleForTesting + long getFCursor() { + return this.fCursor; + } + + @VisibleForTesting + long getFCursorAfterLastRead() { + return this.fCursorAfterLastRead; + } + + @VisibleForTesting + long getLimit() { + return this.limit; + } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java index ade05834a23..ab3d3b0e765 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java @@ -40,6 +40,10 @@ public class AbfsInputStreamContext extends AbfsStreamContext { private AbfsInputStreamStatistics streamStatistics; + private boolean readSmallFilesCompletely; + + private boolean optimizeFooterRead; + public AbfsInputStreamContext(final long sasTokenRenewPeriodForStreamsInSeconds) { super(sasTokenRenewPeriodForStreamsInSeconds); } @@ -69,6 +73,18 @@ public class AbfsInputStreamContext extends AbfsStreamContext { return this; } + public AbfsInputStreamContext withReadSmallFilesCompletely( + final boolean readSmallFilesCompletely) { + this.readSmallFilesCompletely = readSmallFilesCompletely; + return this; + } + + public AbfsInputStreamContext withOptimizeFooterRead( + final boolean optimizeFooterRead) { + this.optimizeFooterRead = optimizeFooterRead; + return this; + } + public AbfsInputStreamContext withShouldReadBufferSizeAlways( final boolean alwaysReadBufferSize) { this.alwaysReadBufferSize = alwaysReadBufferSize; @@ -110,6 +126,14 @@ public class AbfsInputStreamContext extends AbfsStreamContext { return streamStatistics; } + public boolean readSmallFilesCompletely() { + return this.readSmallFilesCompletely; + } + + public boolean optimizeFooterRead() { + return this.optimizeFooterRead; + } + public boolean shouldReadBufferSizeAlways() { return alwaysReadBufferSize; } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStream.java new file mode 100644 index 00000000000..44b0a362dc6 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStream.java @@ -0,0 +1,256 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
+import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
+import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
+import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore;
+import org.junit.Test;
+
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.spy;
+
+public class ITestAbfsInputStream extends AbstractAbfsIntegrationTest {
+
+ protected static final int HUNDRED = 100;
+
+ public ITestAbfsInputStream() throws Exception {
+ }
+
+ @Test
+ public void testWithNoOptimization() throws Exception {
+ for (int i = 2; i <= 7; i++) {
+ int fileSize = i * ONE_MB;
+ final AzureBlobFileSystem fs = getFileSystem(false, false, fileSize);
+ String fileName = methodName.getMethodName() + i;
+ byte[] fileContent = getRandomBytesArray(fileSize);
+ Path testFilePath = createFileWithContent(fs, fileName, fileContent);
+ testWithNoOptimization(fs, testFilePath, HUNDRED, fileContent);
+ }
+ }
+
+ protected void testWithNoOptimization(final FileSystem fs,
+ final Path testFilePath, final int seekPos, final byte[] fileContent)
+ throws IOException {
+ FSDataInputStream iStream = fs.open(testFilePath);
+ try {
+ AbfsInputStream abfsInputStream = (AbfsInputStream) iStream
+ .getWrappedStream();
+
+ iStream = new FSDataInputStream(abfsInputStream);
+ seek(iStream, seekPos);
+ long totalBytesRead = 0;
+ int length = HUNDRED * HUNDRED;
+ do {
+ byte[] buffer = new byte[length];
+ int bytesRead = iStream.read(buffer, 0, length);
+ totalBytesRead += bytesRead;
+ if ((totalBytesRead + seekPos) >= fileContent.length) {
+ length = (fileContent.length - seekPos) % length;
+ }
+ assertEquals(length, bytesRead);
+ assertContentReadCorrectly(fileContent,
+ (int) (seekPos + totalBytesRead - length), length, buffer);
+
+ assertTrue(abfsInputStream.getFCursor() >= seekPos + totalBytesRead);
+ assertTrue(abfsInputStream.getFCursorAfterLastRead() >= seekPos + totalBytesRead);
+ assertTrue(abfsInputStream.getBCursor() >= totalBytesRead % abfsInputStream.getBufferSize());
+ assertTrue(abfsInputStream.getLimit() >= totalBytesRead % abfsInputStream.getBufferSize());
+ } while (totalBytesRead + seekPos < fileContent.length);
+ } finally {
+ iStream.close();
+ }
+ }
+
+ @Test
+ public void testExceptionInOptimization() throws Exception {
+ for (int i = 2; i <= 7; i++) {
+ int fileSize = i * ONE_MB;
+ final AzureBlobFileSystem fs = getFileSystem(true, true, fileSize);
+ String fileName = methodName.getMethodName() + i;
+ byte[] fileContent = getRandomBytesArray(fileSize);
+ Path testFilePath = createFileWithContent(fs, fileName, fileContent);
+ testExceptionInOptimization(fs, testFilePath, fileSize - HUNDRED,
+ fileSize / 4, fileContent);
+ }
+ }
+
+ private void testExceptionInOptimization(final FileSystem fs,
+ final Path testFilePath,
+ final int seekPos, final int length, final byte[] fileContent)
+ throws IOException {
+
+ FSDataInputStream iStream = fs.open(testFilePath);
+ try {
+ AbfsInputStream abfsInputStream = (AbfsInputStream) iStream
+ .getWrappedStream();
+ abfsInputStream = spy(abfsInputStream);
+ doThrow(new IOException())
+ .doCallRealMethod()
+ .when(abfsInputStream)
+ .readRemote(anyLong(), any(), anyInt(), anyInt());
+
+ iStream = new FSDataInputStream(abfsInputStream);
+ verifyBeforeSeek(abfsInputStream);
+ seek(iStream, seekPos);
+ byte[] buffer = new byte[length];
+ int bytesRead = iStream.read(buffer, 0, length);
+ long actualLength = length;
+ if (seekPos + length > fileContent.length) {
+ long delta = seekPos + length - fileContent.length;
+ actualLength = length - delta;
+ }
+ assertEquals(bytesRead, actualLength);
+ assertContentReadCorrectly(fileContent, seekPos, (int) actualLength, buffer);
+ assertEquals(fileContent.length, abfsInputStream.getFCursor());
+ assertEquals(fileContent.length, abfsInputStream.getFCursorAfterLastRead());
+ assertEquals(actualLength, abfsInputStream.getBCursor());
+ assertTrue(abfsInputStream.getLimit() >= actualLength);
+ } finally {
+ iStream.close();
+ }
+ }
+
+ protected AzureBlobFileSystem getFileSystem(boolean readSmallFilesCompletely)
+ throws IOException {
+ final AzureBlobFileSystem fs = getFileSystem();
+ getAbfsStore(fs).getAbfsConfiguration()
+ .setReadSmallFilesCompletely(readSmallFilesCompletely);
+ return fs;
+ }
+
+ private AzureBlobFileSystem getFileSystem(boolean optimizeFooterRead,
+ boolean readSmallFileCompletely, int fileSize) throws IOException {
+ final AzureBlobFileSystem fs = getFileSystem();
+ getAbfsStore(fs).getAbfsConfiguration()
+ .setOptimizeFooterRead(optimizeFooterRead);
+ if (fileSize <= getAbfsStore(fs).getAbfsConfiguration()
+ .getReadBufferSize()) {
+ getAbfsStore(fs).getAbfsConfiguration()
+ .setReadSmallFilesCompletely(readSmallFileCompletely);
+ }
+ return fs;
+ }
+
+ protected byte[] getRandomBytesArray(int length) {
+ final byte[] b = new byte[length];
+ new Random().nextBytes(b);
+ return b;
+ }
+
+ protected Path createFileWithContent(FileSystem fs, String fileName,
+ byte[] fileContent) throws IOException {
+ Path testFilePath = path(fileName);
+ try (FSDataOutputStream oStream = fs.create(testFilePath)) {
+ oStream.write(fileContent);
+ oStream.flush();
+ }
+ return testFilePath;
+ }
+
+ protected AzureBlobFileSystemStore getAbfsStore(FileSystem fs)
+ throws NoSuchFieldException, IllegalAccessException {
+ AzureBlobFileSystem abfs = (AzureBlobFileSystem) fs;
+ Field abfsStoreField = AzureBlobFileSystem.class
+ .getDeclaredField("abfsStore");
+ abfsStoreField.setAccessible(true);
+ return (AzureBlobFileSystemStore) abfsStoreField.get(abfs);
+ }
+
+ protected Map
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.junit.Test;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
+import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
+
+import static java.lang.Math.max;
+import static java.lang.Math.min;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+
+import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CONNECTIONS_MADE;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_KB;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB;
+
+public class ITestAbfsInputStreamReadFooter extends ITestAbfsInputStream {
+
+ private static final int TEN = 10;
+ private static final int TWENTY = 20;
+
+ public ITestAbfsInputStreamReadFooter() throws Exception {
+ }
+
+ @Test
+ public void testOnlyOneServerCallIsMadeWhenTheConfIsTrue() throws Exception {
+ testNumBackendCalls(true);
+ }
+
+ @Test
+ public void testMultipleServerCallsAreMadeWhenTheConfIsFalse()
+ throws Exception {
+ testNumBackendCalls(false);
+ }
+
+ private void testNumBackendCalls(boolean optimizeFooterRead)
+ throws Exception {
+ for (int i = 1; i <= 4; i++) {
+ int fileSize = i * ONE_MB;
+ final AzureBlobFileSystem fs = getFileSystem(optimizeFooterRead,
+ fileSize);
+ String fileName = methodName.getMethodName() + i;
+ byte[] fileContent = getRandomBytesArray(fileSize);
+ Path testFilePath = createFileWithContent(fs, fileName, fileContent);
+ int length = AbfsInputStream.FOOTER_SIZE;
+ try (FSDataInputStream iStream = fs.open(testFilePath)) {
+ byte[] buffer = new byte[length];
+
+ Map
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.junit.Test;
+
+import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+
+import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CONNECTIONS_MADE;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_KB;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB;
+
+public class ITestAbfsInputStreamSmallFileReads extends ITestAbfsInputStream {
+
+ public ITestAbfsInputStreamSmallFileReads() throws Exception {
+ }
+
+ @Test
+ public void testOnlyOneServerCallIsMadeWhenTheConfIsTrue() throws Exception {
+ testNumBackendCalls(true);
+ }
+
+ @Test
+ public void testMultipleServerCallsAreMadeWhenTheConfIsFalse()
+ throws Exception {
+ testNumBackendCalls(false);
+ }
+
+ private void testNumBackendCalls(boolean readSmallFilesCompletely)
+ throws Exception {
+ final AzureBlobFileSystem fs = getFileSystem(readSmallFilesCompletely);
+ for (int i = 1; i <= 4; i++) {
+ String fileName = methodName.getMethodName() + i;
+ int fileSize = i * ONE_MB;
+ byte[] fileContent = getRandomBytesArray(fileSize);
+ Path testFilePath = createFileWithContent(fs, fileName, fileContent);
+ int length = ONE_KB;
+ try (FSDataInputStream iStream = fs.open(testFilePath)) {
+ byte[] buffer = new byte[length];
+
+ Map