diff --git a/hadoop-tools/hadoop-azure/dev-support/findbugs-exclude.xml b/hadoop-tools/hadoop-azure/dev-support/findbugs-exclude.xml
index b750b8b91c7..fa6085faa55 100644
--- a/hadoop-tools/hadoop-azure/dev-support/findbugs-exclude.xml
+++ b/hadoop-tools/hadoop-azure/dev-support/findbugs-exclude.xml
@@ -83,4 +83,17 @@
+
+  <!-- SpotBugs reports bug type IS2_INCONSISTENT_SYNC for the field "in"
+       of NativeAzureFileSystem$NativeAzureFsInputStream. The report is
+       not valid here: read(long, byte[], int, int) deliberately accesses
+       "in" without synchronization when buffered pread is disabled; see
+       the comment in that method for the full justification.
+  -->
+  <Match>
+    <Class name="org.apache.hadoop.fs.azure.NativeAzureFileSystem$NativeAzureFsInputStream" />
+    <Field name="in" />
+    <Bug pattern="IS2_INCONSISTENT_SYNC" />
+  </Match>
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
index c613468536b..368283a1704 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
@@ -40,6 +40,7 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.Locale;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
@@ -235,6 +236,16 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
*/
public static final String KEY_ENABLE_FLAT_LISTING = "fs.azure.flatlist.enable";
+ /**
+ * Optional config to enable a lock-free pread that bypasses the buffer
+ * in BlockBlobInputStream.
+ * This config cannot be set at cluster level. It can only be used as an
+ * option on FutureDataInputStreamBuilder.
+ * @see FileSystem#openFile(org.apache.hadoop.fs.Path)
+ */
+ public static final String FS_AZURE_BLOCK_BLOB_BUFFERED_PREAD_DISABLE =
+ "fs.azure.block.blob.buffered.pread.disable";
+
/**
* The set of directories where we should apply atomic folder rename
* synchronized with createNonRecursive.
@@ -1577,8 +1588,8 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
* Opens a new input stream for the given blob (page or block blob)
* to read its data.
*/
- private InputStream openInputStream(CloudBlobWrapper blob)
- throws StorageException, IOException {
+ private InputStream openInputStream(CloudBlobWrapper blob,
+ Optional<Configuration> options) throws StorageException, IOException {
if (blob instanceof CloudBlockBlobWrapper) {
LOG.debug("Using stream seek algorithm {}", inputStreamVersion);
switch(inputStreamVersion) {
@@ -1586,9 +1597,13 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
return blob.openInputStream(getDownloadOptions(),
getInstrumentedContext(isConcurrentOOBAppendAllowed()));
case 2:
+ boolean bufferedPreadDisabled = options.map(c -> c
+ .getBoolean(FS_AZURE_BLOCK_BLOB_BUFFERED_PREAD_DISABLE, false))
+ .orElse(false);
return new BlockBlobInputStream((CloudBlockBlobWrapper) blob,
getDownloadOptions(),
- getInstrumentedContext(isConcurrentOOBAppendAllowed()));
+ getInstrumentedContext(isConcurrentOOBAppendAllowed()),
+ bufferedPreadDisabled);
default:
throw new IOException("Unknown seek algorithm: " + inputStreamVersion);
}
@@ -2262,6 +2277,12 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
@Override
public InputStream retrieve(String key, long startByteOffset)
throws AzureException, IOException {
+ return retrieve(key, startByteOffset, Optional.empty());
+ }
+
+ @Override
+ public InputStream retrieve(String key, long startByteOffset,
+ Optional<Configuration> options) throws AzureException, IOException {
try {
// Check if a session exists, if not create a session with the
// Azure storage server.
@@ -2273,7 +2294,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
}
checkContainer(ContainerAccessType.PureRead);
- InputStream inputStream = openInputStream(getBlobReference(key));
+ InputStream inputStream = openInputStream(getBlobReference(key), options);
if (startByteOffset > 0) {
// Skip bytes and ignore return value. This is okay
// because if you try to skip too far you will be positioned
@@ -2824,7 +2845,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
OutputStream opStream = null;
try {
if (srcBlob.getProperties().getBlobType() == BlobType.PAGE_BLOB){
- ipStream = openInputStream(srcBlob);
+ ipStream = openInputStream(srcBlob, Optional.empty());
opStream = openOutputStream(dstBlob);
byte[] buffer = new byte[PageBlobFormatHelpers.PAGE_SIZE];
int len;
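
The `Optional<Configuration>` threaded through `retrieve()` and `openInputStream()` above carries per-open options from `openFile()` down to the stream constructor, so a single open can override behaviour without touching the cluster configuration. A minimal sketch of the resolution pattern, using a hypothetical `EXAMPLE_KEY` (the names here are illustrative, not part of the patch):

```java
import java.util.Optional;
import org.apache.hadoop.conf.Configuration;

public class PerOpenOptionDemo {
  // Hypothetical per-open option key, used here only for illustration.
  static final String EXAMPLE_KEY = "fs.example.buffered.pread.disable";

  static boolean resolveFlag(Optional<Configuration> options) {
    // An absent options bag yields the default (false), mirroring
    // options.map(c -> c.getBoolean(..., false)).orElse(false) in the patch.
    return options
        .map(c -> c.getBoolean(EXAMPLE_KEY, false))
        .orElse(false);
  }

  public static void main(String[] args) {
    Configuration perOpen = new Configuration(false);
    perOpen.setBoolean(EXAMPLE_KEY, true);

    System.out.println(resolveFlag(Optional.empty()));      // prints false
    System.out.println(resolveFlag(Optional.of(perOpen)));  // prints true
  }
}
```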
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobInputStream.java
index c37b2bec6ec..00e84add34c 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobInputStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobInputStream.java
@@ -28,6 +28,7 @@ import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.BlobRequestOptions;
import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.fs.azure.StorageInterface.CloudBlockBlobWrapper;
@@ -36,10 +37,11 @@ import org.apache.hadoop.fs.azure.StorageInterface.CloudBlockBlobWrapper;
* random access and seek. Random access performance is improved by several
* orders of magnitude.
*/
-final class BlockBlobInputStream extends InputStream implements Seekable {
+final class BlockBlobInputStream extends FSInputStream {
private final CloudBlockBlobWrapper blob;
private final BlobRequestOptions options;
private final OperationContext opContext;
+ private final boolean bufferedPreadDisabled;
private InputStream blobInputStream = null;
private int minimumReadSizeInBytes = 0;
private long streamPositionAfterLastRead = -1;
@@ -64,10 +66,12 @@ final class BlockBlobInputStream extends InputStream implements Seekable {
*/
BlockBlobInputStream(CloudBlockBlobWrapper blob,
BlobRequestOptions options,
- OperationContext opContext) throws IOException {
+ OperationContext opContext, boolean bufferedPreadDisabled)
+ throws IOException {
this.blob = blob;
this.options = options;
this.opContext = opContext;
+ this.bufferedPreadDisabled = bufferedPreadDisabled;
this.minimumReadSizeInBytes = blob.getStreamMinimumReadSizeInBytes();
@@ -263,6 +267,39 @@ final class BlockBlobInputStream extends InputStream implements Seekable {
}
}
+ @Override
+ public int read(long position, byte[] buffer, int offset, int length)
+ throws IOException {
+ synchronized (this) {
+ checkState();
+ }
+ if (!bufferedPreadDisabled) {
+ // This will do a seek + read in which the streamBuffer will get used.
+ return super.read(position, buffer, offset, length);
+ }
+ validatePositionedReadArgs(position, buffer, offset, length);
+ if (length == 0) {
+ return 0;
+ }
+ if (position >= streamLength) {
+ throw new EOFException("position is beyond stream capacity");
+ }
+ MemoryOutputStream os = new MemoryOutputStream(buffer, offset, length);
+ long bytesToRead = Math.min(minimumReadSizeInBytes,
+ Math.min(os.capacity(), streamLength - position));
+ try {
+ blob.downloadRange(position, bytesToRead, os, options, opContext);
+ } catch (StorageException e) {
+ throw new IOException(e);
+ }
+ int bytesRead = os.size();
+ if (bytesRead == 0) {
+ // This may happen if the blob was modified after the length was obtained.
+ throw new EOFException("End of stream reached unexpectedly.");
+ }
+ return bytesRead;
+ }
+
/**
* Reads up to <code>len</code> bytes of data from the input stream into an
* array of bytes.
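
The unbuffered pread path above never touches the stream's shared state: each call wraps the caller's byte array in the file's `MemoryOutputStream` helper and issues exactly one ranged download into it. A standalone sketch of such an adapter, assuming only `java.io` (the real `MemoryOutputStream` is a pre-existing inner class of BlockBlobInputStream; this version is illustrative):

```java
import java.io.IOException;
import java.io.OutputStream;

/** Writes into a caller-supplied byte[] window; holds no internal buffer. */
final class ByteArrayWindowOutputStream extends OutputStream {
  private final byte[] buffer;
  private final int offset;
  private final int length;
  private int written;

  ByteArrayWindowOutputStream(byte[] buffer, int offset, int length) {
    this.buffer = buffer;
    this.offset = offset;
    this.length = length;
  }

  /** Remaining capacity of the window. */
  int capacity() {
    return length - written;
  }

  /** Number of bytes written so far. */
  int size() {
    return written;
  }

  @Override
  public void write(int b) throws IOException {
    if (capacity() == 0) {
      // Unlike the patch's helper (which silently caps writes at capacity),
      // this sketch fails loudly on overflow.
      throw new IOException("No space left in the destination window");
    }
    buffer[offset + written++] = (byte) b;
  }

  @Override
  public void write(byte[] src, int srcOffset, int srcLength) throws IOException {
    if (srcLength > capacity()) {
      throw new IOException("No space left in the destination window");
    }
    System.arraycopy(src, srcOffset, buffer, offset + written, srcLength);
    written += srcLength;
  }
}
```

Because every positional read gets its own window and its own ranged download, concurrent preads over a shared stream contend on neither a lock nor the stream buffer.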
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
index 48ef495d7b7..e9f0e784fc1 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
@@ -33,11 +33,14 @@ import java.util.Date;
import java.util.EnumSet;
import java.util.TimeZone;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
+import java.util.Optional;
import java.util.Stack;
import java.util.HashMap;
@@ -61,6 +64,7 @@ import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PositionedReadable;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.Syncable;
@@ -70,6 +74,8 @@ import org.apache.hadoop.fs.azure.metrics.AzureFileSystemMetricsSystem;
import org.apache.hadoop.fs.azure.security.Constants;
import org.apache.hadoop.fs.azure.security.RemoteWasbDelegationTokenManager;
import org.apache.hadoop.fs.azure.security.WasbDelegationTokenManager;
+import org.apache.hadoop.fs.impl.AbstractFSBuilderImpl;
+import org.apache.hadoop.fs.impl.OpenFileParameters;
import org.apache.hadoop.fs.impl.StoreImplementationUtils;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
@@ -79,6 +85,7 @@ import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL;
+import org.apache.hadoop.util.LambdaUtils;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.Time;
@@ -915,6 +922,43 @@ public class NativeAzureFileSystem extends FileSystem {
}
}
+ @Override
+ public int read(long position, byte[] buffer, int offset, int length)
+ throws IOException {
+ // SpotBugs reports bug type IS2_INCONSISTENT_SYNC here, but the report
+ // is not valid. 'this.in' is an instance of BlockBlobInputStream, and
+ // read(long, byte[], int, int) calls its superclass method when
+ // 'fs.azure.block.blob.buffered.pread.disable' is false; the superclass
+ // FSInputStream implementation is properly synchronized.
+ // When 'fs.azure.block.blob.buffered.pread.disable' is true, we want a
+ // lock-free blob read. That path uses none of the InputStream's shared
+ // state (the buffer) and never moves the cursor position, so an
+ // unsynchronized read is safe.
+ if (in instanceof PositionedReadable) {
+ try {
+ int result = ((PositionedReadable) this.in).read(position, buffer,
+ offset, length);
+ if (null != statistics && result > 0) {
+ statistics.incrementBytesRead(result);
+ }
+ return result;
+ } catch (IOException e) {
+ Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
+ if (innerException instanceof StorageException) {
+ LOG.error("Encountered Storage Exception for read on Blob : {}"
+ + " Exception details: {} Error Code : {}",
+ key, e, ((StorageException) innerException).getErrorCode());
+ if (NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
+ throw new FileNotFoundException(String.format("%s is not found", key));
+ }
+ }
+ throw e;
+ }
+ }
+ return super.read(position, buffer, offset, length);
+ }
+
@Override
public synchronized void close() throws IOException {
if (!closed) {
@@ -3043,6 +3087,12 @@ public class NativeAzureFileSystem extends FileSystem {
@Override
public FSDataInputStream open(Path f, int bufferSize) throws FileNotFoundException, IOException {
+ return open(f, bufferSize, Optional.empty());
+ }
+
+ private FSDataInputStream open(Path f, int bufferSize,
+ Optional<Configuration> options)
+ throws FileNotFoundException, IOException {
LOG.debug("Opening file: {}", f.toString());
@@ -3077,7 +3127,7 @@ public class NativeAzureFileSystem extends FileSystem {
InputStream inputStream;
try {
- inputStream = store.retrieve(key);
+ inputStream = store.retrieve(key, 0, options);
} catch(Exception ex) {
Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
@@ -3094,6 +3144,18 @@ public class NativeAzureFileSystem extends FileSystem {
new NativeAzureFsInputStream(inputStream, key, meta.getLen()), bufferSize));
}
+ @Override
+ protected CompletableFuture<FSDataInputStream> openFileWithOptions(Path path,
+ OpenFileParameters parameters) throws IOException {
+ AbstractFSBuilderImpl.rejectUnknownMandatoryKeys(
+ parameters.getMandatoryKeys(),
+ Collections.emptySet(),
+ "for " + path);
+ return LambdaUtils.eval(
+ new CompletableFuture<>(), () ->
+ open(path, parameters.getBufferSize(), Optional.of(parameters.getOptions())));
+ }
+
@Override
public boolean rename(Path src, Path dst) throws FileNotFoundException, IOException {
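
Note the asymmetry `openFileWithOptions()` enforces: unknown mandatory keys are rejected via `rejectUnknownMandatoryKeys`, while unknown optional keys pass through untouched. That is why the pread key must be supplied with `opt()` rather than `must()`. A usage sketch (the wasb URI and path are placeholders):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FutureDataInputStreamBuilder;
import org.apache.hadoop.fs.Path;

public class OpenFileDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Illustrative WASB path; substitute a real container and account.
    Path path = new Path("wasb://container@account.blob.core.windows.net/data/file");
    FileSystem fs = path.getFileSystem(conf);

    FutureDataInputStreamBuilder builder = fs.openFile(path);
    // opt(): a best-effort hint; other stores simply ignore unknown keys.
    builder.opt("fs.azure.block.blob.buffered.pread.disable", true);
    // builder.must(...) with this key would fail here, because
    // openFileWithOptions() rejects all non-standard mandatory keys.

    try (FSDataInputStream in = builder.build().get()) {
      byte[] buf = new byte[4096];
      int read = in.read(0L, buf, 0, buf.length); // lock-free positional read
      System.out.println("read " + read + " bytes");
    }
  }
}
```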
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java
index 0944b1b0987..91aad992a1f 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.util.Date;
+import java.util.Optional;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
@@ -50,6 +51,9 @@ interface NativeFileSystemStore {
InputStream retrieve(String key, long byteRangeStart) throws IOException;
+ InputStream retrieve(String key, long byteRangeStart,
+ Optional<Configuration> options) throws IOException;
+
DataOutputStream storefile(String keyEncoded,
PermissionStatus permissionStatus,
String key) throws AzureException;
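
Adding an abstract overload to `NativeFileSystemStore` obliges every implementation to change, which is acceptable for this package-private interface with a single production store. Were the interface public, a default method delegating to the existing overload would keep old implementors compiling. A sketch under that assumption (the interface name is hypothetical):

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.Optional;
import org.apache.hadoop.conf.Configuration;

interface ExampleStore {
  InputStream retrieve(String key, long byteRangeStart) throws IOException;

  // Hypothetical compatibility shim: existing implementors keep compiling;
  // stores that understand per-open options override this method.
  default InputStream retrieve(String key, long byteRangeStart,
      Optional<Configuration> options) throws IOException {
    return retrieve(key, byteRangeStart);
  }
}
```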
diff --git a/hadoop-tools/hadoop-azure/src/site/markdown/index.md b/hadoop-tools/hadoop-azure/src/site/markdown/index.md
index 11d0a18b558..2af6b498a27 100644
--- a/hadoop-tools/hadoop-azure/src/site/markdown/index.md
+++ b/hadoop-tools/hadoop-azure/src/site/markdown/index.md
@@ -545,6 +545,17 @@ The maximum number of entries that that cache can hold can be customized using t
```
+### Performance optimization configurations
+
+`fs.azure.block.blob.buffered.pread.disable`: By default the positional read API performs a
+seek and read on the input stream, and that read fills the buffer cache in
+BlockBlobInputStream. If this configuration is true, positional reads skip the buffer and
+issue a lock-free call to read directly from the blob. This optimization is particularly
+helpful for HBase-style workloads doing short random reads over a shared InputStream
+instance.
+Note: this is not a config that can be set at cluster level. It can only be used as an
+option on FutureDataInputStreamBuilder; see `FileSystem#openFile(Path path)` and the
+sketch below.
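+
+A usage sketch, assuming an initialized `FileSystem` bound to a `wasb://` URI
+(variable names are illustrative):
+
+```java
+FutureDataInputStreamBuilder builder = fs.openFile(path);
+builder.opt("fs.azure.block.blob.buffered.pread.disable", true);
+try (FSDataInputStream is = builder.build().get()) {
+  byte[] buffer = new byte[4096];
+  long position = 0;
+  // Positional reads on "is" now bypass the stream buffer.
+  int bytesRead = is.read(position, buffer, 0, buffer.length);
+}
+```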
+
## Further Reading
* [Testing the Azure WASB client](testing_azure.html).
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestBlockBlobInputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestBlockBlobInputStream.java
index 07a13df11f3..cea11c0380e 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestBlockBlobInputStream.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestBlockBlobInputStream.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FutureDataInputStreamBuilder;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azure.integration.AbstractAzureScaleTest;
import org.apache.hadoop.fs.azure.integration.AzureTestUtils;
@@ -306,6 +307,61 @@ public class ITestBlockBlobInputStream extends AbstractAzureScaleTest {
assertArrayEquals("Mismatch in read data", bufferV1, bufferV2);
}
+ @Test
+ public void test_202_PosReadTest() throws Exception {
+ assumeHugeFileExists();
+ FutureDataInputStreamBuilder builder = accountUsingInputStreamV2
+ .getFileSystem().openFile(TEST_FILE_PATH);
+ builder.opt(AzureNativeFileSystemStore.FS_AZURE_BLOCK_BLOB_BUFFERED_PREAD_DISABLE, true);
+ try (
+ FSDataInputStream inputStreamV1
+ = accountUsingInputStreamV1.getFileSystem().open(TEST_FILE_PATH);
+ FSDataInputStream inputStreamV2
+ = accountUsingInputStreamV2.getFileSystem().open(TEST_FILE_PATH);
+ FSDataInputStream inputStreamV2NoBuffer = builder.build().get();
+ ) {
+ final int bufferSize = 4 * KILOBYTE;
+ byte[] bufferV1 = new byte[bufferSize];
+ byte[] bufferV2 = new byte[bufferSize];
+ byte[] bufferV2NoBuffer = new byte[bufferSize];
+
+ verifyConsistentReads(inputStreamV1, inputStreamV2, inputStreamV2NoBuffer, 0,
+ bufferV1, bufferV2, bufferV2NoBuffer);
+
+ int pos = 2 * KILOBYTE;
+ verifyConsistentReads(inputStreamV1, inputStreamV2, inputStreamV2NoBuffer, pos,
+ bufferV1, bufferV2, bufferV2NoBuffer);
+
+ pos = 10 * KILOBYTE;
+ verifyConsistentReads(inputStreamV1, inputStreamV2, inputStreamV2NoBuffer, pos,
+ bufferV1, bufferV2, bufferV2NoBuffer);
+
+ pos = 4100 * KILOBYTE;
+ verifyConsistentReads(inputStreamV1, inputStreamV2, inputStreamV2NoBuffer, pos,
+ bufferV1, bufferV2, bufferV2NoBuffer);
+ }
+ }
+
+ private void verifyConsistentReads(FSDataInputStream inputStreamV1,
+ FSDataInputStream inputStreamV2, FSDataInputStream inputStreamV2NoBuffer,
+ int pos, byte[] bufferV1, byte[] bufferV2, byte[] bufferV2NoBuffer)
+ throws IOException {
+ int size = bufferV1.length;
+ int numBytesReadV1 = inputStreamV1.read(pos, bufferV1, 0, size);
+ assertEquals("Bytes read from V1 stream", size, numBytesReadV1);
+
+ int numBytesReadV2 = inputStreamV2.read(pos, bufferV2, 0, size);
+ assertEquals("Bytes read from V2 stream", size, numBytesReadV2);
+
+ int numBytesReadV2NoBuffer = inputStreamV2NoBuffer.read(pos,
+ bufferV2NoBuffer, 0, size);
+ assertEquals("Bytes read from V2 stream (buffered pread disabled)", size,
+ numBytesReadV2NoBuffer);
+
+ assertArrayEquals("Mismatch in read data", bufferV1, bufferV2);
+ assertArrayEquals("Mismatch in read data", bufferV2, bufferV2NoBuffer);
+ }
+
/**
* Validates the implementation of InputStream.markSupported.
* @throws IOException